Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ fn test_idempotent_window_over_multi_partition() {
])
.unwrap();

let dist = Distribution::HashPartitioned(vec![Arc::new(Column::new("a", 0))]);
let dist = Distribution::KeyPartitioned(vec![Arc::new(Column::new("a", 0))]);
let window_like: Arc<dyn ExecutionPlan> =
Arc::new(MockReqExec::new(source, dist, Some(ord)));

Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ fn test_output_req_after_projection() -> Result<()> {
]
.into(),
)),
Distribution::HashPartitioned(vec![
Distribution::KeyPartitioned(vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
]),
Expand All @@ -746,7 +746,7 @@ fn test_output_req_after_projection() -> Result<()> {
actual,
@r"
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=HashPartitioned[[a@0, b@1]])
OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=KeyPartitioned[[a@0, b@1]])
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
"
);
Expand All @@ -762,7 +762,7 @@ fn test_output_req_after_projection() -> Result<()> {
assert_snapshot!(
actual,
@r"
OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=HashPartitioned[[new_a@1, b@2]])
OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=KeyPartitioned[[new_a@1, b@2]])
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, a@0 as new_a, b], file_type=csv, has_header=false
"
);
Expand Down Expand Up @@ -797,7 +797,7 @@ fn test_output_req_after_projection() -> Result<()> {
Arc::new(Column::new("new_a", 1)),
Arc::new(Column::new("b", 2)),
];
if let Distribution::HashPartitioned(vec) = after_optimize
if let Distribution::KeyPartitioned(vec) = after_optimize
.downcast_ref::<OutputRequirementExec>()
.unwrap()
.required_input_distribution()[0]
Expand All @@ -809,7 +809,7 @@ fn test_output_req_after_projection() -> Result<()> {
.all(|(actual, expected)| actual.eq(&expected))
);
} else {
panic!("Expected HashPartitioned distribution!");
panic!("Expected KeyPartitioned distribution!");
};

Ok(())
Expand Down
24 changes: 12 additions & 12 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,10 @@ impl Partitioning {
PartitioningSatisfaction::Exact
}
// When partition count is 1, hash requirement is satisfied.
Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
Distribution::KeyPartitioned(_) if self.partition_count() == 1 => {
PartitioningSatisfaction::Exact
}
Distribution::HashPartitioned(required_exprs) => match self {
Distribution::KeyPartitioned(required_exprs) => match self {
// Here we do not check the partition count for hash partitioning; we assume the partition count
// and hash functions in the system are the same. In the future, if we plan to support storage partition-wise joins,
// we will need to validate the partition count and hash functions.
Expand Down Expand Up @@ -595,7 +595,7 @@ pub enum Distribution {
SinglePartition,
/// Requires children to be distributed in such a way that the same
/// values of the keys end up in the same partition
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

impl Distribution {
Expand All @@ -606,7 +606,7 @@ impl Distribution {
Partitioning::UnknownPartitioning(partition_count)
}
Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
Distribution::HashPartitioned(expr) => {
Distribution::KeyPartitioned(expr) => {
Partitioning::Hash(expr, partition_count)
}
}
Expand All @@ -618,8 +618,8 @@ impl Display for Distribution {
match self {
Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
Distribution::SinglePartition => write!(f, "SinglePartition"),
Distribution::HashPartitioned(exprs) => {
write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs))
Distribution::KeyPartitioned(exprs) => {
write!(f, "KeyPartitioned[{}])", format_physical_expr_list(exprs))
}
}
}
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {
&self,
indices: impl IntoIterator<Item = usize>,
) -> Distribution {
Distribution::HashPartitioned(self.cols(indices))
Distribution::KeyPartitioned(self.cols(indices))
}

fn range_sort_expr(
Expand Down Expand Up @@ -790,7 +790,7 @@ mod tests {
Distribution::SinglePartition => {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
Distribution::KeyPartitioned(_) => {
assert_eq!(result, (true, false, false, true, false))
}
}
Expand Down Expand Up @@ -1034,14 +1034,14 @@ mod tests {
(
"Hash([a, b]) vs Hash([unknown])",
fixture.hash_partitioning([0, 1], 4),
Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]),
PartitioningSatisfaction::NotSatisfied,
PartitioningSatisfaction::NotSatisfied,
),
(
"Hash([unknown]) vs Hash([unknown])",
Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
Distribution::KeyPartitioned(vec![Arc::clone(&unknown)]),
PartitioningSatisfaction::NotSatisfied,
PartitioningSatisfaction::NotSatisfied,
),
Expand Down Expand Up @@ -1081,14 +1081,14 @@ mod tests {
(
"Hash([a]) vs Hash([])",
fixture.hash_partitioning([0], 4),
Distribution::HashPartitioned(vec![]),
Distribution::KeyPartitioned(vec![]),
PartitioningSatisfaction::NotSatisfied,
PartitioningSatisfaction::NotSatisfied,
),
(
"Hash([]) vs Hash([])",
Partitioning::Hash(vec![], 4),
Distribution::HashPartitioned(vec![]),
Distribution::KeyPartitioned(vec![]),
PartitioningSatisfaction::NotSatisfied,
PartitioningSatisfaction::NotSatisfied,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ fn add_hash_on_top(
return Ok(input);
}

let dist = Distribution::HashPartitioned(hash_exprs);
let dist = Distribution::KeyPartitioned(hash_exprs);
let satisfaction = input.plan.output_partitioning().satisfaction(
&dist,
input.plan.equivalence_properties(),
Expand Down Expand Up @@ -1024,7 +1024,7 @@ fn get_repartition_requirement_status(
Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
Precision::Absent => true,
};
let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
let is_hash = matches!(requirement, Distribution::KeyPartitioned(_));
// Hash re-partitioning is necessary when the input has more than one
// partition:
let multi_partitions = child.output_partitioning().partition_count() > 1;
Expand Down Expand Up @@ -1206,7 +1206,7 @@ pub fn ensure_distribution(
// Grouping set aggregates (ROLLUP, CUBE, GROUPING SETS) require exact hash
// partitioning on all group columns including __grouping_id to ensure partial
// aggregates from different partitions are correctly combined.
let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs)
let requires_grouping_id = matches!(&requirement, Distribution::KeyPartitioned(exprs)
if exprs.iter().any(|expr| {
(expr.as_ref() as &dyn Any)
.downcast_ref::<Column>()
Expand Down Expand Up @@ -1244,7 +1244,7 @@ pub fn ensure_distribution(
Distribution::SinglePartition => {
child = add_merge_on_top(child, removed_fetch);
}
Distribution::HashPartitioned(exprs) => {
Distribution::KeyPartitioned(exprs) => {
// See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
// When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
if hash_necessary {
Expand Down Expand Up @@ -1311,7 +1311,7 @@ pub fn ensure_distribution(
// no ordering requirement
match requirement {
// Operator requires specific distribution.
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
Distribution::SinglePartition | Distribution::KeyPartitioned(_) => {
// If the parent doesn't maintain input order, preserving
// ordering is pointless. However, if it does maintain
// input order, we keep order-preserving variants so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution {
(Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => {
Distribution::SinglePartition
}
(Distribution::HashPartitioned(_), _) => a.clone(),
(_, Distribution::HashPartitioned(_)) => b.clone(),
(Distribution::KeyPartitioned(_), _) => a.clone(),
(_, Distribution::KeyPartitioned(_)) => b.clone(),
_ => Distribution::UnspecifiedDistribution,
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl ExecutionPlan for OutputRequirementExec {
}

let dist_req = match &self.required_input_distribution()[0] {
Distribution::HashPartitioned(exprs) => {
Distribution::KeyPartitioned(exprs) => {
let mut updated_exprs = vec![];
for expr in exprs {
let Some(new_expr) = update_expr(expr, projection.expr(), false)?
Expand All @@ -281,7 +281,7 @@ impl ExecutionPlan for OutputRequirementExec {
};
updated_exprs.push(new_expr);
}
Distribution::HashPartitioned(updated_exprs)
Distribution::KeyPartitioned(updated_exprs)
}
dist => dist.clone(),
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,7 @@ impl ExecutionPlan for AggregateExec {
vec![Distribution::UnspecifiedDistribution]
}
AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
vec![Distribution::KeyPartitioned(self.group_by.input_exprs())]
}
AggregateMode::Final | AggregateMode::Single => {
vec![Distribution::SinglePartition]
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,8 @@ impl ExecutionPlan for HashJoinExec {
.map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
.unzip();
vec![
Distribution::HashPartitioned(left_expr),
Distribution::HashPartitioned(right_expr),
Distribution::KeyPartitioned(left_expr),
Distribution::KeyPartitioned(right_expr),
]
}
PartitionMode::Auto => vec![
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ impl ExecutionPlan for SortMergeJoinExec {
.map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
.unzip();
vec![
Distribution::HashPartitioned(left_expr),
Distribution::HashPartitioned(right_expr),
Distribution::KeyPartitioned(left_expr),
Distribution::KeyPartitioned(right_expr),
]
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
.map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
.unzip();
vec![
Distribution::HashPartitioned(left_expr),
Distribution::HashPartitioned(right_expr),
Distribution::KeyPartitioned(left_expr),
Distribution::KeyPartitioned(right_expr),
]
}
StreamJoinPartitionMode::SinglePartition => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/partitioned_topk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ impl ExecutionPlan for PartitionedTopKExec {
.iter()
.map(|e| Arc::clone(&e.expr))
.collect();
vec![Distribution::HashPartitioned(partition_exprs)]
vec![Distribution::KeyPartitioned(partition_exprs)]
}

fn maintains_input_order(&self) -> Vec<bool> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys().clone())]
vec![Distribution::KeyPartitioned(self.partition_keys().clone())]
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl ExecutionPlan for WindowAggExec {
if self.partition_keys().is_empty() {
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys())]
vec![Distribution::KeyPartitioned(self.partition_keys())]
}
}

Expand Down
Loading