feat: physical execution for range partitioning #23231
    )?;

    indices.iter_mut().for_each(|v| v.clear());
    let sort_options: Vec<SortOptions> =
I could create this at construction time to avoid re-creating on every invocation of partition_iter?
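A rough sketch of what that could look like, assuming the partitioner owns some per-instance state. `RangePartitionerState`, `begin_batch`, and the local `SortOptions` stand-in are hypothetical names for illustration, not the types in this PR:

```rust
/// Local stand-in for arrow's `SortOptions`, so the sketch compiles on its own.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Hypothetical partitioner state; field and method names are illustrative only.
struct RangePartitionerState {
    /// Derived once from the ordering, then reused for every batch.
    sort_options: Vec<SortOptions>,
    /// One reusable bucket of row indices per output partition.
    indices: Vec<Vec<usize>>,
}

impl RangePartitionerState {
    /// `ordering` pairs a (hypothetical) column name with its sort options.
    fn new(ordering: &[(&str, SortOptions)], num_partitions: usize) -> Self {
        Self {
            sort_options: ordering.iter().map(|(_, options)| *options).collect(),
            indices: vec![Vec::new(); num_partitions],
        }
    }

    /// Per-batch setup: only the index buckets are reset; the sort options
    /// computed in `new` are handed back without being rebuilt.
    fn begin_batch(&mut self) -> &[SortOptions] {
        self.indices.iter_mut().for_each(|v| v.clear());
        &self.sort_options
    }
}
```

The trade-off is one extra small `Vec` held for the lifetime of the partitioner in exchange for skipping an allocation on every batch.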
    /// This function takes the `arrays` associated with the evaluated expressions for the ordering, split points and sort options, and indices array
    /// Then for every row, creates the "row key" based on the given ordering for the range, and binary searches through the split points to find the appropriate partition index
    /// That partition index is associated with the array in `indices`, which is given the row index, meaning that the row is sent to the partition at that index
This comment might not be the best right now lol - I'll sleep on this and may come back to edit it with something more descriptive tomorrow, open to any suggestions :)
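In case it helps with wording the doc comment, here is a stripped-down sketch of the routing it describes, using a single ascending `i64` key instead of Arrow arrays. `route_rows` is a hypothetical name, and sending a key equal to a split point to the upper partition is an assumption of this sketch, not necessarily the convention in the PR:

```rust
/// Hypothetical helper: group row indices by the partition their key falls in.
///
/// `split_points` must be sorted ascending; N split points define N + 1
/// partitions. Assumption: a key equal to a split point goes to the upper
/// partition.
fn route_rows(keys: &[i64], split_points: &[i64]) -> Vec<Vec<usize>> {
    // One bucket of row indices per output partition.
    let mut indices: Vec<Vec<usize>> = vec![Vec::new(); split_points.len() + 1];
    for (row, key) in keys.iter().enumerate() {
        // Binary search: the number of split points <= key is the partition index.
        let partition = split_points.partition_point(|split| split <= key);
        indices[partition].push(row);
    }
    indices
}
```

Each inner `Vec` then plays the role of one entry in `indices`: the rows to take from the batch for that output partition.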
    Partitioning::Range(_) => {
        // Range partitioning optimizer propagation is tracked in
        // https://github.com/apache/datafusion/issues/22395
        // https://github.com/apache/datafusion/issues/23230
Intentionally left these un-implemented and created #23230 for them if that's OK!
cc @gene-bordegaray! 🎉
    return not_impl_err!(
        "Range partitioning execution is not implemented by RepartitionExec"
    );
    Partitioning::Range(range_partitioning) => {
Should we be delegating to BatchPartitioner's try_new method for these?
Thank you for the work @saadtajwar, I think this will be very useful in upcoming efforts 😄 Before diving into the nitty-gritty, we should step back and plan how repartitioning will work at a high level. Per discussions in #23236 it seems that we will be working toward deprecating So essentially we are going to have operators that require a These are some things I would like to discuss with others before we decide to implement anything regarding repartitioning (as of now we just preserve it from a
Hey @gene-bordegaray - that makes sense, thanks! I just posted some thoughts in #23236 to keep the discussion centralized in one spot - looking forward to working on this all together!
Filters involving ranges (at least BETWEEN, <=, <, >, >=, involving literals at first, but possibly we can do something smart for columns and more complex expressions too) could benefit from range partitioning too, as it would allow pruning entire partitions without evaluating the filter on their rows.
From briefly looking around, I only see a few cases where a logical optimizer might want to request Range rather than Hash (things like non-equi joins, re-organizing data for output that preserves partitioning, global window functions).
Such filters benefit from existing Range partitioning, but it probably wouldn't make sense to repartition data using Range for that purpose: Hash will get you better balance more cheaply (rather than via binary search), and then each partition can directly evaluate the filter.
But the choice to introduce Range partitioning would be a logical decision, right? So, while I agree that changing logical optimizers to request Range would take a lot of thought and design, implementing the physical side (this PR) doesn't seem to be blocked on that? Or are you concerned that the API might still shift, or that it won't have enough test coverage?
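To make the pruning point concrete, here is a small sketch for a `BETWEEN lo AND hi` filter over a single ascending `i64` key. `partitions_for_between` is a hypothetical helper, and it assumes partition `i` holds keys in `[split_points[i-1], split_points[i])`; the real boundary convention may differ:

```rust
use std::ops::RangeInclusive;

/// Hypothetical helper: which partitions can contain rows matching
/// `key BETWEEN lo AND hi` (both bounds inclusive, `lo <= hi`)?
///
/// Assumes ascending `split_points` and that a key equal to a split point
/// lives in the upper partition.
fn partitions_for_between(split_points: &[i64], lo: i64, hi: i64) -> RangeInclusive<usize> {
    // Partition holding the smallest key that could match.
    let first = split_points.partition_point(|split| *split <= lo);
    // Partition holding the largest key that could match.
    let last = split_points.partition_point(|split| *split <= hi);
    first..=last
}
```

Every partition outside the returned range can be skipped without reading a single row, which is the benefit when the input is already range partitioned.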
Agree with @stuhood on the above, especially on the below - while I'm still trying to understand the
@stuhood I am most concerned with implementing physical layer behavior before having a real use for it that we can represent. What would the use case of being able to repartition on range right now be? Do you have a use case where you would like to physically insert a repartition on range? Maybe this is a good place to start the conversation on where and how this should be decided 🤔
The main use case we have at the moment for range partitioning is when the input source data is already range partitioned, and the point of the work in this epic is for DataFusion to know about that (pre-existing) partitioning and take advantage of it. I think you guys are talking about having the optimizer decide to repartition data into ranges (e.g. when it wants to add more parallelism to the plan). That would probably need to be a cost-based decision based on statistics (like value distributions) that we don't yet have in DataFusion (and maybe never will have).
TLDR is I agree with @stuhood
👍
Understood. Yeah, I don't feel strongly about it either way... but I don't really love the idea of leaving in
I agree panics are not great -- returning a NotYetImplemented error would be better.
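For illustration, a self-contained sketch of the error-instead-of-panic shape. Inside DataFusion this is what `not_impl_err!` gives you; the `PlanError` and `Partitioning` enums and the `with_partition_count` function below are local, hypothetical stand-ins so the snippet compiles on its own:

```rust
/// Hypothetical error type standing in for the engine's error enum.
#[derive(Debug, PartialEq)]
enum PlanError {
    NotImplemented(String),
}

/// Simplified stand-in: each variant only carries a partition count.
#[derive(Debug, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(usize),
    Range(usize),
}

/// Hypothetical optimizer hook that changes the partition count.
/// The unsupported case is reported as an error the caller can handle,
/// rather than aborting the query with `unimplemented!()`.
fn with_partition_count(current: &Partitioning, target: usize) -> Result<Partitioning, PlanError> {
    match current {
        Partitioning::RoundRobin(_) => Ok(Partitioning::RoundRobin(target)),
        Partitioning::Hash(_) => Ok(Partitioning::Hash(target)),
        Partitioning::Range(_) => Err(PlanError::NotImplemented(
            "changing the partition count of a Range partitioning".to_string(),
        )),
    }
}
```

Callers can then surface the message to the user or fall back to leaving the plan unchanged.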
I agree panics are not good and to keep the existing
Which issue does this PR close?
Rationale for this change
Range repartitioning was already planned and serialized into physical plans, but `RepartitionExec` could not execute it. This PR completes the core execution path so rows in an input batch are routed to the correct output partition based on range split points and the ordering defined on the partitioning scheme.
What changes are included in this PR?
This PR adds a `Range` variant to `BatchPartitioner` that evaluates the ordering expressions on each input batch, compares each row's key against split points using `compare_rows` (respecting ASC/DESC and null ordering), and assigns row indices to output partitions via binary search. The partitioned row indices are then materialized into sub-batches using the same `partition_grouped_take` path as hash repartitioning. `pull_from_input` is wired to construct a range partitioner for `Partitioning::Range`, replacing the previous `not_impl_err!` at execution time.
Optimizer-related paths remain intentionally unimplemented and are tracked in #23230: projection pushdown through `RepartitionExec` (`try_swapping_with_projection`), sort pushdown (`try_pushdown_sort`), and changing partition counts via `repartitioned()`.
Are these changes tested?
Yes!
Are there any user-facing changes?
No public API changes
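As a reading aid for the comparison step described under "What changes are included in this PR?", here is a minimal sketch of an ordering-aware key comparison followed by the binary search. It uses `Option<i64>` columns instead of Arrow arrays; `SortOptions` is a local stand-in for arrow's struct, and `compare_row_keys` and `partition_for_row` are hypothetical names, not the functions in the PR. It assumes split points are sorted under the same ordering and that a row equal to a split point goes to the upper partition:

```rust
use std::cmp::Ordering;

/// Local stand-in for arrow's `SortOptions`.
#[derive(Clone, Copy)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Compare two nullable values under one column's sort options.
/// Null placement is decided by `nulls_first` alone; `descending` only
/// reverses the order of non-null values.
fn compare_values(a: Option<i64>, b: Option<i64>, opts: SortOptions) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => if opts.nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if opts.nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => {
            let ord = x.cmp(&y);
            if opts.descending { ord.reverse() } else { ord }
        }
    }
}

/// Lexicographic comparison of two multi-column row keys.
fn compare_row_keys(a: &[Option<i64>], b: &[Option<i64>], opts: &[SortOptions]) -> Ordering {
    for ((x, y), o) in a.iter().zip(b).zip(opts) {
        let ord = compare_values(*x, *y, *o);
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Binary search: count the split points that sort at or before `row`.
fn partition_for_row(
    row: &[Option<i64>],
    split_points: &[Vec<Option<i64>>],
    opts: &[SortOptions],
) -> usize {
    split_points.partition_point(|split| compare_row_keys(split, row, opts) != Ordering::Greater)
}
```

With a descending, nulls-last ordering and split points `[20]` and `[10]`, a key of 25 lands in partition 0, 15 in partition 1, and both 5 and NULL in partition 2.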