From 858990a7d13d44aacd1f96658d3da8120f49f1f0 Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Tue, 30 Jun 2026 09:27:29 -0400 Subject: [PATCH] Add KeyPartitioned distribution bridge --- .../physical_optimizer/ensure_requirements.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 10 +- datafusion/physical-expr/src/partitioning.rs | 146 ++++++++++++------ .../enforce_distribution.rs | 25 ++- .../enforce_sorting/sort_pushdown.rs | 14 +- .../src/output_requirements.rs | 9 +- .../physical-plan/src/aggregates/mod.rs | 2 +- .../physical-plan/src/joins/hash_join/exec.rs | 4 +- .../src/joins/sort_merge_join/exec.rs | 4 +- .../src/joins/symmetric_hash_join.rs | 4 +- .../src/sorts/partitioned_topk.rs | 2 +- .../src/windows/bounded_window_agg_exec.rs | 2 +- .../src/windows/window_agg_exec.rs | 2 +- 13 files changed, 153 insertions(+), 73 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/ensure_requirements.rs b/datafusion/core/tests/physical_optimizer/ensure_requirements.rs index d106daf4a152a..3fdbc9d312151 100644 --- a/datafusion/core/tests/physical_optimizer/ensure_requirements.rs +++ b/datafusion/core/tests/physical_optimizer/ensure_requirements.rs @@ -1113,7 +1113,7 @@ fn test_idempotent_window_over_multi_partition() { ]) .unwrap(); - let dist = Distribution::HashPartitioned(vec![Arc::new(Column::new("a", 0))]); + let dist = Distribution::KeyPartitioned(vec![Arc::new(Column::new("a", 0))]); let window_like: Arc = Arc::new(MockReqExec::new(source, dist, Some(ord))); diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 9f83f070d0286..24ec633d48d23 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -724,7 +724,7 @@ fn test_output_req_after_projection() -> Result<()> { ] .into(), )), - Distribution::HashPartitioned(vec![ + Distribution::KeyPartitioned(vec![ Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1)), ]), @@ -746,7 +746,7 @@ fn test_output_req_after_projection() -> Result<()> { actual, @r" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] - OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=HashPartitioned[[a@0, b@1]]) + OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=KeyPartitioned[[a@0, b@1]]) DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false " ); @@ -762,7 +762,7 @@ fn test_output_req_after_projection() -> Result<()> { assert_snapshot!( actual, @r" - OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=HashPartitioned[[new_a@1, b@2]]) + OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=KeyPartitioned[[new_a@1, b@2]]) DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, a@0 as new_a, b], file_type=csv, has_header=false " ); @@ -797,7 +797,7 @@ fn test_output_req_after_projection() -> Result<()> { Arc::new(Column::new("new_a", 1)), Arc::new(Column::new("b", 2)), ]; - if let Distribution::HashPartitioned(vec) = after_optimize + if let Distribution::KeyPartitioned(vec) = after_optimize .downcast_ref::() .unwrap() .required_input_distribution()[0] @@ -809,7 +809,7 @@ fn test_output_req_after_projection() -> Result<()> { .all(|(actual, expected)| actual.eq(&expected)) ); } else { - panic!("Expected HashPartitioned distribution!"); + panic!("Expected KeyPartitioned distribution!"); }; Ok(()) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2e0aaaf3fb4b7..b662207e383a0 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -473,6 +473,10 @@ impl Partitioning { /// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated /// by the `required` [`Distribution`]. + #[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" + )] pub fn satisfaction( &self, required: &Distribution, @@ -484,11 +488,14 @@ impl Partitioning { Distribution::SinglePartition if self.partition_count() == 1 => { PartitioningSatisfaction::Exact } - // When partition count is 1, hash requirement is satisfied. - Distribution::HashPartitioned(_) if self.partition_count() == 1 => { + // When partition count is 1, key partitioning is satisfied. + Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_) + if self.partition_count() == 1 => + { PartitioningSatisfaction::Exact } - Distribution::HashPartitioned(required_exprs) => match self { + Distribution::HashPartitioned(required_exprs) + | Distribution::KeyPartitioned(required_exprs) => match self { // Here we do not check the partition count for hash partitioning and assumes the partition count // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, // then we need to have the partition count and hash functions validation. @@ -593,11 +600,19 @@ pub enum Distribution { UnspecifiedDistribution, /// A single partition is required SinglePartition, + /// Deprecated historical name for [`Distribution::KeyPartitioned`]. + /// See for details. + #[deprecated(since = "55.0.0", note = "Use Distribution::KeyPartitioned")] + HashPartitioned(Vec>), /// Requires children to be distributed in such a way that the same /// values of the keys end up in the same partition - HashPartitioned(Vec>), + KeyPartitioned(Vec>), } +#[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" +)] impl Distribution { /// Creates a `Partitioning` that satisfies this `Distribution` pub fn create_partitioning(self, partition_count: usize) -> Partitioning { @@ -606,13 +621,17 @@ impl Distribution { Partitioning::UnknownPartitioning(partition_count) } Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), - Distribution::HashPartitioned(expr) => { + Distribution::HashPartitioned(expr) | Distribution::KeyPartitioned(expr) => { Partitioning::Hash(expr, partition_count) } } } } +#[expect( + deprecated, + reason = "HashPartitioned display is preserved during the KeyPartitioned migration" +)] impl Display for Distribution { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -621,6 +640,9 @@ impl Display for Distribution { Distribution::HashPartitioned(exprs) => { write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs)) } + Distribution::KeyPartitioned(exprs) => { + write!(f, "KeyPartitioned[{}])", format_physical_expr_list(exprs)) + } } } } @@ -689,11 +711,11 @@ mod tests { Partitioning::Hash(self.cols(indices), partition_count) } - fn hash_distribution( + fn key_distribution( &self, indices: impl IntoIterator, ) -> Distribution { - Distribution::HashPartitioned(self.cols(indices)) + Distribution::KeyPartitioned(self.cols(indices)) } fn range_sort_expr( @@ -746,6 +768,10 @@ mod tests { } #[test] + #[expect( + deprecated, + reason = "test intentionally covers deprecated HashPartitioned compatibility" + )] fn partitioning_satisfy_distribution() -> Result<()> { let fixture = PartitioningTestFixture::new(vec![ ("column_1", DataType::Int64), @@ -755,7 +781,8 @@ mod tests { let distribution_types = vec![ Distribution::UnspecifiedDistribution, Distribution::SinglePartition, - fixture.hash_distribution([0, 1]), + Distribution::HashPartitioned(fixture.cols([0, 1])), + fixture.key_distribution([0, 1]), ]; let single_partition = Partitioning::UnknownPartitioning(1); @@ -790,7 +817,7 @@ mod tests { Distribution::SinglePartition => { assert_eq!(result, (true, false, false, false, false)) } - Distribution::HashPartitioned(_) => { + Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_) => { assert_eq!(result, (true, false, false, true, false)) } } @@ -799,43 +826,66 @@ mod tests { Ok(()) } + #[test] + #[expect( + deprecated, + reason = "test intentionally covers deprecated HashPartitioned compatibility" + )] + fn deprecated_hash_partitioned_matches_key_partitioned() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let partitioning = fixture.hash_partitioning([0, 1], 4); + let hash_distribution = Distribution::HashPartitioned(fixture.cols([0, 1])); + let key_distribution = fixture.key_distribution([0, 1]); + + assert_eq!( + partitioning.satisfaction(&hash_distribution, &fixture.eq_properties, false), + partitioning.satisfaction(&key_distribution, &fixture.eq_properties, false) + ); + assert_eq!( + hash_distribution.create_partitioning(4), + key_distribution.create_partitioning(4) + ); + + Ok(()) + } + #[test] fn test_partitioning_satisfy_by_subset() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b", "c"])?; let test_cases = vec![ ( - "Hash([a]) vs Hash([a, b])", + "KeyPartitioned([a, b]) satisfied by Hash([a])", fixture.hash_partitioning([0], 4), - fixture.hash_distribution([0, 1]), + fixture.key_distribution([0, 1]), PartitioningSatisfaction::Subset, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a]) vs Hash([a, b, c])", + "KeyPartitioned([a, b, c]) satisfied by Hash([a])", fixture.hash_partitioning([0], 4), - fixture.hash_distribution([0, 1, 2]), + fixture.key_distribution([0, 1, 2]), PartitioningSatisfaction::Subset, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a, b]) vs Hash([a, b, c])", + "KeyPartitioned([a, b, c]) satisfied by Hash([a, b])", fixture.hash_partitioning([0, 1], 4), - fixture.hash_distribution([0, 1, 2]), + fixture.key_distribution([0, 1, 2]), PartitioningSatisfaction::Subset, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([b]) vs Hash([a, b, c])", + "KeyPartitioned([a, b, c]) satisfied by Hash([b])", fixture.hash_partitioning([1], 4), - fixture.hash_distribution([0, 1, 2]), + fixture.key_distribution([0, 1, 2]), PartitioningSatisfaction::Subset, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([b, a]) vs Hash([a, b, c])", + "KeyPartitioned([a, b, c]) satisfied by Hash([b, a])", fixture.hash_partitioning([1, 0], 4), - fixture.hash_distribution([0, 1, 2]), + fixture.key_distribution([0, 1, 2]), PartitioningSatisfaction::Subset, PartitioningSatisfaction::NotSatisfied, ), @@ -866,23 +916,23 @@ mod tests { let test_cases = vec![ ( - "Hash([a, b]) vs Hash([a])", + "KeyPartitioned([a]) satisfied by Hash([a, b])", fixture.hash_partitioning([0, 1], 4), - fixture.hash_distribution([0]), + fixture.key_distribution([0]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a, b, c]) vs Hash([a])", + "KeyPartitioned([a]) satisfied by Hash([a, b, c])", fixture.hash_partitioning([0, 1, 2], 4), - fixture.hash_distribution([0]), + fixture.key_distribution([0]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a, b, c]) vs Hash([a, b])", + "KeyPartitioned([a, b]) satisfied by Hash([a, b, c])", fixture.hash_partitioning([0, 1, 2], 4), - fixture.hash_distribution([0, 1]), + fixture.key_distribution([0, 1]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), @@ -912,9 +962,9 @@ mod tests { let fixture = PartitioningTestFixture::int64(&["a", "b", "c"])?; let test_cases = vec![( - "Partial overlap: Hash([a, c]) vs Hash([a, b])", + "Partial overlap: KeyPartitioned([a, b]) satisfied by Hash([a, c])", fixture.hash_partitioning([0, 2], 4), - fixture.hash_distribution([0, 1]), + fixture.key_distribution([0, 1]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, )]; @@ -944,16 +994,16 @@ mod tests { let test_cases = vec![ ( - "Hash([a]) vs Hash([b, c])", + "KeyPartitioned([b, c]) satisfied by Hash([a])", fixture.hash_partitioning([0], 4), - fixture.hash_distribution([1, 2]), + fixture.key_distribution([1, 2]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a, b]) vs Hash([c])", + "KeyPartitioned([c]) satisfied by Hash([a, b])", fixture.hash_partitioning([0, 1], 4), - fixture.hash_distribution([2]), + fixture.key_distribution([2]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), @@ -984,16 +1034,16 @@ mod tests { let test_cases = vec![ ( - "Hash([a, b]) vs Hash([a, b])", + "KeyPartitioned([a, b]) satisfied by Hash([a, b])", fixture.hash_partitioning([0, 1], 4), - fixture.hash_distribution([0, 1]), + fixture.key_distribution([0, 1]), PartitioningSatisfaction::Exact, PartitioningSatisfaction::Exact, ), ( - "Hash([a]) vs Hash([a])", + "KeyPartitioned([a]) satisfied by Hash([a])", fixture.hash_partitioning([0], 4), - fixture.hash_distribution([0]), + fixture.key_distribution([0]), PartitioningSatisfaction::Exact, PartitioningSatisfaction::Exact, ), @@ -1025,23 +1075,23 @@ mod tests { let test_cases = vec![ ( - "Hash([unknown]) vs Hash([a, b])", + "KeyPartitioned([a, b]) satisfied by Hash([unknown])", Partitioning::Hash(vec![Arc::clone(&unknown)], 4), - fixture.hash_distribution([0, 1]), + fixture.key_distribution([0, 1]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a, b]) vs Hash([unknown])", + "KeyPartitioned([unknown]) satisfied by Hash([a, b])", fixture.hash_partitioning([0, 1], 4), - Distribution::HashPartitioned(vec![Arc::clone(&unknown)]), + Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([unknown]) vs Hash([unknown])", + "KeyPartitioned([unknown]) satisfied by Hash([unknown])", Partitioning::Hash(vec![Arc::clone(&unknown)], 4), - Distribution::HashPartitioned(vec![Arc::clone(&unknown)]), + Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), @@ -1072,23 +1122,23 @@ mod tests { let test_cases = vec![ ( - "Hash([]) vs Hash([a])", + "KeyPartitioned([a]) satisfied by Hash([])", Partitioning::Hash(vec![], 4), - fixture.hash_distribution([0]), + fixture.key_distribution([0]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([a]) vs Hash([])", + "KeyPartitioned([]) satisfied by Hash([a])", fixture.hash_partitioning([0], 4), - Distribution::HashPartitioned(vec![]), + Distribution::KeyPartitioned(vec![]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( - "Hash([]) vs Hash([])", + "KeyPartitioned([]) satisfied by Hash([])", Partitioning::Hash(vec![], 4), - Distribution::HashPartitioned(vec![]), + Distribution::KeyPartitioned(vec![]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), @@ -1410,7 +1460,7 @@ mod tests { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; let range_partitioning = fixture.range_partitioning([0, 1], vec![int_split_point([10, 100])]); - let required = fixture.hash_distribution([0, 1]); + let required = fixture.key_distribution([0, 1]); assert_eq!( range_partitioning.satisfaction(&required, &fixture.eq_properties, false), diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 76cb59a305a5f..305d811fa2156 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -730,7 +730,7 @@ fn add_hash_on_top( return Ok(input); } - let dist = Distribution::HashPartitioned(hash_exprs); + let dist = Distribution::KeyPartitioned(hash_exprs); let satisfaction = input.plan.output_partitioning().satisfaction( &dist, input.plan.equivalence_properties(), @@ -1001,6 +1001,10 @@ struct RepartitionRequirementStatus { /// hash_necessary: true /// } /// ``` +#[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" +)] fn get_repartition_requirement_status( plan: &Arc, batch_size: usize, @@ -1024,7 +1028,10 @@ fn get_repartition_requirement_status( Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size), Precision::Absent => true, }; - let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); + let is_hash = matches!( + requirement, + Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_) + ); // Hash re-partitioning is necessary when the input has more than one // partitions: let multi_partitions = child.output_partitioning().partition_count() > 1; @@ -1066,6 +1073,10 @@ fn get_repartition_requirement_status( /// This function is intended to be used in a bottom up traversal, as it /// can first repartition (or newly partition) at the datasources -- these /// source partitions may be later repartitioned with additional data exchange operators. +#[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" +)] pub fn ensure_distribution( dist_context: DistributionContext, config: &ConfigOptions, @@ -1206,7 +1217,8 @@ pub fn ensure_distribution( // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash // partitioning on all group columns including __grouping_id to ensure partial // aggregates from different partitions are correctly combined. - let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs) + let requires_grouping_id = matches!(&requirement, + Distribution::HashPartitioned(exprs) | Distribution::KeyPartitioned(exprs) if exprs.iter().any(|expr| { (expr.as_ref() as &dyn Any) .downcast_ref::() @@ -1244,7 +1256,8 @@ pub fn ensure_distribution( Distribution::SinglePartition => { child = add_merge_on_top(child, removed_fetch); } - Distribution::HashPartitioned(exprs) => { + Distribution::HashPartitioned(exprs) + | Distribution::KeyPartitioned(exprs) => { // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. if hash_necessary { @@ -1311,7 +1324,9 @@ pub fn ensure_distribution( // no ordering requirement match requirement { // Operator requires specific distribution. - Distribution::SinglePartition | Distribution::HashPartitioned(_) => { + Distribution::SinglePartition + | Distribution::HashPartitioned(_) + | Distribution::KeyPartitioned(_) => { // If the parent doesn't maintain input order, preserving // ordering is pointless. However, if it does maintain // input order, we keep order-preserving variants so diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs index 261cf701c870f..43ec3eabbfd2f 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs @@ -113,13 +113,23 @@ fn min_fetch(f1: Option, f2: Option) -> Option { /// Returns the stricter of two distribution requirements. /// `SinglePartition` is the strictest. +#[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" +)] fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution { match (a, b) { (Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => { Distribution::SinglePartition } - (Distribution::HashPartitioned(_), _) => a.clone(), - (_, Distribution::HashPartitioned(_)) => b.clone(), + (Distribution::HashPartitioned(exprs), _) + | (Distribution::KeyPartitioned(exprs), _) => { + Distribution::KeyPartitioned(exprs.clone()) + } + (_, Distribution::HashPartitioned(exprs)) + | (_, Distribution::KeyPartitioned(exprs)) => { + Distribution::KeyPartitioned(exprs.clone()) + } _ => Distribution::UnspecifiedDistribution, } } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index fb91ae46a2a08..4391c5ff6c981 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -247,6 +247,10 @@ impl ExecutionPlan for OutputRequirementExec { args.compute_child_statistics(&self.input, args.partition()) } + #[expect( + deprecated, + reason = "HashPartitioned is accepted during the KeyPartitioned migration" + )] fn try_swapping_with_projection( &self, projection: &ProjectionExec, @@ -272,7 +276,8 @@ impl ExecutionPlan for OutputRequirementExec { } let dist_req = match &self.required_input_distribution()[0] { - Distribution::HashPartitioned(exprs) => { + Distribution::HashPartitioned(exprs) + | Distribution::KeyPartitioned(exprs) => { let mut updated_exprs = vec![]; for expr in exprs { let Some(new_expr) = update_expr(expr, projection.expr(), false)? @@ -281,7 +286,7 @@ impl ExecutionPlan for OutputRequirementExec { }; updated_exprs.push(new_expr); } - Distribution::HashPartitioned(updated_exprs) + Distribution::KeyPartitioned(updated_exprs) } dist => dist.clone(), }; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4f5b893578d74..25dbbe234b290 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1709,7 +1709,7 @@ impl ExecutionPlan for AggregateExec { vec![Distribution::UnspecifiedDistribution] } AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => { - vec![Distribution::HashPartitioned(self.group_by.input_exprs())] + vec![Distribution::KeyPartitioned(self.group_by.input_exprs())] } AggregateMode::Final | AggregateMode::Single => { vec![Distribution::SinglePartition] diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a5da391ee7635..50a90b0f54633 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1253,8 +1253,8 @@ impl ExecutionPlan for HashJoinExec { .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } PartitionMode::Auto => vec![ diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index a8d25fd002b76..2dc7065eee04e 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -430,8 +430,8 @@ impl ExecutionPlan for SortMergeJoinExec { .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index a56ad1712aa8e..52a1aa056d244 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -434,8 +434,8 @@ impl ExecutionPlan for SymmetricHashJoinExec { .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _)) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } StreamJoinPartitionMode::SinglePartition => { diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 1040abcb75ea9..e09147ed90274 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -304,7 +304,7 @@ impl ExecutionPlan for PartitionedTopKExec { .iter() .map(|e| Arc::clone(&e.expr)) .collect(); - vec![Distribution::HashPartitioned(partition_exprs)] + vec![Distribution::KeyPartitioned(partition_exprs)] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index b0a0330441e94..a9d580f4c687d 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -336,7 +336,7 @@ impl ExecutionPlan for BoundedWindowAggExec { debug!("No partition defined for BoundedWindowAggExec!!!"); vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys().clone())] + vec![Distribution::KeyPartitioned(self.partition_keys().clone())] } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f4bc40cf35d5a..72474c6a55483 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -244,7 +244,7 @@ impl ExecutionPlan for WindowAggExec { if self.partition_keys().is_empty() { vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys())] + vec![Distribution::KeyPartitioned(self.partition_keys())] } }