Dynamic switching of non-partitioned to partitioned for join and aggregation #20847

@Samyak2

Description

Is your feature request related to a problem or challenge?

The problem shows up even in our tpcds_sf1 benchmarks. Here's a simple demonstration.

TPC-DS Query 99 ran with this command:

cargo run --profile release-nonlto --bin dfbench tpcds --query 99 --iterations 3 --path benchmarks/data/tpcds_sf1 --query_path datafusion/core/tests/tpc-ds --prefer_hash_join true

Shows these numbers:

Query 99 iteration 0 took 5514.4 ms and returned 90 rows
Query 99 iteration 1 took 5303.4 ms and returned 90 rows
Query 99 iteration 2 took 5388.4 ms and returned 90 rows

Now, forcing all joins to be partitioned joins by setting DATAFUSION_OPTIMIZER_REPARTITION_JOINS=true DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD=0 DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD_ROWS=0 on the same command gives me:

Query 99 iteration 0 took 204.3 ms and returned 90 rows
Query 99 iteration 1 took 106.5 ms and returned 90 rows
Query 99 iteration 2 took 103.8 ms and returned 90 rows

Nearly 52x speedup!

This query has 4 joins. The default optimizer makes all of them CollectLeft joins; the env vars above make all of them Partitioned hash joins. Forcing every join to be partitioned is itself less than ideal, but it shows that the optimizer doesn't always pick the right join mode.

Overall numbers

I found these queries benefiting from always partitioning in TPC-DS:

  1. tpcds q99 join_under_partitioned 27.971x (5785.86ms -> 206.85ms)
  2. tpcds q62 join_under_partitioned 4.204x (626.49ms -> 149.01ms)
  3. tpcds q6 join_under_partitioned 3.809x (840.52ms -> 220.69ms)
  4. tpcds q68 join_under_partitioned 3.394x (341.73ms -> 100.68ms)
  5. tpcds q27 join_under_partitioned 3.020x (399.90ms -> 132.41ms)
  6. tpcds q36 join_under_partitioned 3.005x (303.32ms -> 100.94ms)
  7. tpcds q46 join_under_partitioned 2.878x (299.31ms -> 104.00ms)
  8. tpcds q37 join_under_partitioned 2.675x (253.45ms -> 94.74ms)
  9. tpcds q61 join_under_partitioned 2.629x (273.10ms -> 103.88ms)
  10. tpcds q4 agg_over_partitioned 1.955x (1197.97ms -> 612.73ms)

And these modest improvements in TPC-H:

  - tpch q7 join_over_partitioned 1.204x (169.83ms -> 141.06ms)
  - tpch q5 join_over_partitioned 1.165x (158.94ms -> 136.48ms)
  - tpch q10 join_over_partitioned 1.106x (182.66ms -> 165.09ms)
  - tpch q3 agg_over_partitioned 1.102x (140.40ms -> 127.46ms)

Note: these numbers are from a script that Codex CLI wrote. I have only manually confirmed the numbers for TPC-DS 99. The rest are unconfirmed.

The optimizer can and will get this wrong: even with a sophisticated cost-based join-selection algorithm, some cases will still be mispredicted. We should instead make execution robust to the optimizer's plans.

Describe the solution you'd like

I propose that both HashJoinExec and AggregateExec should dynamically switch from CollectLeft/Final to Partitioned/FinalPartitioned mode based on certain metrics.

Hash join

  • HashJoinExec would always ask for an input distribution of [SinglePartition, UnspecifiedDistribution] (same as CollectLeft currently). At planning time, we do not insert any repartitions.
  • Once we have more than 1M rows (configurable) collected on the build-side (relevant code), we switch the mode.
  • Internally, the HashJoinExec would need to repartition (hash-based) both the build and probe side inputs and set the mode to Partitioned for the stream.
    • The repartitioning can be done in one of two ways:
      • Literally insert RepartitionExecs internal to HashJoinExec (so they won't show up in the tree, but the streams from below are wrapped)
      • Abstract out the relevant parts in RepartitionExec and use them here.
  • One optimization we could do is to start re-partitioning as soon as the threshold is breached, instead of waiting for all build-side rows to be accumulated.
  • The output partitioning of this tree would always be UnknownPartitioning(N). Even when it switches to partitioned mode, the parent operators cannot make use of this information at planning time. So they may try to repartition the data again. This is one trade-off compared to deciding the partitioning at planning time.
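The build-side switching described above can be sketched as self-contained Rust. Everything here is a hypothetical stand-in for HashJoinExec internals: the tiny threshold, the `JoinMode` enum, and `partition_of` (which plays the role of the hash-repartitioning an internal RepartitionExec would do). It is a sketch of the control flow, not the actual implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical threshold; the proposal suggests a configurable ~1M rows.
const SWITCH_THRESHOLD: usize = 4;

/// Which mode the join stream is currently executing in.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinMode {
    CollectLeft,
    Partitioned { n: usize },
}

/// Route a key to one of `n` partitions by hashing, mirroring what an
/// internal hash-repartition step would do to both join inputs.
fn partition_of<K: Hash>(key: &K, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % n
}

/// Collect the build side; once the row count crosses the threshold,
/// switch modes and hash-repartition the rows collected so far.
fn collect_build_side(
    rows: &[(u64, &str)],
    target_partitions: usize,
) -> (JoinMode, Vec<Vec<(u64, String)>>) {
    let mut mode = JoinMode::CollectLeft;
    let mut single: Vec<(u64, String)> = Vec::new();
    let mut parts: Vec<Vec<(u64, String)>> = vec![Vec::new(); target_partitions];

    for (key, val) in rows {
        match mode {
            JoinMode::CollectLeft => {
                single.push((*key, val.to_string()));
                if single.len() > SWITCH_THRESHOLD {
                    // Threshold breached: repartition everything seen so far
                    // (the "start re-partitioning as soon as possible" variant).
                    for (k, v) in single.drain(..) {
                        parts[partition_of(&k, target_partitions)].push((k, v));
                    }
                    mode = JoinMode::Partitioned { n: target_partitions };
                }
            }
            JoinMode::Partitioned { n } => {
                parts[partition_of(key, n)].push((*key, val.to_string()));
            }
        }
    }
    if mode == JoinMode::CollectLeft {
        parts[0] = single;
    }
    (mode, parts)
}
```

In the real operator the probe side would need the same hash-repartitioning whenever the build side ends up in Partitioned mode, so that matching keys meet in the same partition.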

Aggregation

  • AggregateExec would always ask for an input distribution of [SinglePartition] (same as Final mode currently). At planning time, we do not insert any repartitions.
  • This dynamic switching would only be done for agg with group-bys.
  • Once we have accumulated more than 200k groups (configurable), we switch.
  • Switching is more complex here since we need to do it mid-stream; there's no neat point like the build side finishing in a join.
  • Switching would be done in two steps:
    • Extract out the partial results from the GroupedHashAggregateStream. Similar behavior as calling the emit fn with spilling = true here. We need to ensure that we get the intermediate state, and not the final results.
    • For all subsequent inputs and for the partial state we got from above, we need to repartition it. Same as in join, this can be done either by instantiating a RepartitionExec internally in the stream, or by abstracting out the hash repartitioning code and re-using it.
      • Note that we also need to repartition the intermediate state from above, before re-accumulating it.
      • We also need to instantiate target_partitions number of GroupedHashAggregateStreams here.
      • Outputs from all streams would need to be coalesced into one output stream. This is because we currently cannot change the output partitioning of a tree mid-execution.
  • The output partitioning of this operator would always be single partitioned. Even when it switches to partitioned mode, the parent operators cannot make use of this information at planning time. So they may try to repartition the data again. This is one trade-off compared to deciding the partitioning at planning time.
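The two-step switch above can also be sketched as self-contained Rust, using SUM over (key, value) pairs as a stand-in for GroupedHashAggregateStream. The threshold, the HashMap-per-partition representation, and `partition_of` are all hypothetical; the point is the shape of the flow: accumulate in one table, then on breaching the group threshold emit the intermediate state, repartition it along with all subsequent input, and finally coalesce the disjoint partitions into one output:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical threshold; the proposal suggests a configurable ~200k groups.
const GROUP_THRESHOLD: usize = 3;

/// Route a group key to one of `n` partitions by hashing.
fn partition_of(key: u64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % n
}

/// SUM aggregation that starts with a single hash table and switches to
/// `target_partitions` tables once the group count crosses the threshold.
fn aggregate(input: &[(u64, i64)], target_partitions: usize) -> HashMap<u64, i64> {
    let mut single: HashMap<u64, i64> = HashMap::new();
    let mut parts: Option<Vec<HashMap<u64, i64>>> = None;

    for &(key, value) in input {
        match parts.as_mut() {
            None => {
                *single.entry(key).or_insert(0) += value;
                if single.len() > GROUP_THRESHOLD {
                    // Step 1: emit the intermediate state (partial sums, not
                    // final results) and repartition it, exactly as subsequent
                    // input will be repartitioned.
                    let mut p = vec![HashMap::new(); target_partitions];
                    for (k, partial_sum) in single.drain() {
                        *p[partition_of(k, target_partitions)]
                            .entry(k)
                            .or_insert(0) += partial_sum;
                    }
                    parts = Some(p);
                }
            }
            // Step 2: all subsequent input goes to the per-partition tables.
            Some(p) => {
                *p[partition_of(key, target_partitions)].entry(key).or_insert(0) += value;
            }
        }
    }

    // Coalesce: partition key sets are disjoint, so a plain union is correct
    // and the operator keeps a single-partition output.
    match parts {
        None => single,
        Some(p) => p.into_iter().flatten().collect(),
    }
}
```

Because SUM's intermediate state merges by addition, repartitioning the partial sums and re-accumulating them is safe; aggregates whose partial state is richer (e.g. AVG's sum + count) would merge their intermediate state the same way the existing Partial/Final split already does.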

Describe alternatives you've considered

  • Make the optimizer better instead of handling this at execution time. This will not always work, but the optimizer can still be made better than it currently is.

Additional context

Some large changes in agg (#20773) and join (#19789) may affect this.
