
Fix(optimizer): Make EnsureCooperative optimizer idempotent under multiple runs #19757

Merged
alamb merged 4 commits into apache:main from danielhumanmod:idempotent-coopereative on Jan 16, 2026

Conversation

@danielhumanmod (Contributor)

Which issue does this PR close?

- Closes #19756.

Rationale for this change

The previous logic of the EnsureCooperative optimizer lacked awareness of ancestor nodes, making it non-idempotent across multiple runs.

Specifically, we need to ensure that:

  1. Idempotency: Running the rule multiple times does not produce nested CooperativeExec wrappers.
  2. Context Awareness: If a subtree is already protected by a CooperativeExec, we should skip it and not double-wrap its children (see the sketch below).
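
For requirement 1, a quick check is to apply the rule twice and compare the rendered plans. A minimal sketch, assuming the PhysicalOptimizerRule API and the displayable helper used later in this thread (plan construction elided):

// Hedged sketch: `plan` is assumed to be some Arc<dyn ExecutionPlan>.
let config = ConfigOptions::new();
let rule = EnsureCooperative::new();
let once = rule.optimize(plan, &config).unwrap();
let twice = rule.optimize(Arc::clone(&once), &config).unwrap();
// Idempotency: the second pass must not add any more CooperativeExec nodes.
assert_eq!(
    displayable(once.as_ref()).indent(true).to_string(),
    displayable(twice.as_ref()).indent(true).to_string()
);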

What changes are included in this PR?

To solve this, we cannot rely solely on transform_up (which lacks parent context) or transform_down (which makes safe mutation difficult). This PR adopts transform_down_up with a depth counter to strictly enforce that nodes are only wrapped when they are not currently under a CooperativeExec scope.
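
To illustrate the shape of this approach, here is a hedged sketch, not the PR's actual code: it assumes DataFusion's TreeNode transform_down_up API and the CooperativeExec wrapper, but the function name and details are illustrative:

// Pre-order pass tracks how many CooperativeExec ancestors enclose the
// current node; post-order pass wraps unprotected leaves on the way back up.
fn wrap_unprotected_leaves(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let depth = Cell::new(0usize);
    let result = plan.transform_down_up(
        |node| {
            if node.as_any().is::<CooperativeExec>() {
                depth.set(depth.get() + 1); // entering a protected scope
            }
            Ok(Transformed::no(node))
        },
        |node| {
            let is_coop = node.as_any().is::<CooperativeExec>();
            if is_coop {
                depth.set(depth.get() - 1); // leaving the protected scope
            }
            // Wrap leaves only when no CooperativeExec ancestor protects them.
            if node.children().is_empty() && !is_coop && depth.get() == 0 {
                let wrapped: Arc<dyn ExecutionPlan> = Arc::new(CooperativeExec::new(node));
                return Ok(Transformed::yes(wrapped));
            }
            Ok(Transformed::no(node))
        },
    )?;
    Ok(result.data)
}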

Are these changes tested?

Yes, more unit test coverage was added.

Are there any user-facing changes?

No

@github-actions bot added the optimizer (Optimizer rules) label on Jan 11, 2026
@danielhumanmod (Contributor, Author)

cc @milenkovicm

@xudong963 (Member) left a comment:

Also, cc @pepijnve

// 1. Node is a leaf or exchange point
// 2. Node is not already cooperative
// 3. Not under any CooperativeExec (depth == 0)
if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 {
A contributor commented:

There aren't any implementations in the library that you could use to test this, but I'm not sure this is 100% correct if someone ever implements a non-cooperative exchange operator (i.e. one that doesn't use a Tokio mpsc::channel). I'll see if I can come up with a test case for this.

@pepijnve (Contributor) commented on Jan 12, 2026:

Here's some very contrived test code (in the details section below) that illustrates this. The code will output:

aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          exch Eager NonCooperative
            filter Lazy NonCooperative
              scan Lazy NonCooperative

Notice that there's a coop missing around the final scan.

This is what the code used to produce before the fix (with the incorrect double coop). The directly nested double coop is not intentional, but the coop wrappers at two different levels of the tree are.

aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          CooperativeExec
            exch Eager NonCooperative
              filter Lazy NonCooperative
                CooperativeExec
                  scan Lazy NonCooperative
Details
// Imports assumed for this snippet; exact crates/module paths may differ by version.
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::coop::CooperativeExec;
use datafusion_physical_plan::execution_plan::{
    Boundedness, EmissionType, EvaluationType, SchedulingType,
};
use datafusion_physical_plan::{
    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PlanProperties, SendableRecordBatchStream,
};

#[tokio::test]
async fn test_exchange() {
    let scan = Arc::new(DummyExec::new("scan".to_string(), None, SchedulingType::NonCooperative, EvaluationType::Lazy));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(scan), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Eager));
    let coop = Arc::new(CooperativeExec::new(exchange));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(coop), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::Cooperative, EvaluationType::Eager));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(exchange), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let aggregate = Arc::new(DummyExec::new("aggr".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Lazy));

    let config = ConfigOptions::new();
    let optimized = EnsureCooperative::new()
        .optimize(aggregate as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    let display = displayable(optimized.as_ref()).indent(true).to_string();

    println!("{}", display);
}

#[derive(Debug)]
struct DummyExec {
    name: String,
    input: Option<Arc<dyn ExecutionPlan>>,
    scheduling_type: SchedulingType,
    evaluation_type: EvaluationType,
    properties: PlanProperties,
}

impl DummyExec {
    fn new(
        name: String,
        input: Option<Arc<dyn ExecutionPlan>>,
        scheduling_type: SchedulingType,
        evaluation_type: EvaluationType,
    ) -> Self {
        DummyExec {
            name,
            input,
            scheduling_type,
            evaluation_type,
            properties: PlanProperties::new(
                EquivalenceProperties::new(Arc::new(Schema::empty())),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded
            ).with_scheduling_type(scheduling_type).with_evaluation_type(evaluation_type),
        }
    }
}

impl DisplayAs for DummyExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "{} {:?} {:?}", self.name, self.evaluation_type, self.scheduling_type)
    }
}

impl ExecutionPlan for DummyExec {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        match &self.input {
            None => vec![],
            Some(i) => vec![i],
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DummyExec::new(
            self.name.clone(),
            match children.len() {
                0 => None,
                _ => Some(children[0].clone()),
            },
            self.scheduling_type,
            self.evaluation_type,
        )))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        todo!()
    }
}

@danielhumanmod (Contributor, Author) replied:

Great catch! Totally missed the case where an Eager node breaks the cooperative chain.

My plan is to maintain an ancestry stack that tracks both SchedulingType and EvaluationType. The new logic checks the stack bottom-up: a node is only considered 'protected' (and thus skips wrapping) if it encounters a Cooperative ancestor before any Eager pipeline breaker.

I have also added a test case to cover this scenario. Thanks for the insight!
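
As a hypothetical sketch of that stack check (the names AncestorKind and is_protected are illustrative, not the PR's API):

// Walk ancestors from nearest to farthest: a node is protected only if a
// cooperative ancestor appears before any eagerly evaluated pipeline breaker.
#[derive(Clone, Copy)]
enum AncestorKind {
    Cooperative,  // e.g. a CooperativeExec wrapper
    EagerBreaker, // EvaluationType::Eager, e.g. an exchange
    Transparent,  // everything else; passes protection through
}

fn is_protected(ancestors: &[AncestorKind]) -> bool {
    for kind in ancestors.iter().rev() {
        match kind {
            AncestorKind::Cooperative => return true,
            AncestorKind::EagerBreaker => return false,
            AncestorKind::Transparent => continue,
        }
    }
    false // no protecting ancestor found
}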

@pepijnve (Contributor)

@danielhumanmod thanks for fixing this. I had completely forgotten about the need for idempotence when I wrote this.

@milenkovicm (Contributor)

Just for illustration, this is what I was getting when the physical optimizer rule ran multiple times (I would bet it ran 3 times in this example 😀):

    AdaptiveDatafusionExec: is_final=false, plan_id=1, stage_id=pending
      ProjectionExec: expr=[big_col@1 as big_col, big_col@0 as big_col]
        CrossJoinExec
          CoalescePartitionsExec
            ExchangeExec: partitioning=None, plan_id=2, stage_id=pending, stage_resolved=false
              CooperativeExec
                CooperativeExec
                  CooperativeExec
                    MockPartitionedScan: num_partitions=2, statistics=[Rows=Inexact(1024), Bytes=Inexact(8192), [(Col[0]:)]]
          ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0, stage_id=0, stage_resolved=true
            CooperativeExec
              CooperativeExec
                CooperativeExec
                  StatisticsExec: col_count=1, row_count=Inexact(262144)

My naive (wrong) thinking was that switching to transform_down would fix it.

Anyway, thanks @danielhumanmod

@pepijnve (Contributor) left a comment:

I read through the revised code and test cases. Looks correct to me.

@danielhumanmod (Contributor, Author)

Thanks team, could you help merge this when you have a chance? I don't have merge permission right now. Appreciate it!

@alamb added this pull request to the merge queue on Jan 16, 2026
@alamb (Contributor) commented on Jan 16, 2026:

Thanks @xudong963 @pepijnve and @danielhumanmod !

Merged via the queue into apache:main with commit ab81d3b on Jan 16, 2026
32 checks passed
rkrishn7 pushed a commit to massive-com/arrow-datafusion that referenced this pull request on Jan 22, 2026:

Fix(optimizer): Make EnsureCooperative optimizer idempotent under multiple runs (apache#19757)

Labels

optimizer Optimizer rules

Successfully merging this pull request may close these issues:

[BUG] EnsureCooperative is not idempotent

5 participants