Skip to content

Fix sort merge interleave overflow#20922

Open
xudong963 wants to merge 7 commits intoapache:mainfrom
xudong963:fix_overflow
Open

Fix sort merge interleave overflow#20922
xudong963 wants to merge 7 commits intoapache:mainfrom
xudong963:fix_overflow

Conversation

@xudong963
Copy link
Member

@xudong963 xudong963 commented Mar 13, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

When SortPreservingMergeStream merges batches containing large string/binary columns whose combined offsets exceed i32::MAX, Arrow's interleave panics with .expect("overflow"). This PR catches that panic and retries with progressively fewer rows, producing smaller output batches that fit within i32 offset limits.

What changes are included in this PR?

Are these changes tested?

Yes UT

Are there any user-facing changes?

@xudong963 xudong963 marked this pull request as draft March 13, 2026 09:15
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 13, 2026
@xudong963
Copy link
Member Author

xudong963 commented Mar 16, 2026

I'll continue the PR after apache/arrow-rs#9549 gets into DataFusion

I can directly capture the panic then fix

@xudong963 xudong963 marked this pull request as ready for review March 19, 2026 09:39
@xudong963 xudong963 requested review from 2010YOUY01 and kosiew March 19, 2026 09:40
Copy link
Contributor

@kosiew kosiew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👋 @xudong963,
THanks for working on this.

match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
Ok(result) => Ok(result?),
Err(panic_payload) => {
if is_overflow_panic(&panic_payload) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching any panic whose message merely contains "overflow" is too broad for a recovery path in the merge operator.

This now converts unrelated bugs such as Rust arithmetic overflows ("attempt to multiply with overflow") or allocation failures like "capacity overflow" into a synthetic OffsetOverflowError, causing the stream to silently split batches instead of surfacing the real defect.
Since this code is on the hot path and intentionally swallows panics, I think we need a tighter discriminator before merging. Ideally the overflow detection should match the specific Arrow panic we expect, or be isolated behind a smaller helper/API so we are not turning arbitrary panics into data-dependent control flow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, totally agree

/// panic and retries with fewer rows until the output fits in i32
/// offsets.
#[test]
fn test_interleave_overflow_is_caught() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and test_sort_merge_fetch_interleave_overflow
allocate enormous strings (768 * 1024 * 1024 bytes each) and then materialize them into multiple StringArrays.

In practice that means several gigabytes of heap allocation per test, which is likely to make CI flaky or OOM outright.

The coverage is important, but I do not think these tests are better replaced with a lower-memory reproduction, for example by constructing the overflow condition with a purpose-built array fixture/helper instead of copying multi-GB payloads into StringArrays.

cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.done {
// When `build_record_batch()` hits an i32 offset overflow (e.g.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The done branch and the normal emit path both repeat the same before = len(); build_record_batch(); produced += ... bookkeeping.

This feels like it wants a small helper on SortPreservingMergeStream or BatchBuilder so the overflow/drain behavior stays in one place.

} else {
self.batches_mem_used -= get_record_batch_memory_size(batch);
// Try interleaving all indices. On offset overflow, halve and retry.
let mut end = self.indices.len();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry loop is clear, but I think end is really "rows_to_emit".
Renaming that variable or extracting a helper like build_partial_record_batch would make the control flow a bit easier to scan now that build_record_batch has to coordinate retry, draining, and delayed cleanup.

@xudong963
Copy link
Member Author

@kosiew thanks for the review, 19fec33 addressed in the commit

xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Mar 20, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Mar 20, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@xudong963
Copy link
Member Author

Some logs in our env:

thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
{"timestamp":"2026-03-20T09:01:11.233319Z","level":"WARN","fields":{"message":"Interleave offset overflow with 2043 rows, retrying with 1021","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:15.856713Z","level":"WARN","fields":{"message":"Interleave offset overflow with 2043 rows, retrying with 510","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:18.259290Z","level":"WARN","fields":{"message":"Interleave offset overflow with 2043 rows, retrying with 255","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:25.359553Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1788 rows, retrying with 894","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:29.011789Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1788 rows, retrying with 447","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:31.266372Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1788 rows, retrying with 223","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:37.146961Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1565 rows, retrying with 782","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:39.659320Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1565 rows, retrying with 391","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:45.441247Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1174 rows, retrying with 587","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
thread 'main' (32) panicked at /usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.2.0/src/interleave.rs:180:41:
overflow
{"timestamp":"2026-03-20T09:01:47.648284Z","level":"WARN","fields":{"message":"Interleave offset overflow with 1174 rows, retrying with 293","log.target":"datafusion_physical_plan::sorts::builder","log.module_path":"datafusion_physical_plan::sorts::builder","log.file":"/usr/local/cargo/git/checkouts/arrow-datafusion-ea123ae062956126/8dcb444/datafusion/physical-plan/src/sorts/builder.rs","log.line":282},"target":"datafusion_physical_plan::sorts::builder"}
Finished writing metadata, stats, and bitmaps files. Metadata rows: 2043, Stats rows: 2043.

// `.expect("overflow")` / `.expect("offset overflow")`.
// Catch only those specific panics so the caller can retry
// with fewer rows while unrelated defects still unwind.
match catch_unwind(AssertUnwindSafe(f)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be avoided after the apache/arrow-rs#9549 gets into DF

Copy link
Contributor

@kosiew kosiew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963

Thanks for the update - this looks good overall, and I don’t have any blocking concerns. I left a couple of non-blocking suggestions around memory retention during partial draining and test coverage for the overflow retry path.

// Remove consumed indices, keeping any remaining for the next call.
self.indices.drain(..rows_to_emit);

// Only clean up fully-consumed batches when all indices are drained,
Copy link
Contributor

@kosiew kosiew Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice change. One thing that stood out to me here: now that build_record_batch() can emit a prefix and leave the remainder buffered, this branch seems to keep every fully-consumed input batch alive until self.indices is empty.

That seems functionally correct, but it also means overflow cases could retain quite a bit of memory across several follow-up polls - especially for FETCH-limited queries where we stop pulling new input and just drain leftovers.

Would it make sense to either release batches that are no longer referenced by the remaining indices, or at least leave a quick comment here calling out that this retention is intentional? I think that would help future readers understand the tradeoff.

DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
}

fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
Copy link
Contributor

@kosiew kosiew Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry behavior looks good, but right now it seems like it’s only covered through synthetic helper failures.

Since the production path depends on matching Arrow’s panic payload pretty closely, I think it’d be great to add one higher-level regression test closer to BatchBuilder::build_record_batch() or SortPreservingMergeStream that exercises the retry/drain flow end-to-end through an injectable interleave hook.

That would make it a lot easier to catch future Arrow-side panic-message changes - or refactors in this file before they slip through.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants