Introduce morsel-driven Parquet scan#20481
Conversation
This PR implements morsel-driven execution for Parquet files in DataFusion, enabling row-group level work sharing across partitions to mitigate data skew. Key changes: - Introduced `WorkQueue` in `datafusion/datasource/src/file_stream.rs` for shared pool of work. - Added `morselize` method to `FileOpener` trait to allow dynamic splitting of files into morsels. - Implemented `morselize` for `ParquetOpener` to split files into individual row groups. - Cached `ParquetMetaData` in `ParquetMorsel` extensions to avoid redundant I/O. - Modified `FileStream` to support work stealing from the shared queue. - Implemented `Weak` pointer pattern for `WorkQueue` in `FileScanConfig` to support plan re-executability. - Added `MorselizingGuard` to ensure shared state consistency on cancellation. - Added `allow_morsel_driven` configuration option (enabled by default for Parquet). - Implemented row-group pruning during the morselization phase for better efficiency. Tests: - Added `parquet_morsel_driven_execution` test to verify work distribution and re-executability. - Added `parquet_morsel_driven_enabled_by_default` to verify the default configuration. Co-authored-by: Dandandan <163737+Dandandan@users.noreply.github.com>
|
run benchmarks |
|
🤖: Benchmark completed Details
|
This reverts commit 4fc8edd.
|
run benchmarks |
|
Benchmark job started for this request (job |
|
Benchmark job started for this request (job |
|
Benchmark job started for this request (job |
|
🤖 Benchmark running (GKE) | trigger |
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usagetpch — base (merge-base)
tpch — branch
|
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
|
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
|
|
run benchmark clickbench_extended |
|
Benchmark job started for this request (job |
|
🤖 Benchmark running (GKE) | trigger |
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usageclickbench_extended — base (merge-base)
clickbench_extended — branch
|
|
run benchmark clickbench_pushdown |
|
🤖 Benchmark running (GKE) | trigger |
…en-execution-237164415184908839 # Conflicts: # datafusion/datasource-parquet/src/opener.rs # datafusion/sqllogictest/test_files/clickbench.slt
…am/main - Add missing import for reverse_row_selection in opener.rs - Update clickbench.slt plan expectations for ProjectionExec optimization changes - Update projection_pushdown.slt for predicate pushdown in DataSourceExec - Update encrypted_parquet.slt for query that now succeeds Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usageclickbench_pushdown — base (merge-base)
clickbench_pushdown — branch
|
|
run benchmark clickbench_pushdown |
|
🤖 Benchmark running (GKE) | trigger |
|
🤖 Benchmark completed (GKE) | trigger Details
Resource Usageclickbench_pushdown — base (merge-base)
clickbench_pushdown — branch
|
|
Seems like morselization is even better w/ added latency?? |
Yeah still seems to improve things greatly! However I think the relative improvement is smaller, because of lower utilization, perhaps also a bit due to somewhat smaller request sizes / more requests (and having no prefetching yet in place). |
|
Oh wait - the relative improvement seems bigger still! 15% to 22% 🤯 I think it is mainly because of the added skew (from different request latencies). Nice to see this confirmed as well using this bench @alamb |
|
The results seem very promising! I'm curious at what point you think this will be ready for final review and inclusion? Right now it seems to be still in the development phase? |
I am trying to make a "production ready" version here I am hoping it will be ready / show the same improvements this PR does "soon" (friday hopefully) |
|
Which issue does this PR close?
TODO:
executestream is dropped / not shared between two concurrent instances (major design issue)Rationale for this change
Current parelllization of Parquet scan is bounded by the thread that has the most data / is the slowest to execute, which means in the case of data skew (driven by either larger partitions or less selective filters during pruning / filter pushdown..., variable object store latency), the parallelism will be significantly limited.
We can change the strategy by morsel-driven parallelism like described in https://db.in.tum.de/~leis/papers/morsels.pdf.
Doing so is faster for a lot of queries, when there is an amount of skew (such as clickbench) and we have enough row filters to spread out the work.
For clickbench_partitioned / clickbench_pushdown it seems up to ~2x as fast for some queries, on a 10 core machine.
It seems to have almost no regressions, perhaps some due to different file scanning order(?) - so different statistics that can be used to prune and thus some minor variation.
Morsel-Driven Execution Architecture (partly claude-generated)
This branch implements a morsel-driven execution model for Parquet scans, based on the concept
from the Morsel-Driven Parallelism paper (Leis et al.). The core idea: instead of statically
assigning files to partitions, all work is pooled in a shared queue that all partition streams pull
from dynamically.
The Problem It Solves
In the traditional model, partition 0 might get a 1 GB file while partition 1 gets nothing --
partition 1 idles while 0 is busy. Currently we already try to statically spread out work to n partitions / threads based on stats (which works very well on perfectly distributed scans on SSDs (e.g. TPCH running locally), this doesn't work well when there is any data skew caused by any of those:
Morsel-driven execution prevents this by sharing work dynamically.
Key Types
ParquetMorsel--datafusion/datasource-parquet/src/opener.rs:129A morsel = one row group of a Parquet file. Stored as an extension on
PartitionedFile.WorkQueue--datafusion/datasource/src/file_stream.rs:410The shared, thread-safe queue. Each partition stream calls
pull()which returns:Work(file)-- here's a file/morsel to processWait-- queue is empty but workers are still morselizing (wait for notification)Done-- all work consumedMorselState--datafusion/datasource/src/source.rs:240Tracks the shared queue lifecycle. A new queue is created once per execution cycle when all
partition streams have opened.
MorselizingGuard--datafusion/datasource/src/file_stream.rs:49RAII wrapper that atomically decrements
morselizing_countwhen a worker finishes -- enablingWorkStatus::WaitvsDonedecisions.FileOpenerTrait Extension --datafusion/datasource/src/file_stream.rs:498A new
morselize()method is added toFileOpener. The default implementation is a no-op(returns the file as-is).
ParquetOpeneroverrides it to split files by row group.ParquetOpener::morselize()atopener.rs:232:Arcacross all resulting morsels)PartitionedFileper surviving row group, each carrying aParquetMorselextensionFileStreamState Machine --datafusion/datasource/src/file_stream.rs:141The morsel-driven path adds two new states (
MorselizingandWaiting):Configuration
datafusion.execution.parquet.allow_morsel_driven -- datafusion/common/src/config.rs:748
Default: true. Can be disabled per-session.
FileScanConfig::morsel_driven -- datafusion/datasource/src/file_scan_config.rs:211
Automatically disabled when:
partitioned_by_file_group = true(breaks hash-partitioning guarantees)preserve_order = true(breaks SortPreservingMerge guarantees)Benchmark results
Summary: both clickbench, clickbench_partitioned
Details
Acknowledgements
I heavily used AI (Jules / Claude) for this PR, but reviewed the code myself