feat: Native columnar to row conversion (Phase 2)#3266
feat: Native columnar to row conversion (Phase 2)#3266andygrove merged 71 commits intoapache:mainfrom
Conversation
Adds a dev script that automates regenerating golden files for the CometTPCDSV1_4_PlanStabilitySuite and CometTPCDSV2_7_PlanStabilitySuite tests across all supported Spark versions (3.4, 3.5, 4.0). The script verifies JDK 17+ is configured (required for Spark 4.0) and supports regenerating for a specific Spark version or all versions. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This PR adds an experimental native (Rust-based) implementation of ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow format. Benefits over the current Scala implementation: - Zero-copy for variable-length types: String and Binary data is written directly to the output buffer without intermediate Java object allocation - Vectorized processing: The native implementation processes data in a columnar fashion, improving CPU cache utilization - Reduced GC pressure: All conversion happens in native memory, avoiding the creation of temporary Java objects that would need garbage collection - Buffer reuse: The output buffer is allocated once and reused across batches, minimizing memory allocation overhead The feature is disabled by default and can be enabled by setting: spark.comet.exec.columnarToRow.native.enabled=true Supported data types: - Primitive types: Boolean, Byte, Short, Int, Long, Float, Double - Date and Timestamp (microseconds) - Decimal (both inline precision<=18 and variable-length precision>18) - String and Binary - Complex types: Struct, Array, Map (nested) This is an experimental feature for evaluation and benchmarking purposes. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Spark's UnsafeArrayData uses the actual primitive size for elements (e.g., 4 bytes for INT32), not always 8 bytes like UnsafeRow fields. This fix: - Added get_element_size() to determine correct sizes for each type - Added write_array_element() to write values with type-specific widths - Updated write_list_data() and write_map_data() to use correct sizes - Added LargeUtf8/LargeBinary support for struct fields - Added comprehensive test suite (CometNativeColumnarToRowSuite) - Updated compatibility documentation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add a fuzz test using FuzzDataGenerator to test the native columnar to row conversion with randomly generated schemas containing arrays, structs, and maps. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add tests verifying that native columnar to row conversion correctly handles complex nested types: - Array<Array<Int>> - Map<String, Array<Int>> - Struct<Array<Map<String, Int>>, String> These tests confirm the recursive conversion logic works for arbitrary nesting depth. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add a fuzz test using FuzzDataGenerator.generateNestedSchema to test native columnar to row conversion with deeply nested random schemas (depth 1-3, with arrays, structs, and maps). The test uses only primitive types supported by native C2R (excludes TimestampNTZType which is not yet supported). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use actual array type for dispatching instead of schema type to handle type mismatches between serialized schema and FFI arrays - Add support for LargeList (64-bit offsets) arrays - Replace .unwrap() with proper error handling to provide clear error messages instead of panics - Add tests for LargeList handling Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
When Parquet data is read, string columns may be dictionary-encoded for efficiency. The schema says Utf8 but the actual Arrow array is Dictionary(Int32, Utf8). This caused a type mismatch error. - Add support for Dictionary-encoded arrays in get_variable_length_data - Handle all common key types (Int8, Int16, Int32, Int64, UInt8-64) - Support Utf8, LargeUtf8, Binary, and LargeBinary value types - Add tests for dictionary-encoded string arrays Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add CometColumnarToRowBenchmark to compare performance of: - Spark's default ColumnarToRowExec - Comet's JVM-based CometColumnarToRowExec - Comet's Native CometNativeColumnarToRowExec Benchmark covers: - Primitive types (int, long, double, string, boolean, date) - String-heavy workloads (short, medium, long strings) - Struct types (simple, nested, deeply nested) - Array types (primitives and strings) - Map types (various key/value combinations) - Complex nested types (arrays of structs, maps with arrays) - Wide rows (50 columns of mixed types) Run with: SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometColumnarToRowBenchmark Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The native columnar-to-row conversion was allocating intermediate Vec<u8> for every variable-length field (strings, binary). This change: - Adds write_variable_length_to_buffer() that writes directly to the output buffer instead of returning a Vec - Adds write_dictionary_to_buffer() functions for dictionary-encoded arrays - Adds #[inline] hints to hot-path functions - Removes intermediate allocations for Utf8, LargeUtf8, Binary, LargeBinary Benchmark results for String Types: - Before: Native was slower than Spark - After: Native matches Spark (1.0X) Primitive types and complex nested types (struct, array, map) still have overhead from JNI/FFI and remaining intermediate allocations. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Inspired by Velox UnsafeRowFast, add optimizations for all-fixed-width schemas: - Add is_fixed_width() and is_all_fixed_width() detection functions - Add convert_fixed_width() fast path that: - Pre-allocates entire buffer at once (row_size * num_rows) - Pre-fills offsets/lengths arrays (constant row size) - Processes column-by-column for better cache locality - Add write_column_fixed_width() for type-specific column processing - Add tests for fixed-width fast path detection Limitations: - UnsafeRow format stores 8-byte fields per row (not columnar), so bulk memcpy of entire columns is not possible - JNI/FFI boundary crossing still has overhead - The "primitive types" benchmark includes strings, so it doesn't trigger the fixed-width fast path For schemas with only fixed-width columns (no strings, arrays, maps, structs), this reduces allocations and improves cache locality. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add fixedWidthOnlyBenchmark() with only fixed-width types (no strings) to test the native C2R fast path that pre-allocates buffers - Refactor all benchmark methods to use addC2RBenchmarkCases() helper, reducing ~110 lines of duplicated code Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…e allocations - Add direct-write functions (write_struct_to_buffer, write_list_to_buffer, write_map_to_buffer) that write directly to output buffer - Remove legacy functions that returned intermediate Vec<u8> objects - Eliminates memory allocation per complex type value Benchmark improvements: - Struct: 604ms → 330ms (1.8x faster) - Array: 580ms → 410ms (1.4x faster) - Map: 1141ms → 705ms (1.6x faster) - Complex Nested: 1434ms → 798ms (1.8x faster) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add memcpy-style bulk copying for arrays of primitive types without nulls. When array elements are fixed-width primitives (Int8, Int16, Int32, Int64, Float32, Float64, Date32, Timestamp) and have no null values, copy the entire values buffer at once instead of iterating element by element. Benchmark improvement for Array Types: - Before: 410ms (0.5X of Spark) - After: 301ms (0.7X of Spark) - 27% faster Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Move type dispatch outside the inner row loop by pre-downcasting all arrays to typed variants before processing. This eliminates the O(rows * columns * type_dispatch_cost) overhead in the general path. Adds TypedArray enum with variants for all supported types, with methods for null checking, fixed-value extraction, and variable-length writing that operate directly on concrete array types. Benchmark improvements: - Primitive Types: 201ms → 126ms (37% faster, 0.5X → 0.7X) - String Types: 164ms → 120ms (27% faster, 1.0X → 1.4X) - Wide Rows: 1242ms → 737ms (41% faster, 0.6X → 1.0X) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use correct Arrow array types for bulk copy (Date32Array instead of Int32Array, TimestampMicrosecondArray instead of Int64Array) - Add Boolean array support to bulk copy path (element-by-element but still avoiding type dispatch overhead) - Enable bulk copy for arrays with nulls - copy values buffer then set null bits separately (null slots contain garbage but won't be read) - Restore fixed-width value writing in slow path for unsupported types (e.g., Decimal128 in arrays) This fixes the fuzz test failure where Date32 arrays in maps were producing incorrect values due to failed downcast falling through to an incomplete slow path. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implements Velox-style optimizations for array and map conversion: 1. **TypedElements enum**: Pre-downcast element arrays once to avoid type dispatch in inner loops 2. **Direct offset access**: Use ListArray/MapArray offsets directly instead of calling value(row_idx) which allocates a sliced ArrayRef 3. **Range-based bulk copy**: Copy element ranges directly from the underlying values buffer using pointer arithmetic Benchmark improvements: - Array Types: 274ms → 163ms (40% faster, 0.8X → 1.4X) - Map Types: 605ms → 292ms (52% faster, 0.6X → 1.4X) - Complex Nested: 701ms → 410ms (42% faster, 0.6X → 1.2X) Native C2R now matches or beats Comet JVM for array/map types. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Remove Vec allocation overhead by using inline type dispatch for struct fields instead of pre-collecting into a Vec<TypedElements>. This improves struct type performance from 357ms to 272ms (24% faster). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pre-downcast all struct field columns into TypedElements at batch initialization time (in TypedArray::from_array). This eliminates per-row type dispatch overhead for struct fields. Performance improvement for struct types: - Before: 272ms (0.8X of Spark) - After: 220ms (1.0X of Spark, matching Spark performance) The pre-downcast pattern is now consistently applied to: - Top-level columns (TypedArray) - Array/List elements (TypedElements) - Map keys/values (TypedElements) - Struct fields (TypedElements) - NEW Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pre-compute variable-length column indices once per batch instead of calling is_variable_length() for every column in every row. In pass 2, only iterate over variable-length columns using the pre-computed indices. Also skip writing placeholder values for variable-length columns in pass 1, since they will be overwritten in pass 2. Performance improvement for primitive types (mixed with strings): - Before: 131ms (0.8X of Spark) - After: ~114ms (0.9X of Spark) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3266 +/- ##
============================================
+ Coverage 56.12% 59.95% +3.82%
- Complexity 976 1473 +497
============================================
Files 119 175 +56
Lines 11743 16167 +4424
Branches 2251 2682 +431
============================================
+ Hits 6591 9693 +3102
- Misses 4012 5126 +1114
- Partials 1140 1348 +208 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- Add downcast_array! macro for consistent error messages - Add write_bytes_padded helper for 8-byte aligned writes - Refactor TypedArray::from_array to use macro (removed unused _schema_type param) - Refactor TypedArray::write_variable_to_buffer to use helper - Refactor TypedElements::write_variable_value to use helper (removed unused base_offset param) - Refactor write_nested_variable_to_buffer to use macro and helper (removed unused _base_offset param) - Refactor write_dictionary_to_buffer_with_key to use macro and helper - Refactor array element writing in TypedElements (Decimal128, String, LargeString, Binary, LargeBinary) - Fix write_struct_to_buffer to use downcast_array! macro instead of unsafe unwrap - Fix Float32 sign extension bug: use direct cast to i64 instead of intermediate i32 cast Reduces file by ~380 lines while maintaining functionality. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala
Show resolved
Hide resolved
Close the ColumnarBatch after the row iterator is fully consumed to ensure proper cleanup of Arrow resources. The batch is closed via a wrapper iterator that calls batch.close() when hasNext returns false. This fix applies to both the regular doExecute() path and the broadcast execution path. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
@wForget I've now added the fix for closing the The issue was that after calling new Iterator[InternalRow] {
override def hasNext: Boolean = {
val hasMore = result.hasNext
if (!hasMore) {
batch.close()
}
hasMore
}
override def next(): InternalRow = result.next()
}This fix is applied to both the |
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala
Show resolved
Hide resolved
Add several macros to eliminate repetitive code patterns: - impl_is_null!: generates is_null match arms for TypedArray/TypedElements - typed_elements_from_primitive!: generates downcast patterns - write_fixed_column_primitive!: handles downcast + loop in write_column_fixed_width - get_field_value_primitive!: handles downcast + value extraction - extract_fixed_value!: generates fixed-width value extraction for structs Net reduction of ~226 lines while maintaining identical behavior. All 18 columnar_to_row unit tests pass. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Enables and hardens the native (Rust) columnar-to-row conversion path, and turns it on for the Comet test suite while keeping runtime defaults unchanged.
Changes:
- Enable
CometNativeColumnarToRowExecin test configs and plan checks, updating affected tests. - Add broadcast support and additional batch handling to
CometNativeColumnarToRowExec. - Extend native C2R (Rust) to handle more Arrow edge-cases (dictionary unpacking, null arrays, FixedSizeBinary, small-precision decimal representations) and refactor conversion internals.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| spark/src/test/scala/org/apache/spark/sql/comet/CometPlanChecker.scala | Allow native C2R exec in plan validation for tests. |
| spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | Enable native columnar-to-row in Comet tests via SparkConf. |
| spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | Update broadcast/AQE assertions to accept native or JVM C2R. |
| spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala | Update plan shape expectations for native C2R in tests. |
| spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala | Implement doExecuteBroadcast and improve batch lifecycle handling. |
| spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala | Select native vs JVM C2R with a mutable-buffer scan fallback. |
| native/core/src/execution/columnar_to_row.rs | Add missing type/encoding handling and refactor downcasting/writing logic. |
| common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala | Accept Arrow NullVector during batch serialization. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala
Show resolved
Hide resolved
|
Thanks for the review @wForget! |
Which issue does this PR close?
Part of #3268
Follow on from #3221
Rationale for this change
Enable
CometNativeColumnarToRowExecin Comet tests, but still disabled by default.What changes are included in this PR?
How are these changes tested?