perf: optimize broadcast hash join, part 1 [WIP]#3693

Closed
andygrove wants to merge 5 commits into apache:main from andygrove:optimize-broadcast-hash-join
Conversation

@andygrove (Member)

Which issue does this PR close?

Closes #3692.

Rationale for this change

CometBroadcastHashJoin has two performance bottlenecks:

  1. Native side always uses PartitionMode::Partitioned — all hash joins, including broadcast joins, are hardcoded to PartitionMode::Partitioned in planner.rs. DataFusion also supports PartitionMode::CollectLeft, which is specifically designed for broadcast joins and avoids unnecessary repartitioning.

  2. Per-task decompression of broadcast data — every task independently decompresses (LZ4) the entire broadcast payload, so with N partitions on an executor the same broadcast data is decompressed N times.

What changes are included in this PR?

Fix 1 — Use PartitionMode::CollectLeft for broadcast joins:

  • Added bool is_broadcast = 6 field to HashJoin protobuf message
  • Set is_broadcast = true in Scala serde when the join is a BroadcastHashJoinExec
  • Use PartitionMode::CollectLeft in Rust planner when is_broadcast is true
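The protobuf change described above amounts to one new field. A minimal sketch of the message (only the `is_broadcast` field is from this PR; the surrounding fields are placeholders for the existing definition, not the actual Comet schema):

```proto
// Hypothetical sketch of Comet's HashJoin message after this PR.
message HashJoin {
  // ... existing fields 1-5 (join keys, join type, etc.) ...
  bool is_broadcast = 6;  // new: lets the Rust planner pick CollectLeft
}
```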

Fix 2 — Cache decompressed broadcast bytes at executor level:

  • Added ConcurrentHashMap in CometBatchRDD companion object keyed by broadcast ID
  • First task on an executor decompresses the broadcast data; subsequent tasks reuse cached decompressed Arrow IPC bytes
  • Each task still performs Arrow IPC parsing (for proper memory management) but skips LZ4 decompression
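The executor-level cache described above can be sketched as follows. The class and method names here are hypothetical stand-ins; the actual change keys a ConcurrentHashMap in the CometBatchRDD companion object by broadcast ID, and the real decompress step is LZ4.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of executor-level caching of decompressed broadcast bytes.
// BroadcastCache and decompress() are illustrative; the PR places the map
// in the CometBatchRDD companion object, keyed by broadcast ID.
public class BroadcastCache {
    private static final Map<Long, byte[]> CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger decompressions = new AtomicInteger();

    // Stand-in for LZ4 decompression of the serialized broadcast payload.
    private static byte[] decompress(byte[] compressed) {
        decompressions.incrementAndGet();
        return compressed.clone(); // real code would LZ4-decode here
    }

    // The first task on the executor pays the decompression cost;
    // later tasks for the same broadcast ID reuse the cached bytes.
    public static byte[] getOrDecompress(long broadcastId, byte[] compressed) {
        return CACHE.computeIfAbsent(broadcastId, id -> decompress(compressed));
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        // Simulate three tasks reading the same broadcast relation.
        for (int task = 0; task < 3; task++) {
            BroadcastCache.getOrDecompress(42L, payload);
        }
        System.out.println("decompressions=" + decompressions.get());
        // prints decompressions=1
    }
}
```

computeIfAbsent makes the first-writer-wins behavior atomic, so concurrent tasks on the same executor cannot both decompress the same broadcast. Note that, as the bullets above say, each task still re-parses the Arrow IPC bytes; only the LZ4 step is shared.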

Benchmark:

  • Added CometBroadcastHashJoinBenchmark for measuring broadcast join performance across join types and broadcast sizes

How are these changes tested?

  • All 9 existing CometJoinSuite tests pass
  • New CometBroadcastHashJoinBenchmark validates performance (see benchmark results below)
  • Clippy passes with no warnings

Benchmark results

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.2, Apple M3 Ultra

Broadcast Hash Join (INNER, stream=2097152, broadcast=1000):
Spark                     45ms    1.0X
Comet (Scan + Exec)       51ms    0.9X

Broadcast Hash Join (LEFT, stream=2097152, broadcast=1000):
Spark                     39ms    1.0X
Comet (Scan + Exec)       48ms    0.8X

Broadcast Hash Join (RIGHT, stream=2097152, broadcast=1000):
Spark                    244ms    1.0X
Comet (Scan + Exec)      104ms    2.3X

Broadcast Hash Join (INNER, stream=2097152, broadcast=10000):
Spark                     40ms    1.0X
Comet (Scan + Exec)       47ms    0.9X

…ssion caching

Use PartitionMode::CollectLeft instead of Partitioned for broadcast hash
joins so DataFusion can optimize hash table construction for the broadcast
side. Also cache decompressed broadcast data at executor level to avoid
repeated LZ4 decompression across tasks.
@andygrove changed the title from "perf: optimize broadcast hash join with CollectLeft mode and decompression caching" to "perf: optimize broadcast hash join with CollectLeft mode and decompression caching [WIP]" on Mar 13, 2026
@andygrove changed the title to "perf: optimize broadcast hash join [WIP]" on Mar 13, 2026
@andygrove changed the title to "perf: optimize broadcast hash join, part 1 [WIP]" on Mar 13, 2026
Reverting the CollectLeft change as it causes multiple test failures
in CI including ArrayIndexOutOfBoundsException in NativeUtil.exportBatch
and assertion failures in native code.
Instead of caching only decompressed bytes (which still required
Arrow IPC deserialization per task), cache the fully materialized
ColumnarBatch objects with transferred Arrow vectors. This avoids
both LZ4 decompression and Arrow IPC parsing on subsequent task
accesses to the same broadcast relation.

Vector data is transferred to independent allocations via Arrow's
TransferPair so cached batches don't reference stream reader state
and can be safely reused across tasks.
The Arrow batch caching caused test failures in CI due to unsafe
reuse of exported Arrow vectors via FFI. Reverting to the original
Utils.decodeBatches approach which creates fresh batches per task.
@andygrove closed this on Mar 14, 2026

Successfully merging this pull request may close these issues.

Optimize CometBroadcastHashJoin: use CollectLeft partition mode and cache deserialized broadcast batches