diff --git a/datafusion/core/tests/physical_optimizer/ensure_requirements.rs b/datafusion/core/tests/physical_optimizer/ensure_requirements.rs index d106daf4a152a..3fdbc9d312151 100644 --- a/datafusion/core/tests/physical_optimizer/ensure_requirements.rs +++ b/datafusion/core/tests/physical_optimizer/ensure_requirements.rs @@ -1113,7 +1113,7 @@ fn test_idempotent_window_over_multi_partition() { ]) .unwrap(); - let dist = Distribution::HashPartitioned(vec![Arc::new(Column::new("a", 0))]); + let dist = Distribution::KeyPartitioned(vec![Arc::new(Column::new("a", 0))]); let window_like: Arc = Arc::new(MockReqExec::new(source, dist, Some(ord))); diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 9f83f070d0286..24ec633d48d23 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -724,7 +724,7 @@ fn test_output_req_after_projection() -> Result<()> { ] .into(), )), - Distribution::HashPartitioned(vec![ + Distribution::KeyPartitioned(vec![ Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1)), ]), @@ -746,7 +746,7 @@ fn test_output_req_after_projection() -> Result<()> { actual, @r" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] - OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=HashPartitioned[[a@0, b@1]]) + OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=KeyPartitioned[[a@0, b@1]]) DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false " ); @@ -762,7 +762,7 @@ fn test_output_req_after_projection() -> Result<()> { assert_snapshot!( actual, @r" - OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=HashPartitioned[[new_a@1, b@2]]) + OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=KeyPartitioned[[new_a@1, b@2]]) DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, a@0 as new_a, b], file_type=csv, has_header=false " ); @@ -797,7 +797,7 @@ fn test_output_req_after_projection() -> Result<()> { Arc::new(Column::new("new_a", 1)), Arc::new(Column::new("b", 2)), ]; - if let Distribution::HashPartitioned(vec) = after_optimize + if let Distribution::KeyPartitioned(vec) = after_optimize .downcast_ref::() .unwrap() .required_input_distribution()[0] @@ -809,7 +809,7 @@ fn test_output_req_after_projection() -> Result<()> { .all(|(actual, expected)| actual.eq(&expected)) ); } else { - panic!("Expected HashPartitioned distribution!"); + panic!("Expected KeyPartitioned distribution!"); }; Ok(()) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2e0aaaf3fb4b7..25777ebeba7b7 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -485,10 +485,10 @@ impl Partitioning { PartitioningSatisfaction::Exact } // When partition count is 1, hash requirement is satisfied. - Distribution::HashPartitioned(_) if self.partition_count() == 1 => { + Distribution::KeyPartitioned(_) if self.partition_count() == 1 => { PartitioningSatisfaction::Exact } - Distribution::HashPartitioned(required_exprs) => match self { + Distribution::KeyPartitioned(required_exprs) => match self { // Here we do not check the partition count for hash partitioning and assumes the partition count // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, // then we need to have the partition count and hash functions validation. @@ -595,7 +595,7 @@ pub enum Distribution { SinglePartition, /// Requires children to be distributed in such a way that the same /// values of the keys end up in the same partition - HashPartitioned(Vec>), + KeyPartitioned(Vec>), } impl Distribution { @@ -606,7 +606,7 @@ impl Distribution { Partitioning::UnknownPartitioning(partition_count) } Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), - Distribution::HashPartitioned(expr) => { + Distribution::KeyPartitioned(expr) => { Partitioning::Hash(expr, partition_count) } } @@ -618,8 +618,8 @@ impl Display for Distribution { match self { Distribution::UnspecifiedDistribution => write!(f, "Unspecified"), Distribution::SinglePartition => write!(f, "SinglePartition"), - Distribution::HashPartitioned(exprs) => { - write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs)) + Distribution::KeyPartitioned(exprs) => { + write!(f, "KeyPartitioned[{}])", format_physical_expr_list(exprs)) } } } @@ -693,7 +693,7 @@ mod tests { &self, indices: impl IntoIterator, ) -> Distribution { - Distribution::HashPartitioned(self.cols(indices)) + Distribution::KeyPartitioned(self.cols(indices)) } fn range_sort_expr( @@ -790,7 +790,7 @@ mod tests { Distribution::SinglePartition => { assert_eq!(result, (true, false, false, false, false)) } - Distribution::HashPartitioned(_) => { + Distribution::KeyPartitioned(_) => { assert_eq!(result, (true, false, false, true, false)) } } @@ -1034,14 +1034,14 @@ mod tests { ( "Hash([a, b]) vs Hash([unknown])", fixture.hash_partitioning([0, 1], 4), - Distribution::HashPartitioned(vec![Arc::clone(&unknown)]), + Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( "Hash([unknown]) vs Hash([unknown])", Partitioning::Hash(vec![Arc::clone(&unknown)], 4), - Distribution::HashPartitioned(vec![Arc::clone(&unknown)]), + Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), @@ -1081,14 +1081,14 @@ mod tests { ( "Hash([a]) vs Hash([])", fixture.hash_partitioning([0], 4), - Distribution::HashPartitioned(vec![]), + Distribution::KeyPartitioned(vec![]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), ( "Hash([]) vs Hash([])", Partitioning::Hash(vec![], 4), - Distribution::HashPartitioned(vec![]), + Distribution::KeyPartitioned(vec![]), PartitioningSatisfaction::NotSatisfied, PartitioningSatisfaction::NotSatisfied, ), diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 76cb59a305a5f..eb0ea2f93a572 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -730,7 +730,7 @@ fn add_hash_on_top( return Ok(input); } - let dist = Distribution::HashPartitioned(hash_exprs); + let dist = Distribution::KeyPartitioned(hash_exprs); let satisfaction = input.plan.output_partitioning().satisfaction( &dist, input.plan.equivalence_properties(), @@ -1024,7 +1024,7 @@ fn get_repartition_requirement_status( Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size), Precision::Absent => true, }; - let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); + let is_hash = matches!(requirement, Distribution::KeyPartitioned(_)); // Hash re-partitioning is necessary when the input has more than one // partitions: let multi_partitions = child.output_partitioning().partition_count() > 1; @@ -1206,7 +1206,7 @@ pub fn ensure_distribution( // Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash // partitioning on all group columns including __grouping_id to ensure partial // aggregates from different partitions are correctly combined. - let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs) + let requires_grouping_id = matches!(&requirement, Distribution::KeyPartitioned(exprs) if exprs.iter().any(|expr| { (expr.as_ref() as &dyn Any) .downcast_ref::() @@ -1244,7 +1244,7 @@ pub fn ensure_distribution( Distribution::SinglePartition => { child = add_merge_on_top(child, removed_fetch); } - Distribution::HashPartitioned(exprs) => { + Distribution::KeyPartitioned(exprs) => { // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. if hash_necessary { @@ -1311,7 +1311,7 @@ pub fn ensure_distribution( // no ordering requirement match requirement { // Operator requires specific distribution. - Distribution::SinglePartition | Distribution::HashPartitioned(_) => { + Distribution::SinglePartition | Distribution::KeyPartitioned(_) => { // If the parent doesn't maintain input order, preserving // ordering is pointless. However, if it does maintain // input order, we keep order-preserving variants so diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs index 261cf701c870f..8df74d1a38d59 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_sorting/sort_pushdown.rs @@ -118,8 +118,8 @@ fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution { (Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => { Distribution::SinglePartition } - (Distribution::HashPartitioned(_), _) => a.clone(), - (_, Distribution::HashPartitioned(_)) => b.clone(), + (Distribution::KeyPartitioned(_), _) => a.clone(), + (_, Distribution::KeyPartitioned(_)) => b.clone(), _ => Distribution::UnspecifiedDistribution, } } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index fb91ae46a2a08..c9102bec87455 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -272,7 +272,7 @@ impl ExecutionPlan for OutputRequirementExec { } let dist_req = match &self.required_input_distribution()[0] { - Distribution::HashPartitioned(exprs) => { + Distribution::KeyPartitioned(exprs) => { let mut updated_exprs = vec![]; for expr in exprs { let Some(new_expr) = update_expr(expr, projection.expr(), false)? @@ -281,7 +281,7 @@ impl ExecutionPlan for OutputRequirementExec { }; updated_exprs.push(new_expr); } - Distribution::HashPartitioned(updated_exprs) + Distribution::KeyPartitioned(updated_exprs) } dist => dist.clone(), }; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4f5b893578d74..25dbbe234b290 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1709,7 +1709,7 @@ impl ExecutionPlan for AggregateExec { vec![Distribution::UnspecifiedDistribution] } AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => { - vec![Distribution::HashPartitioned(self.group_by.input_exprs())] + vec![Distribution::KeyPartitioned(self.group_by.input_exprs())] } AggregateMode::Final | AggregateMode::Single => { vec![Distribution::SinglePartition] diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a5da391ee7635..50a90b0f54633 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1253,8 +1253,8 @@ impl ExecutionPlan for HashJoinExec { .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } PartitionMode::Auto => vec![ diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index a8d25fd002b76..2dc7065eee04e 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -430,8 +430,8 @@ impl ExecutionPlan for SortMergeJoinExec { .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index a56ad1712aa8e..52a1aa056d244 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -434,8 +434,8 @@ impl ExecutionPlan for SymmetricHashJoinExec { .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _)) .unzip(); vec![ - Distribution::HashPartitioned(left_expr), - Distribution::HashPartitioned(right_expr), + Distribution::KeyPartitioned(left_expr), + Distribution::KeyPartitioned(right_expr), ] } StreamJoinPartitionMode::SinglePartition => { diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 1040abcb75ea9..e09147ed90274 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -304,7 +304,7 @@ impl ExecutionPlan for PartitionedTopKExec { .iter() .map(|e| Arc::clone(&e.expr)) .collect(); - vec![Distribution::HashPartitioned(partition_exprs)] + vec![Distribution::KeyPartitioned(partition_exprs)] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index b0a0330441e94..a9d580f4c687d 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -336,7 +336,7 @@ impl ExecutionPlan for BoundedWindowAggExec { debug!("No partition defined for BoundedWindowAggExec!!!"); vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys().clone())] + vec![Distribution::KeyPartitioned(self.partition_keys().clone())] } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f4bc40cf35d5a..72474c6a55483 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -244,7 +244,7 @@ impl ExecutionPlan for WindowAggExec { if self.partition_keys().is_empty() { vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys())] + vec![Distribution::KeyPartitioned(self.partition_keys())] } }