[branch-52] Backport fix: SortMergeJoin don't wait for all input before emitting #20482#20699
Merged
mbutrovich merged 4 commits intoapache:branch-52from Mar 4, 2026
Merged
Conversation
…pache#19614) Refactored the sort-merge join implementation to improve code organization by extracting all filter-related logic into a dedicated filter.rs module. Changes: - Created new filter.rs module (~576 lines) containing: - Filter metadata tracking (FilterMetadata struct) - Deferred filtering decision logic (needs_deferred_filtering) - Filter mask correction for different join types (get_corrected_filter_mask) - Filter application with null-joined row handling (filter_record_batch_by_join_type) - Helper functions for filter column extraction and batch filtering - Updated stream.rs: - Removed ~450 lines of filter-specific code - Now delegates to filter module functions - Simplified main join logic to focus on stream processing - Updated tests.rs: - Updated imports to use new filter module - Changed test code to use FilterMetadata struct - All 47 sort-merge join tests passing 🤖 Generated with [Claude Code](https://claude.com/claude-code) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
…pache#20463) ## Summary Follows on from apache#20464 which adds new criterion benchmarks. - When the join indices form a contiguous ascending range (e.g. `[3,4,5,6]`), replace the O(n) Arrow `take` kernel with O(1) `RecordBatch::slice` (zero-copy pointer arithmetic) - Applies to both the streamed (left) and buffered (right) sides of the sort merge join ## Rationale In SMJ, the streamed side cursor advances sequentially, so its indices are almost always contiguous. The buffered side is scanned sequentially within each key group, so its indices are also contiguous for 1:1 and 1:few joins. The `take` kernel allocates new arrays and copies data even when a simple slice would suffice. ## Benchmark Results Criterion micro-benchmark (100K rows, pre-sorted, no sort/scan overhead): | Benchmark | Baseline | Optimized | Improvement | |-----------|----------|-----------|-------------| | inner_1to1 (unique keys) | 5.11 ms | 3.88 ms | **-24%** | | inner_1to10 (10K keys) | 17.64 ms | 16.29 ms | **-8%** | | left_1to1_unmatched (5% unmatched) | 4.80 ms | 3.87 ms | **-19%** | | left_semi_1to10 (10K keys) | 3.65 ms | 3.11 ms | **-15%** | | left_anti_partial (partial match) | 3.58 ms | 3.43 ms | **-4%** | All improvements are statistically significant (p < 0.05). TPC-H SF1 with SMJ forced (`prefer_hash_join=false`) shows no regressions across all 22 queries, with modest end-to-end improvements on join-heavy queries (Q3 -7%, Q19 -5%, Q21 -2%). ## Implementation - `is_contiguous_range()`: checks if a `UInt64Array` is a contiguous ascending range. Uses quick endpoint rejection then verifies every element sequentially. - `freeze_streamed()`: uses `slice` instead of `take` for streamed (left) columns when indices are contiguous. - `fetch_right_columns_from_batch_by_idxs()`: uses `slice` instead of `take` for buffered (right) columns when indices are contiguous. When indices are not contiguous (e.g. repeated indices in many-to-many joins), falls back to the existing `take` path. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…0482) ## Which issue does this PR close? N/A ## Rationale for this change I noticed while playing around with local tests and debugging memory issue, that `SortMergeJoinStream` wait for all input before start emitting, which shouldn't be the case as we can emit early when we have enough data. also, this cause huge memory pressure ## What changes are included in this PR? Trying to fix the issue, not sure yet ## Are these changes tested? Yes ## Are there any user-facing changes? ----- ## TODO: - [x] update docs - [x] finish fix
12 tasks
rluvaton
approved these changes
Mar 4, 2026
Contributor
|
Seems there are a few test failures https://github.com/apache/datafusion/actions/runs/22677362632/job/65737685125?pr=20699 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Backport of #20482 to branch-52.
Rationale for this change
Cherry-pick fix and prerequisites so that SortMergeJoin emits output incrementally instead of waiting for all input to
complete. This resolves OOM issues Comet is seeing with DataFusion 52.
What changes are included in this PR?
Cherry-picks of the following commits from
main:Are these changes tested?
Yes, covered by existing and new tests included in #20482.
Are there any user-facing changes?
No.