Acceleration: Iceberg table compaction #3519
Shekharrajak wants to merge 6 commits into apache:main
Conversation
```rust
// under the License.

//! Iceberg Parquet writer operator for writing RecordBatches to Parquet files
//! with Iceberg-compatible metadata (DataFile structures).
```
DataFusion execution operator that writes Arrow RecordBatches to Parquet files with Iceberg-compatible metadata. It enables native Rust to produce files that Iceberg's Java API can directly commit. The metadata is serialized as JSON and passed back to the JVM via JNI for commit.
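A rough sketch of what that JSON handoff could look like at the native-to-JVM boundary. The field names below follow Iceberg's DataFile spec (`file_path`, `file_format`, `file_size_in_bytes`, `record_count`); the exact schema this PR serializes is not shown in the excerpt, so treat the helper as hypothetical:

```java
// Hypothetical illustration of the DataFile-metadata JSON handoff.
// Field names follow the Iceberg DataFile spec; the PR's actual
// schema may differ.
public class DataFileJson {

    /** Builds a minimal JSON object describing one written data file. */
    static String toJson(String filePath, long fileSizeInBytes, long recordCount) {
        return String.format(
            "{\"file_path\":\"%s\",\"file_format\":\"PARQUET\","
                + "\"file_size_in_bytes\":%d,\"record_count\":%d}",
            filePath, fileSizeInBytes, recordCount);
    }

    public static void main(String[] args) {
        // The native side would return a JSON array of such objects over JNI;
        // the JVM side parses it and commits the files through Iceberg's API.
        System.out.println(toJson("warehouse/db/tbl/data/0001.parquet", 1048576L, 5000L));
    }
}
```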
```rust
//! JNI bridge for Iceberg compaction operations.
//!
//! This module provides JNI functions for native Iceberg compaction (scan + write).
```
JNI bridge that exposes native Rust compaction to Scala/JVM. `executeIcebergCompaction()` is the JNI entry point: it reads Parquet files via DataFusion and writes the compacted output.
```rust
/// Configuration for Iceberg table metadata passed from JVM
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergTableConfig {
```
Table metadata passed from the JVM (identifier, warehouse, snapshot ID, file IO properties).
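For illustration, a config payload carrying the fields listed above might look like this (the key names and values here are assumptions, not the PR's actual wire format):

```json
{
  "table_identifier": "db.lineitem",
  "warehouse_location": "s3://warehouse/",
  "snapshot_id": 123456789,
  "file_io_properties": {
    "s3.endpoint": "http://localhost:9000"
  }
}
```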
```scala
logDebug(s"Executing native compaction with config: $configJson")

val resultJson = native.executeIcebergCompaction(configJson)
```
This is the JNI entry point: it reads Parquet files via DataFusion and writes the compacted output.
```scala
def isAvailable: Boolean = {
  try {
    val version = new Native().getIcebergCompactionVersion()
```
Returns the native library version for compatibility checks.
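The availability probe above follows a common pattern: call into the native library and treat any linkage failure as "not available". A minimal sketch, using a hypothetical `NativeStub` in place of Comet's `Native` class (the stub deliberately simulates a missing native library):

```java
// Sketch of the isAvailable pattern: any failure to reach the native
// library (notably UnsatisfiedLinkError) means the feature is off.
public class AvailabilityCheck {

    // Hypothetical stand-in for the real Native class; here it simulates
    // an unloaded native library.
    static class NativeStub {
        String getIcebergCompactionVersion() {
            throw new UnsatisfiedLinkError("libcomet not loaded");
        }
    }

    static boolean isAvailable() {
        try {
            String version = new NativeStub().getIcebergCompactionVersion();
            return version != null && !version.isEmpty();
        } catch (Throwable t) {
            // UnsatisfiedLinkError is an Error, not an Exception,
            // so catch Throwable to cover missing-library cases.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAvailable()); // prints false with the stub above
    }
}
```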
```scala
    "Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
      s"${e.getMessage}")
    None

findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap { filterExpressionsMethod =>
```
Previously we assumed a fixed Iceberg class hierarchy; `findMethodInHierarchy` instead walks up the class tree, which is a more robust approach.
For compaction to work, we need to extract `FileScanTask` objects from the scan. Different Iceberg scan types expose tasks differently:
- `SparkBatchQueryScan` -> `tasks()` method
- `SparkStagedScan` -> `taskGroups()` method (returns groups; tasks must be extracted from each)
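A minimal sketch of the hierarchy-walking lookup described above. The stand-in classes are hypothetical; the real code targets Iceberg's Spark scan classes, but the mechanism is the same: scan `getDeclaredMethods` at each level of the superclass chain so private or inherited methods are found without assuming a fixed hierarchy.

```java
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;

public class Reflect {

    // Walk the superclass chain looking for a zero-argument method with
    // the given name, making it accessible if found.
    static Optional<Method> findMethodInHierarchy(Class<?> cls, String name) {
        for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                if (m.getName().equals(name) && m.getParameterCount() == 0) {
                    m.setAccessible(true);
                    return Optional.of(m);
                }
            }
        }
        return Optional.empty();
    }

    // Hypothetical stand-ins: tasks() is private and declared on the base
    // class, so a plain getMethod() on QueryScan would not find it.
    static class BaseScan {
        private List<String> tasks() { return List.of("t1"); }
    }
    static class QueryScan extends BaseScan {}

    public static void main(String[] args) throws Exception {
        Method m = findMethodInHierarchy(QueryScan.class, "tasks").orElseThrow();
        System.out.println(m.invoke(new QueryScan())); // prints [t1]
    }
}
```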
Which issue does this PR close?
Ref #3371
PR Description

Rationale for this change

Iceberg table compaction using Spark's default `rewriteDataFiles()` action is slow due to Spark shuffle and task-scheduling overhead. This PR adds native Rust-based compaction that uses DataFusion for direct Parquet read/write, achieving a 1.5-1.8x speedup over Spark's default compaction.

What changes are included in this PR?

- `CometNativeCompaction` class that executes native compaction (native scan + write via JNI) and commits via the Iceberg Java API
- `spark.comet.iceberg.compaction.enabled` config option

How are these changes tested?

- `CometIcebergCompactionSuite`
- A TPC benchmark (`CometIcebergTPCCompactionBenchmark`) measuring performance on the lineitem, orders, and customer tables
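Assuming the config key introduced by this PR, enabling the native compaction path from a Spark session might look like the following; only `spark.comet.iceberg.compaction.enabled` comes from this PR, and the remaining settings follow Comet's documented setup and may vary by version:

```shell
# Illustrative: enable Comet and the native Iceberg compaction path.
spark-shell \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.iceberg.compaction.enabled=true
```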