
Acceleration: Iceberg table compaction #3519

Open
Shekharrajak wants to merge 6 commits into apache:main from
Shekharrajak:feature/iceberg-compaction-benchmark

Conversation

@Shekharrajak
Contributor

Which issue does this PR close?

Ref #3371

PR Description

Rationale for this change

Iceberg table compaction using Spark's default rewriteDataFiles() action is slow because of Spark shuffle and task-scheduling overhead. This PR adds a native Rust-based compaction path that uses DataFusion for direct Parquet read/write, achieving a 1.5-1.8x speedup over Spark's default compaction.
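
For context, the Spark baseline being compared against is Iceberg's rewriteDataFiles action. A minimal sketch of that baseline (the option value is illustrative, not from this PR):

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.spark.sql.SparkSession

// Iceberg's Spark-based bin-pack rewrite: the work runs as Spark tasks,
// which is the shuffle/scheduling overhead the native path avoids.
def sparkCompact(spark: SparkSession, table: Table): Unit = {
  val result = SparkActions.get(spark)
    .rewriteDataFiles(table)
    .binPack()                      // default strategy, shown explicitly
    .option("min-input-files", "2") // illustrative tuning option
    .execute()
  println(s"Rewrote ${result.rewrittenDataFilesCount()} data files")
}
```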

What changes are included in this PR?

  • Native Rust compaction: DataFusion-based Parquet read/write via JNI (iceberg_compaction_jni.rs)
  • Scala integration: a CometNativeCompaction class that runs the native scan + write via JNI and commits the result through the Iceberg Java API
  • Configuration: the spark.comet.iceberg.compaction.enabled config option (enabling sketch below)
  • Benchmark: a TPC-H-based compaction benchmark comparing Spark vs. native performance
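
A minimal sketch of enabling the flag. Only the config key comes from this PR; how the native path is then triggered is not spelled out here, so treat the surrounding settings as illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Only the comet config key below comes from this PR;
// master/app settings are illustrative.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("iceberg-native-compaction")
  .config("spark.comet.iceberg.compaction.enabled", "true")
  .getOrCreate()
```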

How are these changes tested?

  • Unit tests in CometIcebergCompactionSuite covering:
    • Non-partitioned table compaction
    • Partitioned table compaction (bucket, truncate, date partitions)
    • Data correctness verification after compaction (sketch below)
  • TPC-H benchmark (CometIcebergTPCCompactionBenchmark) measuring performance on lineitem, orders, customer tables
  • Manual testing with SF1 TPC-H data showing:
    • lineitem (6M rows): 7.2s → 4.4s (1.6x)
    • orders (1.5M rows): 1.5s → 0.9s (1.8x)
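
A hypothetical shape for the correctness check referenced above (the suite context, helper name, and table are assumptions, not the PR's exact code):

```scala
// Inside a ScalaTest suite with a SparkSession `spark` in scope.
// runNativeCompaction is an assumed helper wrapping CometNativeCompaction.
test("native compaction preserves table contents") {
  val before = spark.table("db.lineitem").orderBy("l_orderkey", "l_linenumber").collect()
  runNativeCompaction("db.lineitem")
  val after = spark.table("db.lineitem").orderBy("l_orderkey", "l_linenumber").collect()
  assert(before.sameElements(after))
}
```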

@Shekharrajak changed the title from "Feature/iceberg compaction benchmark" to "Iceberg table compaction" on Feb 14, 2026

//! Iceberg Parquet writer operator for writing RecordBatches to Parquet files
//! with Iceberg-compatible metadata (DataFile structures).
@Shekharrajak (Contributor Author)

DataFusion execution operator that writes Arrow RecordBatches to Parquet files with Iceberg-compatible metadata.

It enables native Rust to produce files that Iceberg's Java API can directly commit. The file metadata is serialized as JSON and passed back to the JVM via JNI for the commit.
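
On the JVM side, committing from that JSON metadata could look roughly like the sketch below, using Iceberg's public RewriteFiles API. The JSON field names and the NativeFileMeta carrier are assumptions, not this PR's exact code:

```scala
import scala.jdk.CollectionConverters._
import org.apache.iceberg.{DataFile, DataFiles, FileFormat, Table}

// Assumed carrier for the JSON fields the native side returns.
case class NativeFileMeta(path: String, sizeBytes: Long, recordCount: Long)

// Build DataFiles from the native writer's metadata, then swap old files
// for new ones in a single rewrite snapshot.
def commitCompaction(table: Table, removed: Set[DataFile], added: Seq[NativeFileMeta]): Unit = {
  val newFiles: Set[DataFile] = added.map { m =>
    DataFiles.builder(table.spec())
      .withPath(m.path)
      .withFormat(FileFormat.PARQUET)
      .withFileSizeInBytes(m.sizeBytes)
      .withRecordCount(m.recordCount)
      .build()
  }.toSet

  table.newRewrite()
    .rewriteFiles(removed.asJava, newFiles.asJava)
    .commit()
}
```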


//! JNI bridge for Iceberg compaction operations.
//!
//! This module provides JNI functions for native Iceberg compaction (scan + write).
@Shekharrajak (Contributor Author)

JNI bridge that exposes native Rust compaction to Scala/JVM.

executeIcebergCompaction(): the JNI entry point; reads Parquet files via DataFusion and writes the compacted output.
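
On the Scala side the binding is presumably a plain JNI native method. A minimal sketch; the class name Native and getIcebergCompactionVersion appear in excerpts below, while the library name and loading mechanism are assumptions:

```scala
// Hypothetical JNI binding; Comet's real Native class is more involved.
class Native {
  @native def executeIcebergCompaction(configJson: String): String
  @native def getIcebergCompactionVersion(): String
}

object NativeLoader {
  // Library name assumed; must match the compiled Rust cdylib.
  System.loadLibrary("comet")
}
```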


/// Configuration for Iceberg table metadata passed from JVM
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergTableConfig {
@Shekharrajak (Contributor Author), Feb 14, 2026

Table metadata from JVM (identifier, warehouse, snapshot ID, file IO props)
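
The JVM side presumably assembles that JSON before the JNI call. A sketch with assumed field names mirroring the list above; real code would use a JSON library and the Rust struct's actual serde field names:

```scala
// Field names here are assumptions mirroring the comment above.
case class TableConfig(
    tableIdentifier: String,
    warehouseLocation: String,
    snapshotId: Long,
    fileIoProps: Map[String, String])

def toJson(c: TableConfig): String = {
  val props = c.fileIoProps
    .map { case (k, v) => s""""$k":"$v"""" }
    .mkString("{", ",", "}")
  s"""{"table_identifier":"${c.tableIdentifier}",""" +
    s""""warehouse_location":"${c.warehouseLocation}",""" +
    s""""snapshot_id":${c.snapshotId},"file_io_props":$props}"""
}
```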


logDebug(s"Executing native compaction with config: $configJson")

val resultJson = native.executeIcebergCompaction(configJson)
@Shekharrajak (Contributor Author)

The JNI entry point: reads Parquet files via DataFusion and writes the compacted output.


def isAvailable: Boolean = {
try {
val version = new Native().getIcebergCompactionVersion()
@Shekharrajak (Contributor Author)

Returns the native library version, used for compatibility checks.
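
The excerpt above is truncated; a sketch of how such a gate typically completes (the error handling is an assumption):

```scala
// Treat any linkage failure as "native compaction unavailable" so the
// caller can fall back to Spark's default rewrite.
def isAvailable: Boolean =
  try {
    new Native().getIcebergCompactionVersion() != null
  } catch {
    case _: UnsatisfiedLinkError | _: NoClassDefFoundError => false
  }
```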

"Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
s"${e.getMessage}")
None
findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap { filterExpressionsMethod =>
@Shekharrajak (Contributor Author), Feb 14, 2026

Previously we assumed a fixed Iceberg class hierarchy; findMethodInHierarchy instead walks up the class tree, which is a more robust approach.

For compaction to work, we need to extract FileScanTask objects from the scan. Different Iceberg scan types expose tasks differently:

SparkBatchQueryScan -> tasks() method
SparkStagedScan -> taskGroups() method (returns task groups; tasks must be extracted from each group)
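
A sketch of the hierarchy walk described above; the name findMethodInHierarchy comes from the diff, while the exact signature is an assumption:

```scala
import java.lang.reflect.Method

// Walk the class hierarchy looking for a declared method by name.
// getDeclaredMethods also sees non-public methods, which matters because
// Iceberg's Spark scan classes are package-private.
def findMethodInHierarchy(cls: Class[_], name: String): Option[Method] = {
  var c: Class[_] = cls
  while (c != null) {
    c.getDeclaredMethods.find(_.getName == name) match {
      case Some(m) =>
        m.setAccessible(true)
        return Some(m)
      case None =>
        c = c.getSuperclass
    }
  }
  None
}

// Per the comment above: findMethodInHierarchy(scan.getClass, "tasks") for
// SparkBatchQueryScan, or "taskGroups" for SparkStagedScan.
```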

@Shekharrajak changed the title from "Iceberg table compaction" to "Acceleration: Iceberg table compaction" on Feb 14, 2026