Fix(optimizer): Make EnsureCooperative optimizer idempotent under multiple runs #19757
Conversation
cc @milenkovicm
```rust
// 1. Node is a leaf or exchange point
// 2. Node is not already cooperative
// 3. Not under any CooperativeExec (depth == 0)
if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 {
```
There aren't any implementations in the library that you could use to test this, but I'm not sure this is 100% correct if someone ever implements a non-cooperative exchange operator (i.e. one that doesn't use a Tokio mpsc::channel). I'll see if I can come up with a test case for this.
Here's some very contrived test code (in the details section below) that illustrates this. The code will output:
```
aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          exch Eager NonCooperative
            filter Lazy NonCooperative
              scan Lazy NonCooperative
```
Notice that there's a coop missing around the final scan.
Before the fix, the same code produced the output below, with the incorrect double coop. The directly nested double coop is not intentional, but having coop at two separate points in the plan is.
```
aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          CooperativeExec
            exch Eager NonCooperative
              filter Lazy NonCooperative
                CooperativeExec
                  scan Lazy NonCooperative
```
Details
```rust
// Imports for the snippet below; exact module paths may differ across DataFusion versions.
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::coop::CooperativeExec;
use datafusion_physical_plan::execution_plan::{
    Boundedness, EmissionType, EvaluationType, SchedulingType,
};
use datafusion_physical_plan::{
    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};

#[tokio::test]
async fn test_exchange() {
    // Lower half of the plan: a non-cooperative eager exchange, already wrapped
    // in a CooperativeExec.
    let scan = Arc::new(DummyExec::new("scan".to_string(), None, SchedulingType::NonCooperative, EvaluationType::Lazy));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(scan), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Eager));
    let coop = Arc::new(CooperativeExec::new(exchange));
    // Upper half: a cooperative eager exchange feeding the final aggregate.
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(coop), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::Cooperative, EvaluationType::Eager));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(exchange), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let aggregate = Arc::new(DummyExec::new("aggr".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Lazy));

    let config = ConfigOptions::new();
    let optimized = EnsureCooperative::new()
        .optimize(aggregate as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    let display = displayable(optimized.as_ref()).indent(true).to_string();
    println!("{}", display);
}

/// Minimal stand-in operator; only its PlanProperties (SchedulingType and
/// EvaluationType) matter for this test.
#[derive(Debug)]
struct DummyExec {
    name: String,
    input: Option<Arc<dyn ExecutionPlan>>,
    scheduling_type: SchedulingType,
    evaluation_type: EvaluationType,
    properties: PlanProperties,
}

impl DummyExec {
    fn new(
        name: String,
        input: Option<Arc<dyn ExecutionPlan>>,
        scheduling_type: SchedulingType,
        evaluation_type: EvaluationType,
    ) -> Self {
        DummyExec {
            name,
            input,
            scheduling_type,
            evaluation_type,
            properties: PlanProperties::new(
                EquivalenceProperties::new(Arc::new(Schema::empty())),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )
            .with_scheduling_type(scheduling_type)
            .with_evaluation_type(evaluation_type),
        }
    }
}

impl DisplayAs for DummyExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "{} {:?} {:?}", self.name, self.evaluation_type, self.scheduling_type)
    }
}

impl ExecutionPlan for DummyExec {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        match &self.input {
            None => vec![],
            Some(i) => vec![i],
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DummyExec::new(
            self.name.clone(),
            match children.len() {
                0 => None,
                _ => Some(children[0].clone()),
            },
            self.scheduling_type,
            self.evaluation_type,
        )))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Never called; this test only optimizes and displays the plan.
        todo!()
    }
}
```
Great catch! Totally missed the case where an Eager node breaks the cooperative chain.
My plan is to maintain an ancestry stack that tracks both SchedulingType and EvaluationType. The new logic checks the stack bottom-up: a node is only considered 'protected' (and thus skips wrapping) if it encounters a Cooperative ancestor before any Eager pipeline breaker.
I have also added a test case to cover this scenario. Thanks for the insight!
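For readers following along, here is a minimal sketch of that bottom-up check, assuming the SchedulingType and EvaluationType enums from datafusion_physical_plan; the Ancestor record and is_protected helper are illustrative names, not the actual code in this PR:

```rust
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};

/// Illustrative ancestry record pushed while descending the plan tree.
struct Ancestor {
    scheduling: SchedulingType,
    evaluation: EvaluationType,
}

/// Walk the ancestry stack from the nearest ancestor outward. A node is only
/// protected if a Cooperative ancestor appears before any Eager pipeline
/// breaker: an Eager ancestor drives its input separately, so a cooperative
/// wrapper above it no longer yields on behalf of this subtree.
fn is_protected(ancestors: &[Ancestor]) -> bool {
    for a in ancestors.iter().rev() {
        if matches!(a.scheduling, SchedulingType::Cooperative) {
            return true; // cooperative ancestor reached first: already protected
        }
        if matches!(a.evaluation, EvaluationType::Eager) {
            return false; // pipeline breaker reached first: protection is broken
        }
    }
    false // no protecting ancestor at all
}
```

On the plan from the test above, the scan's nearest relevant ancestor is the Eager NonCooperative exch, so is_protected returns false and the scan gets its own coop, as the correct output requires.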
@danielhumanmod thanks for fixing this. I had completely forgotten about the need for idempotence when I wrote this.
Just for illustration, this is what I was getting if the physical optimizer rule ran multiple times (I would bet it ran 3 times in this example 😀). My naive (wrong) thinking was that changing to transform down would fix it. Anyway, thanks @danielhumanmod
pepijnve left a comment
I read through the revised code and test cases. Looks correct to me.
Thanks team, can you help me merge this when you have a chance? I don't have permission right now. Appreciate it!
Thanks @xudong963 @pepijnve and @danielhumanmod!
Which issue does this PR close?
- Closes #19756: EnsureCooperative is not idempotent.
Rationale for this change
The previous logic of the EnsureCooperative optimizer lacked context awareness regarding ancestor nodes, making it not idempotent across multiple runs. Specifically, we need to ensure that:
1. Idempotency: running the rule multiple times does not produce nested CooperativeExec wrappers.
2. Context awareness: if a subtree is already protected by a CooperativeExec, we should skip it and not double-wrap its children.
What changes are included in this PR?
To solve this, we cannot rely solely on transform_up (which lacks parent context) or transform_down (which makes safe mutation difficult). This PR adopts transform_down_up with a depth counter to strictly enforce that nodes are only wrapped when they are not currently under a CooperativeExec scope.
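For illustration, a minimal sketch of the transform_down_up plus depth-counter idea, assuming the TreeNode::transform_down_up API from datafusion_common and CooperativeExec from datafusion_physical_plan; wrap_unprotected_leaves is a hypothetical helper that only handles leaves, whereas the rule's real condition also covers exchange points and scheduling type as in the diff snippet quoted above:

```rust
use std::cell::Cell;
use std::sync::Arc;

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::coop::CooperativeExec;
use datafusion_physical_plan::ExecutionPlan;

fn wrap_unprotected_leaves(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Number of CooperativeExec ancestors above the node currently being visited.
    let coop_depth = Cell::new(0usize);
    plan.transform_down_up(
        |node| {
            // Entering a CooperativeExec scope on the way down.
            if node.as_any().is::<CooperativeExec>() {
                coop_depth.set(coop_depth.get() + 1);
            }
            Ok(Transformed::no(node))
        },
        |node| {
            // Leaving a CooperativeExec scope on the way back up.
            if node.as_any().is::<CooperativeExec>() {
                coop_depth.set(coop_depth.get() - 1);
                return Ok(Transformed::no(node));
            }
            // Wrap only when no CooperativeExec ancestor already protects this
            // node (depth == 0).
            if node.children().is_empty() && coop_depth.get() == 0 {
                return Ok(Transformed::yes(Arc::new(CooperativeExec::new(node))));
            }
            Ok(Transformed::no(node))
        },
    )
}
```

Because the counter is nonzero everywhere below an existing CooperativeExec, re-running the rule on its own output wraps nothing, which is exactly the idempotence property being fixed.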
Are these changes tested?
More unit test coverage.
Are there any user-facing changes?
No