Conversation
I can directly capture the panic and then fix it.
kosiew left a comment
👋 @xudong963,
Thanks for working on this.
match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
    Ok(result) => Ok(result?),
    Err(panic_payload) => {
        if is_overflow_panic(&panic_payload) {
Catching any panic whose message merely contains "overflow" is too broad for a recovery path in the merge operator.
This now converts unrelated bugs such as Rust arithmetic overflows ("attempt to multiply with overflow") or allocation failures like "capacity overflow" into a synthetic OffsetOverflowError, causing the stream to silently split batches instead of surfacing the real defect.
Since this code is on the hot path and intentionally swallows panics, I think we need a tighter discriminator before merging. Ideally the overflow detection should match the specific Arrow panic we expect, or be isolated behind a smaller helper/API so we are not turning arbitrary panics into data-dependent control flow.
good point, totally agree
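A tighter discriminator could match the exact panic payloads instead of substring-matching "overflow". The sketch below is a hypothetical helper (the name `is_offset_overflow_panic` and the exact message strings are assumptions to be taken from the arrow source, not the PR's code):

```rust
use std::any::Any;
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical tighter discriminator: accept only the specific panic
/// payloads arrow's `interleave` emits on i32 offset overflow, rather
/// than any message that merely contains "overflow".
fn is_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
    let msg = if let Some(s) = payload.downcast_ref::<&str>() {
        *s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.as_str()
    } else {
        return false;
    };
    // Exact match instead of `contains("overflow")`, so unrelated
    // panics like "capacity overflow" still unwind as real defects.
    matches!(msg, "overflow" | "offset overflow")
}

fn main() {
    std::panic::set_hook(Box::new(|_| {})); // silence the expected panics
    let err = catch_unwind(AssertUnwindSafe(|| {
        panic!("offset overflow");
    }))
    .unwrap_err();
    assert!(is_offset_overflow_panic(err.as_ref()));

    let err = catch_unwind(AssertUnwindSafe(|| {
        panic!("capacity overflow");
    }))
    .unwrap_err();
    assert!(!is_offset_overflow_panic(err.as_ref()));
    println!("ok");
}
```

Matching the payload type (`&str` vs `String`) plus the exact message keeps the recovery path from absorbing arbitrary bugs.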
/// panic and retries with fewer rows until the output fits in i32
/// offsets.
#[test]
fn test_interleave_overflow_is_caught() {
This test and test_sort_merge_fetch_interleave_overflow allocate enormous strings (768 * 1024 * 1024 bytes each) and then materialize them into multiple StringArrays. In practice that means several gigabytes of heap allocation per test, which is likely to make CI flaky or OOM outright.
The coverage is important, but I think these tests would be better replaced with a lower-memory reproduction, for example constructing the overflow condition with a purpose-built array fixture/helper instead of copying multi-GB payloads into StringArrays.
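One way to keep the coverage without the multi-GB allocations is to model only the offset arithmetic that overflows. A minimal sketch under that assumption (the function is hypothetical, not DataFusion code, and it models the arithmetic rather than exercising arrow itself):

```rust
/// Hypothetical low-memory stand-in: model only the i32 offset
/// arithmetic that overflows inside arrow's `interleave`, using the
/// lengths of the would-be string values, so no multi-GB payload is
/// ever allocated.
fn offsets_overflow(value_lens: &[usize], copies: usize) -> bool {
    let mut total: i64 = 0;
    for _ in 0..copies {
        for &len in value_lens {
            total += len as i64;
            if total > i32::MAX as i64 {
                return true; // where interleave's offsets would overflow
            }
        }
    }
    false
}

fn main() {
    // One 768 MiB string per "array": two copies still fit in i32
    // offsets, a third pushes past i32::MAX, with zero allocation.
    let len = 768 * 1024 * 1024;
    assert!(!offsets_overflow(&[len], 2));
    assert!(offsets_overflow(&[len], 3));
    println!("ok");
}
```

A real fixture would still need one small end-to-end case against arrow, but the threshold math can be pinned down this cheaply.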
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    if self.done {
        // When `build_record_batch()` hits an i32 offset overflow (e.g.
The done branch and the normal emit path both repeat the same before = len(); build_record_batch(); produced += ... bookkeeping.
This feels like it wants a small helper on SortPreservingMergeStream or BatchBuilder so the overflow/drain behavior stays in one place.
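To make the suggestion concrete, here is a sketch of such a helper. All names (`Builder`, `emit_tracking_produced`, `max_rows`) are hypothetical stand-ins, not the actual SortPreservingMergeStream API:

```rust
/// Sketch of the consolidation: one helper owns the
/// `before = len(); build; produced += ...` bookkeeping for both the
/// `done` branch and the normal emit path.
struct Builder {
    indices: Vec<u32>,
    produced: usize,
    max_rows: usize, // stand-in for the overflow-limited batch size
}

impl Builder {
    /// Build the next batch and account for produced rows in one place,
    /// instead of repeating the bookkeeping in each polling branch.
    fn emit_tracking_produced(&mut self) -> Option<Vec<u32>> {
        let before = self.indices.len();
        if before == 0 {
            return None;
        }
        // Stand-in for build_record_batch(): after an overflow retry it
        // may emit only a prefix, leaving the rest buffered.
        let take = before.min(self.max_rows);
        let batch: Vec<u32> = self.indices.drain(..take).collect();
        self.produced += before - self.indices.len();
        Some(batch)
    }
}

fn main() {
    let mut b = Builder { indices: vec![1, 2, 3, 4, 5], produced: 0, max_rows: 3 };
    assert_eq!(b.emit_tracking_produced(), Some(vec![1, 2, 3]));
    assert_eq!(b.emit_tracking_produced(), Some(vec![4, 5]));
    assert_eq!(b.produced, 5);
    assert_eq!(b.emit_tracking_produced(), None);
    println!("ok");
}
```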
} else {
    self.batches_mem_used -= get_record_batch_memory_size(batch);
    // Try interleaving all indices. On offset overflow, halve and retry.
    let mut end = self.indices.len();
The retry loop is clear, but I think `end` is really "rows_to_emit".
Renaming that variable or extracting a helper like `build_partial_record_batch` would make the control flow a bit easier to scan now that `build_record_batch` has to coordinate retry, draining, and delayed cleanup.
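Roughly what the rename plus extraction could look like, with the interleave call stubbed out. All names here (`build_partial`, `emit_with_retry`, the row budget) are hypothetical illustrations, not the PR's code:

```rust
/// Stand-in for the interleave/build step: fails above a fixed row
/// budget the way arrow fails on i32 offset overflow.
fn build_partial(rows: &[u32], limit: usize) -> Result<Vec<u32>, &'static str> {
    if rows.len() > limit {
        Err("offset overflow")
    } else {
        Ok(rows.to_vec())
    }
}

/// The halve-and-retry loop with `rows_to_emit` in place of `end`.
fn emit_with_retry(indices: &mut Vec<u32>, limit: usize) -> Vec<u32> {
    let mut rows_to_emit = indices.len();
    loop {
        let attempt = build_partial(&indices[..rows_to_emit], limit);
        match attempt {
            Ok(batch) => {
                // Consume only the emitted prefix; the rest stays buffered.
                indices.drain(..rows_to_emit);
                return batch;
            }
            // On overflow, halve and retry.
            Err(_) if rows_to_emit > 1 => rows_to_emit /= 2,
            Err(e) => panic!("single row still overflows: {e}"),
        }
    }
}

fn main() {
    let mut indices: Vec<u32> = (0..10).collect();
    // Budget of 4 rows: attempts shrink 10 -> 5 -> 2, then succeed.
    assert_eq!(emit_with_retry(&mut indices, 4), vec![0, 1]);
    assert_eq!(indices.len(), 8);
    println!("ok");
}
```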
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Some logs in our env:
// `.expect("overflow")` / `.expect("offset overflow")`.
// Catch only those specific panics so the caller can retry
// with fewer rows while unrelated defects still unwind.
match catch_unwind(AssertUnwindSafe(f)) {
This can be avoided once apache/arrow-rs#9549 lands in DataFusion.
// Remove consumed indices, keeping any remaining for the next call.
self.indices.drain(..rows_to_emit);

// Only clean up fully-consumed batches when all indices are drained,
Nice change. One thing that stood out to me here: now that build_record_batch() can emit a prefix and leave the remainder buffered, this branch seems to keep every fully-consumed input batch alive until self.indices is empty.
That seems functionally correct, but it also means overflow cases could retain quite a bit of memory across several follow-up polls - especially for FETCH-limited queries where we stop pulling new input and just drain leftovers.
Would it make sense to either release batches that are no longer referenced by the remaining indices, or at least leave a quick comment here calling out that this retention is intentional? I think that would help future readers understand the tradeoff.
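For the first option, the release step could be as simple as scanning the remaining indices for live batch ids. A sketch under assumed names (`release_unreferenced` and the `(batch_idx, row_idx)` index shape are hypothetical, not the stream's actual types):

```rust
use std::collections::HashSet;

/// Sketch of the eager release: after a partial emit, drop any buffered
/// batch that no remaining index references, instead of holding
/// everything until `indices` is empty.
fn release_unreferenced(
    batches: &mut Vec<Option<Vec<String>>>, // None = already released
    remaining: &[(usize, usize)],           // (batch_idx, row_idx)
) -> usize {
    let live: HashSet<usize> = remaining.iter().map(|&(b, _)| b).collect();
    let mut freed = 0;
    for (idx, slot) in batches.iter_mut().enumerate() {
        if slot.is_some() && !live.contains(&idx) {
            *slot = None; // release this batch's memory now
            freed += 1;
        }
    }
    freed
}

fn main() {
    let mut batches = vec![
        Some(vec!["a".to_string()]),
        Some(vec!["b".to_string()]),
        Some(vec!["c".to_string()]),
    ];
    // The leftover indices only reference batch 2, so 0 and 1 are freed.
    let remaining = [(2usize, 0usize)];
    assert_eq!(release_unreferenced(&mut batches, &remaining), 2);
    assert!(batches[2].is_some());
    println!("ok");
}
```

The scan is linear in the leftover indices, so it should be cheap relative to the memory it can return in the FETCH-limited drain case.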
DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
}

fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
The retry behavior looks good, but right now it seems like it’s only covered through synthetic helper failures.
Since the production path depends on matching Arrow’s panic payload pretty closely, I think it’d be great to add one higher-level regression test closer to BatchBuilder::build_record_batch() or SortPreservingMergeStream that exercises the retry/drain flow end-to-end through an injectable interleave hook.
That would make it a lot easier to catch future Arrow-side panic-message changes, or refactors in this file, before they slip through.
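A sketch of what such an injectable hook could look like. Everything here (`build_with_hook`, the closure-based interleave parameter) is a hypothetical shape for the test seam, not the stream's current API:

```rust
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};

/// Sketch of an injectable interleave seam: the build path takes the
/// interleave step as a closure, so a regression test can inject one
/// that panics exactly the way arrow does on offset overflow.
fn build_with_hook<F>(indices: &mut Vec<u32>, interleave: F) -> Vec<u32>
where
    F: Fn(&[u32]) -> Vec<u32>,
{
    let mut rows_to_emit = indices.len();
    loop {
        let result =
            catch_unwind(AssertUnwindSafe(|| interleave(&indices[..rows_to_emit])));
        match result {
            Ok(batch) => {
                // Consume the emitted prefix; the remainder stays buffered.
                indices.drain(..rows_to_emit);
                return batch;
            }
            // Real code would first check the payload with an overflow
            // discriminator before deciding to retry.
            Err(_) if rows_to_emit > 1 => rows_to_emit /= 2,
            Err(payload) => resume_unwind(payload),
        }
    }
}

fn main() {
    std::panic::set_hook(Box::new(|_| {})); // silence the injected panics
    // Injected hook: "overflows" above 3 rows, mimicking arrow's panic.
    let hook = |rows: &[u32]| {
        if rows.len() > 3 {
            panic!("offset overflow");
        }
        rows.to_vec()
    };
    let mut indices: Vec<u32> = (0..10).collect();
    assert_eq!(build_with_hook(&mut indices, hook), vec![0, 1]); // 10 -> 5 -> 2
    assert_eq!(indices.len(), 8); // leftovers drain on later polls
    println!("ok");
}
```

With a seam like this, a test can pin both the retry count and the drain behavior without allocating overflow-sized data.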
Which issue does this PR close?
Rationale for this change
When SortPreservingMergeStream merges batches containing large string/binary columns whose combined offsets exceed i32::MAX, Arrow's `interleave` panics via `.expect("overflow")`. This PR catches that panic and retries with progressively fewer rows, producing smaller output batches that fit within i32 offset limits.
What changes are included in this PR?
Are these changes tested?
Yes, covered by unit tests.
Are there any user-facing changes?