[SPARK-55551][SQL] Improve BroadcastHashJoinExec output partitioning #54335

Open
peter-toth wants to merge 2 commits into apache:master from
peter-toth:SPARK-55551-improve-broadcasthashjoinexec-output-partitioning

@peter-toth
Contributor

What changes were proposed in this pull request?

This is a minor refactor of BroadcastHashJoinExec.outputPartitioning to:

  • simplify the logic and
  • make it future-proof by using Partitioning with Expression instead of HashPartitioningLike.

Why are the changes needed?

Code cleanup, plus support for future partitionings that implement Expression but not HashPartitioningLike (such as KeyedPartitioning in #54330).
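
As a rough illustration of why the wider type helps, here is a minimal sketch using simplified stand-in traits. None of these are Spark's real classes, and `KeyedLikePartitioning` is a hypothetical name invented for this example. It assumes only that a partitioning can be an Expression without being HashPartitioningLike.

```scala
object CompoundTypeSketch {
  // Simplified stand-ins for illustration only; not Spark's real classes.
  trait Expression { def children: Seq[String] }
  trait Partitioning
  trait HashPartitioningLike extends Partitioning with Expression

  case class HashPartitioning(children: Seq[String]) extends HashPartitioningLike
  // Hypothetical partitioning that is an Expression but not HashPartitioningLike.
  case class KeyedLikePartitioning(children: Seq[String]) extends Partitioning with Expression
  // A partitioning that carries no expressions at all.
  case object SinglePartition extends Partitioning

  // Narrow match: only the hash-like family is recognised.
  def matchesNarrow(p: Partitioning): Boolean = p match {
    case _: HashPartitioningLike => true
    case _ => false
  }

  // Wider match: anything that is both a Partitioning and an Expression.
  def matchesWide(p: Partitioning): Boolean = p match {
    case _: Partitioning with Expression => true
    case _ => false
  }
}
```

In this sketch the narrow match skips `KeyedLikePartitioning`, while the compound-type match accepts it and still rejects `SinglePartition`.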

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

HashPartitioning(Seq(l3), 1)))),
right = DummySparkPlan())
expected = PartitioningCollection(Seq(
PartitioningCollection(Seq(
Contributor Author

Keeping a PartitioningCollection in a PartitioningCollection has no benefit.
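
To make the point concrete, a minimal sketch with simplified stand-in types (not Spark's classes; the `flatten` helper is hypothetical and is not the code in this PR). It assumes a collection is only a list of alternative partitionings, so nesting adds no information and can be flattened away.

```scala
object FlattenSketch {
  // Simplified stand-ins for illustration only; not Spark's real classes.
  sealed trait Partitioning
  case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
  case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning

  // Recursively replaces every nested collection by its members.
  def flatten(p: Partitioning): Seq[Partitioning] = p match {
    case PartitioningCollection(ps) => ps.flatMap(flatten)
    case other => Seq(other)
  }

  // A collection that contains another collection.
  val nested: Partitioning = PartitioningCollection(Seq(
    PartitioningCollection(Seq(
      HashPartitioning(Seq("l1"), 1),
      HashPartitioning(Seq("l2"), 1))),
    HashPartitioning(Seq("l3"), 1)))

  // The same alternatives, one level deep.
  val flat: Partitioning = PartitioningCollection(flatten(nested))
}
```

Here `flat` holds the three hash partitionings directly, which is the shape the updated expected value in the test uses in place of the nested one.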

@@ -96,28 +97,23 @@ case class BroadcastHashJoinExec private(
}

// Expands the given partitioning collection recursively.
Member

@dongjoon-hyun dongjoon-hyun Feb 16, 2026

Could you revise this method description to match your code change? The method now handles Partitioning instead of PartitioningCollection.

Contributor Author

@peter-toth peter-toth Feb 17, 2026

Indeed. I fixed the comments in 3d75c6b.

case other => other
val expandedPartitioning = expandOutputPartitioning(streamedPlan.outputPartitioning)
expandedPartitioning match {
case Nil => UnknownPartitioning(streamedPlan.outputPartitioning.numPartitions)
Member

This logic looks new to me. Is this an improvement?

Contributor Author

Thanks for pointing this out. I added a comment in 3d75c6b. This could only happen if there was an empty PartitioningCollection in streamedPlan.outputPartitioning. UnknownPartitioning is always a valid alternative as it satisfies only distributions with the lowest requirements.
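
A minimal sketch of that fallback, again with simplified stand-in types rather than Spark's classes. Only the empty case appears in the diff context quoted above; the single-element and multi-element branches, and the `collapse` helper name, are assumptions added for illustration.

```scala
object FallbackSketch {
  // Simplified stand-ins for illustration only; not Spark's real classes.
  sealed trait Partitioning
  case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
  case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning

  // Turns the expanded list of alternatives back into a single partitioning.
  def collapse(expanded: Seq[Partitioning], numPartitions: Int): Partitioning =
    expanded match {
      // Nothing survived expansion: claim nothing beyond the partition count.
      case Seq() => UnknownPartitioning(numPartitions)
      // Assumed branch: a single alternative needs no wrapper.
      case Seq(single) => single
      // Assumed branch: several alternatives are kept side by side.
      case many => PartitioningCollection(many)
    }
}
```

With an empty input, `collapse(Seq.empty, 4)` yields `UnknownPartitioning(4)` in this sketch, so the partition count is preserved while no stronger guarantee is claimed.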

Member

@dongjoon-hyun dongjoon-hyun left a comment

The refactoring itself looks reasonable to me. I have a few comments.

@peter-toth peter-toth force-pushed the SPARK-55551-improve-broadcasthashjoinexec-output-partitioning branch from c67158d to 3d75c6b Compare February 17, 2026 08:59
@peter-toth
Contributor Author

Thanks @dongjoon-hyun for taking a look. Cc @cloud-fan as well.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.
