
[branch-52] Backport fix: SortMergeJoin don't wait for all input before emitting #20482 (#20699)

Merged
mbutrovich merged 4 commits into apache:branch-52 from mbutrovich:backport_20482 on Mar 4, 2026

Conversation

@mbutrovich
Contributor

Which issue does this PR close?

Backport of #20482 to branch-52.

Rationale for this change

Cherry-pick fix and prerequisites so that SortMergeJoin emits output incrementally instead of waiting for all input to
complete. This resolves OOM issues Comet is seeing with DataFusion 52.

What changes are included in this PR?

Cherry-picks of the following commits from main:

  1. refactor: Extract sort-merge join filter logic into separate module (#19614)
  2. perf: Use zero-copy slice instead of take kernel in sort merge join (#20463)
  3. fix: SortMergeJoin don't wait for all input before emitting (#20482)

Are these changes tested?

Yes, covered by existing and new tests included in #20482.

Are there any user-facing changes?

No.

viirya and others added 3 commits March 4, 2026 10:51
refactor: Extract sort-merge join filter logic into separate module (apache#19614)

Refactored the sort-merge join implementation to improve code
organization by extracting all filter-related logic into a dedicated
filter.rs module.

Changes:
- Created new filter.rs module (~576 lines) containing:
  - Filter metadata tracking (FilterMetadata struct)
  - Deferred filtering decision logic (needs_deferred_filtering)
  - Filter mask correction for different join types (get_corrected_filter_mask)
  - Filter application with null-joined row handling (filter_record_batch_by_join_type)
  - Helper functions for filter column extraction and batch filtering

- Updated stream.rs:
  - Removed ~450 lines of filter-specific code
  - Now delegates to filter module functions
  - Simplified main join logic to focus on stream processing

- Updated tests.rs:
  - Updated imports to use new filter module
  - Changed test code to use FilterMetadata struct
  - All 47 sort-merge join tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
perf: Use zero-copy slice instead of take kernel in sort merge join (apache#20463)

## Summary

Follows on from apache#20464 which
adds new criterion benchmarks.

- When the join indices form a contiguous ascending range (e.g.
`[3,4,5,6]`), replace the O(n) Arrow `take` kernel with O(1)
`RecordBatch::slice` (zero-copy pointer arithmetic)
- Applies to both the streamed (left) and buffered (right) sides of the
sort merge join

## Rationale

In SMJ, the streamed side cursor advances sequentially, so its indices
are almost always contiguous. The buffered side is scanned sequentially
within each key group, so its indices are also contiguous for 1:1 and
1:few joins. The `take` kernel allocates new arrays and copies data even
when a simple slice would suffice.

## Benchmark Results

Criterion micro-benchmark (100K rows, pre-sorted, no sort/scan
overhead):

| Benchmark | Baseline | Optimized | Improvement |
|-----------|----------|-----------|-------------|
| inner_1to1 (unique keys) | 5.11 ms | 3.88 ms | **-24%** |
| inner_1to10 (10K keys) | 17.64 ms | 16.29 ms | **-8%** |
| left_1to1_unmatched (5% unmatched) | 4.80 ms | 3.87 ms | **-19%** |
| left_semi_1to10 (10K keys) | 3.65 ms | 3.11 ms | **-15%** |
| left_anti_partial (partial match) | 3.58 ms | 3.43 ms | **-4%** |

All improvements are statistically significant (p < 0.05).

TPC-H SF1 with SMJ forced (`prefer_hash_join=false`) shows no
regressions across all 22 queries, with modest end-to-end improvements
on join-heavy queries (Q3 -7%, Q19 -5%, Q21 -2%).

## Implementation

- `is_contiguous_range()`: checks if a `UInt64Array` is a contiguous
ascending range. Uses quick endpoint rejection then verifies every
element sequentially.
- `freeze_streamed()`: uses `slice` instead of `take` for streamed
(left) columns when indices are contiguous.
- `fetch_right_columns_from_batch_by_idxs()`: uses `slice` instead of
`take` for buffered (right) columns when indices are contiguous.

When indices are not contiguous (e.g. repeated indices in many-to-many
joins), falls back to the existing `take` path.
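
As a hedged illustration of the contiguity check described above: the real helper operates on an arrow `UInt64Array` inside DataFusion, so this sketch stands in with a plain `u64` slice while keeping the name `is_contiguous_range` and the endpoint-first strategy from the summary.

```rust
/// Sketch of the contiguity check: returns true when `indices` form a
/// contiguous ascending range such as [3, 4, 5, 6]. Plain-slice stand-in
/// for the arrow `UInt64Array` the actual implementation uses.
fn is_contiguous_range(indices: &[u64]) -> bool {
    match (indices.first(), indices.last()) {
        (Some(&first), Some(&last)) => {
            // Quick endpoint rejection: n contiguous ascending values
            // must span exactly n - 1.
            if last.wrapping_sub(first) != (indices.len() - 1) as u64 {
                return false;
            }
            // Then verify every element sequentially.
            indices.windows(2).all(|w| w[1] == w[0] + 1)
        }
        // Empty input: trivially contiguous.
        _ => true,
    }
}
```

When this returns true, the join can use the zero-copy `RecordBatch::slice(first as usize, indices.len())` path; otherwise it falls back to the `take` kernel, which allocates and copies.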

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
fix: SortMergeJoin don't wait for all input before emitting (apache#20482)

## Which issue does this PR close?

N/A

## Rationale for this change

While experimenting with local tests and debugging a memory issue, I noticed that `SortMergeJoinStream` waits for all input before it starts emitting. This shouldn't be the case: we can emit early once we have enough data.

Waiting for all input also causes huge memory pressure.
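
As a minimal sketch of the incremental-emission idea (illustrative names only, not DataFusion's actual types): an output buffer flushes a batch as soon as it holds `batch_size` rows, instead of accumulating every joined row until the input is exhausted.

```rust
// Hypothetical sketch of incremental emission. `OutputBuffer` and its
// methods are illustrative stand-ins; rows are `u64` here rather than
// actual joined record-batch data.
struct OutputBuffer {
    rows: Vec<u64>,    // buffered joined rows awaiting emission
    batch_size: usize, // target rows per emitted batch
}

impl OutputBuffer {
    /// Buffer one joined row; emit a full batch as soon as one is ready,
    /// rather than waiting for end of input.
    fn push(&mut self, row: u64) -> Option<Vec<u64>> {
        self.rows.push(row);
        if self.rows.len() >= self.batch_size {
            Some(self.rows.drain(..self.batch_size).collect())
        } else {
            None
        }
    }

    /// Flush whatever remains once the input is exhausted.
    fn finish(&mut self) -> Option<Vec<u64>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }
}
```

Because at most `batch_size` rows are ever held back, peak memory stays bounded by the batch size instead of growing with the total input, which is the memory-pressure point above.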

## What changes are included in this PR?

An attempt to fix the issue so that `SortMergeJoinStream` emits output incrementally instead of waiting for all input.

## Are these changes tested?

Yes

## Are there any user-facing changes?


-----


## TODO:
- [x] update docs
- [x] finish fix
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 4, 2026
@alamb
Contributor

alamb commented Mar 4, 2026

Seems there are a few test failures

https://github.com/apache/datafusion/actions/runs/22677362632/job/65737685125?pr=20699

error[E0061]: this function takes 8 arguments but 9 arguments were supplied
    --> datafusion/physical-plan/src/joins/sort_merge_join/tests.rs:3232:22
     |
3232 |         let oracle = HashJoinExec::try_new(
     |                      ^^^^^^^^^^^^^^^^^^^^^
...
3241 |             false,
     |             ----- unexpected argument #9 of type `bool`
     |
note: associated function defined here
    --> datafusion/physical-plan/src/joins/hash_join/exec.rs:402:12
     |
 402 |     pub fn try_new(
     |            ^^^^^^^
help: remove the extra argument
     |
3240 -             null_equality,
3241 -             false,
3240 +             null_equality,
     |

   Compiling datafusion-ffi v52.2.0

@alamb alamb changed the title [branch-52] Backport #20482 and dependencies [branch-52] Backport fix: SortMergeJoin don't wait for all input before emitting #20482 Mar 4, 2026
@mbutrovich mbutrovich merged commit 19a0fca into apache:branch-52 Mar 4, 2026
35 of 36 checks passed

Labels

physical-plan Changes to the physical-plan crate
