Description
Is your feature request related to a problem or challenge?
The problem shows up even in our tpcds_sf1 benchmarks. Here's a simple demonstration.
TPC-DS Query 99, run with this command:

```shell
cargo run --profile release-nonlto --bin dfbench tpcds --query 99 --iterations 3 --path benchmarks/data/tpcds_sf1 --query_path datafusion/core/tests/tpc-ds --prefer_hash_join true
```

shows these numbers:

```
Query 99 iteration 0 took 5514.4 ms and returned 90 rows
Query 99 iteration 1 took 5303.4 ms and returned 90 rows
Query 99 iteration 2 took 5388.4 ms and returned 90 rows
```
Now, if I force all joins to be partitioned joins by setting `DATAFUSION_OPTIMIZER_REPARTITION_JOINS=true DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD=0 DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD_ROWS=0` on the same command, I get:

```
Query 99 iteration 0 took 204.3 ms and returned 90 rows
Query 99 iteration 1 took 106.5 ms and returned 90 rows
Query 99 iteration 2 took 103.8 ms and returned 90 rows
```
Nearly 52x speedup!
This query has 4 joins. The default optimizer makes all of them `CollectLeft` joins; the env vars above make all of them `Partitioned` hash joins. Forcing everything to `Partitioned` is itself less than ideal, but it shows that the optimizer doesn't always get the join mode right.
Overall numbers
I found these queries benefiting from always partitioning in TPC-DS:
1. tpcds q99 join_under_partitioned 27.971x (5785.86ms -> 206.85ms)
2. tpcds q62 join_under_partitioned 4.204x (626.49ms -> 149.01ms)
3. tpcds q6 join_under_partitioned 3.809x (840.52ms -> 220.69ms)
4. tpcds q68 join_under_partitioned 3.394x (341.73ms -> 100.68ms)
5. tpcds q27 join_under_partitioned 3.020x (399.90ms -> 132.41ms)
6. tpcds q36 join_under_partitioned 3.005x (303.32ms -> 100.94ms)
7. tpcds q46 join_under_partitioned 2.878x (299.31ms -> 104.00ms)
8. tpcds q37 join_under_partitioned 2.675x (253.45ms -> 94.74ms)
9. tpcds q61 join_under_partitioned 2.629x (273.10ms -> 103.88ms)
10. tpcds q4 agg_over_partitioned 1.955x (1197.97ms -> 612.73ms)
And these modest improvements in TPC-H:
- tpch q7 join_over_partitioned 1.204x (169.83ms -> 141.06ms)
- tpch q5 join_over_partitioned 1.165x (158.94ms -> 136.48ms)
- tpch q10 join_over_partitioned 1.106x (182.66ms -> 165.09ms)
- tpch q3 agg_over_partitioned 1.102x (140.40ms -> 127.46ms)
Note: these numbers are from a script that Codex CLI wrote. I have only manually confirmed the numbers for TPC-DS 99. The rest are unconfirmed.
The optimizer can and will get this wrong: even a sophisticated cost-based join selection algorithm will still misjudge some cases. We should instead make execution robust to the optimizer's plan choices.
Describe the solution you'd like
I propose that both `HashJoinExec` and `AggregateExec` dynamically switch from `CollectLeft`/`Final` to `Partitioned`/`FinalPartitioned` mode based on runtime metrics.
Hash join
- `HashJoinExec` would always ask for an input distribution of `[SinglePartition, UnspecifiedDistribution]` (same as `CollectLeft` currently). At planning time, we do not insert any repartitions.
- Once we have more than 1M rows (configurable) collected on the build side (relevant code), we switch the mode.
  - Internally, the `HashJoinExec` would need to repartition (hash-based) both the build- and probe-side inputs and set the mode to `Partitioned` for the stream.
  - The repartitioning can be done in one of two ways:
    - Literally insert `RepartitionExec`s internal to `HashJoinExec` (so they won't show up in the tree, but the streams from below are wrapped).
    - Abstract out the relevant parts of `RepartitionExec` and use them here.
- One optimization we could do is to start repartitioning as soon as the threshold is breached, instead of waiting for all build-side rows to be accumulated.
- The output partitioning of this tree would always be `UnknownPartitioning(N)`. Even when it switches to partitioned mode, the parent operators cannot make use of this information at planning time, so they may try to repartition the data again. This is one trade-off compared to deciding the partitioning at planning time.
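The threshold-based switch above can be sketched as a standalone function. This is a hypothetical illustration, not DataFusion code: `JoinMode`, `partition_build_side`, `hash_partition`, and the use of bare `u64` join keys are simplifying assumptions standing in for `HashJoinExec`'s actual state and `RecordBatch` inputs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative stand-in for the join's execution mode.
#[derive(Debug, PartialEq)]
enum JoinMode {
    CollectLeft,
    Partitioned,
}

/// Hash a join key to a partition index. The probe side would use the
/// same function, so matching keys land in the same partition.
fn hash_partition(key: &impl Hash, target_partitions: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % target_partitions
}

/// Collect the build side into a single partition, and only
/// hash-repartition it into `target_partitions` buckets once the
/// configurable row threshold (e.g. 1M rows) is breached.
fn partition_build_side(
    build_rows: Vec<u64>, // stand-in for the build side's join keys
    threshold_rows: usize,
    target_partitions: usize,
) -> (JoinMode, Vec<Vec<u64>>) {
    if build_rows.len() <= threshold_rows {
        // Small build side: keep the CollectLeft plan, one partition.
        return (JoinMode::CollectLeft, vec![build_rows]);
    }
    // Threshold breached: switch the mode and hash-repartition.
    let mut buckets = vec![Vec::new(); target_partitions];
    for row in build_rows {
        buckets[hash_partition(&row, target_partitions)].push(row);
    }
    (JoinMode::Partitioned, buckets)
}
```

In the real operator the check would happen while the build side is being collected, which is also where the "repartition as soon as the threshold is breached" optimization would apply.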
Aggregation
- `AggregateExec` would always ask for an input distribution of `[SinglePartition]` (same as `Final` mode currently). At planning time, we do not insert any repartitions.
- This dynamic switching would only be done for aggregations with group-bys.
- Once we have accumulated more than 200k groups (configurable), we switch.
- Switching is more complex here since we need to do it mid-stream; there's no neat point like the build side finishing in a join.
- Switching would be done in two steps:
  - Extract the partial results from the `GroupedHashAggregateStream`. This is similar to calling the `emit` fn with `spilling = true` here. We need to ensure that we get the intermediate state, and not the final results.
  - For all subsequent inputs, and for the partial state we got from above, we need to repartition. Same as in the join, this can be done either by instantiating a `RepartitionExec` internally in the stream, or by abstracting out the hash repartitioning code and reusing it.
    - Note that we also need to repartition the intermediate state from above before re-accumulating it.
    - We also need to instantiate `target_partitions` number of `GroupedHashAggregateStream`s here.
    - Outputs from all streams would need to be coalesced into one output stream, because we currently cannot change the output partitioning of a tree mid-execution.
- The output partitioning of this operator would always be single-partitioned. Even when it switches to partitioned mode, the parent operators cannot make use of this information at planning time, so they may try to repartition the data again. This is one trade-off compared to deciding the partitioning at planning time.
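A minimal sketch of the mid-stream switch, using a COUNT(*)-style aggregation over `u64` keys. Everything here (`DynamicAgg`, the plain `HashMap` group state) is a hypothetical stand-in for `GroupedHashAggregateStream` and its accumulator state, meant only to show the extract, repartition, re-accumulate, coalesce sequence:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in for a dynamically switching aggregation: Final mode until the
/// group-count threshold is breached, FinalPartitioned-like afterwards.
struct DynamicAgg {
    threshold_groups: usize,
    target_partitions: usize,
    /// `Some` while in single-partition (Final) mode, `None` after the switch.
    single: Option<HashMap<u64, u64>>,
    /// After the switch: one group state per partition.
    partitioned: Vec<HashMap<u64, u64>>,
}

impl DynamicAgg {
    fn new(threshold_groups: usize, target_partitions: usize) -> Self {
        Self {
            threshold_groups,
            target_partitions,
            single: Some(HashMap::new()),
            partitioned: Vec::new(),
        }
    }

    fn partition_of(&self, key: u64) -> usize {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        (h.finish() as usize) % self.target_partitions
    }

    fn push(&mut self, key: u64) {
        if let Some(state) = self.single.as_mut() {
            *state.entry(key).or_insert(0) += 1;
            if state.len() > self.threshold_groups {
                // Step 1: extract the intermediate state (running counts,
                // not final results) and hash-repartition it.
                let extracted = self.single.take().unwrap();
                self.partitioned = vec![HashMap::new(); self.target_partitions];
                for (k, v) in extracted {
                    let p = self.partition_of(k);
                    *self.partitioned[p].entry(k).or_insert(0) += v;
                }
            }
            return;
        }
        // Step 2: after the switch, route each input row to its partition.
        let p = self.partition_of(key);
        *self.partitioned[p].entry(key).or_insert(0) += 1;
    }

    /// Coalesce all partitions back into one output. Partitions hold
    /// disjoint key sets, so `extend` never overwrites a group.
    fn finish(mut self) -> HashMap<u64, u64> {
        let mut out = self.single.take().unwrap_or_default();
        for part in self.partitioned {
            out.extend(part);
        }
        out
    }
}
```

The real operator would run the post-switch partitions as `target_partitions` independent streams rather than a loop, but the ordering constraint is the same: intermediate state must be repartitioned before any re-accumulation happens.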
Describe alternatives you've considered
- Make the optimizer better instead of handling this at execution time. A cost-based choice will not always be right, but the optimizer can still be made better than it currently is.
Additional context
Some large changes in agg (#20773) and join (#19789) may affect this.