perf: optimize broadcast hash join, part 1 [WIP]#3693
Closed
andygrove wants to merge 5 commits intoapache:mainfrom
Closed
perf: optimize broadcast hash join, part 1 [WIP]#3693andygrove wants to merge 5 commits intoapache:mainfrom
andygrove wants to merge 5 commits intoapache:mainfrom
Conversation
…ssion caching Use PartitionMode::CollectLeft instead of Partitioned for broadcast hash joins so DataFusion can optimize hash table construction for the broadcast side. Also cache decompressed broadcast data at executor level to avoid repeated LZ4 decompression across tasks.
Reverting the CollectLeft change as it causes multiple test failures in CI including ArrayIndexOutOfBoundsException in NativeUtil.exportBatch and assertion failures in native code.
Instead of caching only decompressed bytes (which still required Arrow IPC deserialization per task), cache the fully materialized ColumnarBatch objects with transferred Arrow vectors. This avoids both LZ4 decompression and Arrow IPC parsing on subsequent task accesses to the same broadcast relation. Vector data is transferred to independent allocations via Arrow's TransferPair so cached batches don't reference stream reader state and can be safely reused across tasks.
The Arrow batch caching caused test failures in CI due to unsafe reuse of exported Arrow vectors via FFI. Reverting to the original Utils.decodeBatches approach which creates fresh batches per task.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #3692.
Rationale for this change
CometBroadcastHashJoin has two performance bottlenecks:
Native side always uses
PartitionMode::Partitioned— All hash joins are hardcoded toPartitionMode::Partitionedinplanner.rs, even broadcast joins. DataFusion supportsPartitionMode::CollectLeftwhich is specifically designed for broadcast joins and avoids unnecessary repartitioning.Per-task decompression of broadcast data — Every task independently decompresses (LZ4) the entire broadcast payload. With N partitions on an executor, the same broadcast data is decompressed N times.
What changes are included in this PR?
Fix 1 — Use
PartitionMode::CollectLeftfor broadcast joins:bool is_broadcast = 6field toHashJoinprotobuf messageis_broadcast = truein Scala serde when the join is aBroadcastHashJoinExecPartitionMode::CollectLeftin Rust planner whenis_broadcastis trueFix 2 — Cache decompressed broadcast bytes at executor level:
ConcurrentHashMapinCometBatchRDDcompanion object keyed by broadcast IDBenchmark:
CometBroadcastHashJoinBenchmarkfor measuring broadcast join performance across join types and broadcast sizesHow are these changes tested?
CometJoinSuitetests passCometBroadcastHashJoinBenchmarkvalidates performance (see benchmark results below)Benchmark results