Is your feature request related to a problem or challenge?
Partitioned TopK operators such as PartitionedTopKExec require Distribution::KeyPartitioned, but Partitioning::Range does not yet generally satisfy that requirement through Partitioning::satisfaction.
As a result, DataFusion can insert an unnecessary repartition even when the input is already range partitioned in a way that satisfies the required distribution.
Example:
PartitionedTopKExec: partition=[a], order=[...]
Input: Partitioning::Range(ordering=[a ASC], split_points=[...])
Rows with the same a value are colocated in one range partition, so PartitionedTopK does not need a hash repartition just to satisfy the KeyPartitioned distribution.
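The colocation argument can be illustrated with a minimal, self-contained sketch. It does not use DataFusion's types: `range_partition_index` and `assign_to_range_partitions` are hypothetical helpers, the key is assumed to be a single `i64` column, and the boundary convention (a key equal to a split point goes to the upper partition) is an assumption made only for this example.

```rust
/// Hypothetical helper: index of the range partition that `key` falls into.
/// `split_points` must be sorted ascending. With n split points there are
/// n + 1 partitions; a key equal to a split point goes to the upper one
/// (an assumption of this sketch, not a statement about DataFusion).
fn range_partition_index(key: i64, split_points: &[i64]) -> usize {
    // Counts the split points that are <= key.
    split_points.partition_point(|&sp| sp <= key)
}

/// Hypothetical helper: route each key to its range partition.
fn assign_to_range_partitions(keys: &[i64], split_points: &[i64]) -> Vec<Vec<i64>> {
    let mut partitions = vec![Vec::new(); split_points.len() + 1];
    for &key in keys {
        partitions[range_partition_index(key, split_points)].push(key);
    }
    partitions
}
```

Because the partition index depends only on the key value, two rows with the same `a` can never land in different partitions, which is the property that KeyPartitioned asks for.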
Describe the solution you'd like
Allow compatible Partitioning::Range inputs to satisfy partition requirements for PartitionedTopKExec.
This should include:
- PartitionedTopKExec::required_input_distribution
- Exact range satisfaction, such as Range([a]) satisfying partition=[a]
- Subset range satisfaction, such as Range([a]) satisfying partition=[a, b], when subset satisfaction is enabled
- Fallback to hash repartitioning when the range key is incompatible
- A suite of slt tests in range_partitioning.slt that test core behavior across positive and negative cases and settings like:
  - datafusion.optimizer.preserve_file_partitions
  - datafusion.execution.target_partitions
  - datafusion.optimizer.subset_repartition_threshold
This should be a private satisfaction implementation. General Partitioning::satisfaction for Partitioning::Range will be implemented once more operators have been covered, to reduce the blast radius of optimizer changes.
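The exact, subset, and fallback cases listed above can be sketched as a small standalone check. This is only an illustration under stated assumptions: `RangeSatisfaction` and `range_satisfies_partition_keys` are hypothetical names, keys are plain column names compared as sets, and `allow_subset` stands in for whatever setting enables subset satisfaction. It is not DataFusion's actual API.

```rust
use std::collections::HashSet;

/// Hypothetical outcome of checking a range-partitioned input against the
/// partition keys of a partitioned TopK operator.
#[derive(Debug, PartialEq, Eq)]
enum RangeSatisfaction {
    /// Range keys equal the partition keys, e.g. Range([a]) vs partition=[a].
    Exact,
    /// Range keys are a strict subset, e.g. Range([a]) vs partition=[a, b].
    Subset,
    /// Incompatible keys: the planner would fall back to a hash repartition.
    NotSatisfied,
}

/// Hypothetical check, assuming keys are simple column names.
fn range_satisfies_partition_keys(
    range_keys: &[&str],
    partition_keys: &[&str],
    allow_subset: bool,
) -> RangeSatisfaction {
    if range_keys.is_empty() {
        return RangeSatisfaction::NotSatisfied;
    }
    let range: HashSet<&str> = range_keys.iter().copied().collect();
    let required: HashSet<&str> = partition_keys.iter().copied().collect();

    if range == required {
        RangeSatisfaction::Exact
    } else if allow_subset && range.is_subset(&required) {
        // Rows that agree on all of [a, b] also agree on a, so they are
        // already colocated by a range partitioning on a alone.
        RangeSatisfaction::Subset
    } else {
        RangeSatisfaction::NotSatisfied
    }
}
```

Note that the subset direction matters: Range([a, b]) does not satisfy partition=[a] in this sketch, because rows with the same a but different b may sit in different range partitions.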
Additional context
Related:
- Aggregations: Aggregations Support Partitioning::Range #23239
- Distribution::HashPartitioned in favor of Distribution::KeyPartitioned: Replace / rename HashPartitioned distribution as KeyPartitioned #23236
- Distribution::KeyPartitioned via Partitioning::Range: Allow Partitioning::Range to satisfy Distribution::KeyPartitioned generally #23266