From 50f2414b26e59f9af090376a39001f84a78b0778 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 07:32:37 -0600 Subject: [PATCH 1/9] feat: enforce schema evolution config in native_datafusion scan Pass schema_evolution_enabled config through protobuf to the native Parquet reader. When disabled, the SparkPhysicalExprAdapter rejects type mismatches between file and table schemas at runtime, matching Spark's behavior of throwing errors on incompatible types. --- native/core/src/execution/planner.rs | 1 + native/core/src/parquet/mod.rs | 1 + native/core/src/parquet/parquet_exec.rs | 4 + native/core/src/parquet/parquet_support.rs | 5 + native/core/src/parquet/schema_adapter.rs | 101 +++++++++++++++++- native/proto/src/proto/operator.proto | 1 + .../serde/operator/CometNativeScan.scala | 2 + 7 files changed, 114 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 15bbabe883..7d3b6af660 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1213,6 +1213,7 @@ impl PhysicalPlanner { common.case_sensitive, self.session_ctx(), common.encryption_enabled, + common.schema_evolution_enabled, )?; Ok(( vec![], diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index f2b0e80ab2..b0588d8737 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -775,6 +775,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat case_sensitive != JNI_FALSE, session_ctx, encryption_enabled, + true, // schema_evolution_enabled (always true for iceberg_compat) )?; let partition_index: usize = 0; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 2d970734bb..37f970c5c0 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -72,12 +72,14 @@ pub(crate) fn init_datasource_exec( case_sensitive: bool, session_ctx: &Arc, encryption_enabled: bool, + schema_evolution_enabled: bool, ) -> Result, ExecutionError> { let (table_parquet_options, spark_parquet_options) = get_options( session_timezone, case_sensitive, &object_store_url, encryption_enabled, + schema_evolution_enabled, ); // Determine the schema and projection to use for ParquetSource. @@ -181,6 +183,7 @@ fn get_options( case_sensitive: bool, object_store_url: &ObjectStoreUrl, encryption_enabled: bool, + schema_evolution_enabled: bool, ) -> (TableParquetOptions, SparkParquetOptions) { let mut table_parquet_options = TableParquetOptions::new(); table_parquet_options.global.pushdown_filters = true; @@ -190,6 +193,7 @@ fn get_options( SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false); spark_parquet_options.allow_cast_unsigned_ints = true; spark_parquet_options.case_sensitive = case_sensitive; + spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled; if encryption_enabled { table_parquet_options.crypto.configure_factory( diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index e7ff5630f1..e46d1bf233 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -76,6 +76,9 @@ pub struct SparkParquetOptions { pub use_legacy_date_timestamp_or_ntz: bool, // Whether schema field names are case sensitive pub case_sensitive: bool, + /// Whether schema evolution (type promotion) is enabled. When false, the adapter + /// should reject type mismatches between file schema and table schema. + pub schema_evolution_enabled: bool, } impl SparkParquetOptions { @@ -88,6 +91,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + schema_evolution_enabled: true, } } @@ -100,6 +104,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + schema_evolution_enabled: true, } } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 0ad61df426..2e846f6e4c 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -19,7 +19,7 @@ use crate::parquet::cast_column::CometCastColumnExpr; use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion::common::Result as DataFusionResult; +use datafusion::common::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_expr::expressions::Column; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::ColumnarValue; @@ -95,12 +95,61 @@ fn remap_physical_schema_names( Arc::new(Schema::new(remapped_fields)) } +/// Check if the logical (table) schema and physical (file) schema have type +/// mismatches. Returns an error message describing the first mismatch found, +/// or None if all types match. +fn detect_schema_mismatch( + logical_schema: &SchemaRef, + physical_schema: &SchemaRef, + case_sensitive: bool, +) -> Option { + for logical_field in logical_schema.fields() { + let physical_field = if case_sensitive { + physical_schema + .fields() + .iter() + .find(|f| f.name() == logical_field.name()) + } else { + physical_schema + .fields() + .iter() + .find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase()) + }; + if let Some(physical_field) = physical_field { + if logical_field.data_type() != physical_field.data_type() { + return Some(format!( + "Parquet column cannot be converted. \ + Column: [{}], Expected: {}, Found: {} \ + (schema evolution is disabled)", + logical_field.name(), + logical_field.data_type(), + physical_field.data_type() + )); + } + } + } + None +} + impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { fn create( &self, logical_file_schema: SchemaRef, physical_file_schema: SchemaRef, ) -> Arc { + // When schema evolution is disabled, check for type mismatches between the + // logical (table) schema and the physical (file) schema. If any column has + // a different type, store the error to be raised during rewrite(). + let schema_mismatch_error = if !self.parquet_options.schema_evolution_enabled { + detect_schema_mismatch( + &logical_file_schema, + &physical_file_schema, + self.parquet_options.case_sensitive, + ) + } else { + None + }; + // When case-insensitive, remap physical schema field names to match logical // field names. The DefaultPhysicalExprAdapter uses exact name matching, so // without this remapping, columns like "a" won't match logical "A" and will @@ -154,6 +203,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { default_values: self.default_values.clone(), default_adapter, logical_to_physical_names, + schema_mismatch_error, }) } } @@ -183,10 +233,18 @@ struct SparkPhysicalExprAdapter { /// physical names so that downstream reassign_expr_columns can find /// columns in the actual stream schema. logical_to_physical_names: Option>, + /// When schema evolution is disabled and file/table types differ, this + /// holds the error message to return from rewrite(). + schema_mismatch_error: Option, } impl PhysicalExprAdapter for SparkPhysicalExprAdapter { fn rewrite(&self, expr: Arc) -> DataFusionResult> { + // When schema evolution is disabled and types differ, reject the read + if let Some(err) = &self.schema_mismatch_error { + return Err(DataFusionError::Plan(err.clone())); + } + // First let the default adapter handle column remapping, missing columns, // and simple scalar type casts. Then replace DataFusion's CastColumnExpr // with Spark-compatible equivalents. @@ -496,11 +554,51 @@ mod test { Ok(()) } + #[tokio::test] + async fn parquet_schema_mismatch_rejected_when_evolution_disabled() { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) + as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names]).unwrap(); + + // Read as Int64 (widening) with schema evolution disabled + let required_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])); + + let result = + roundtrip_with_schema_evolution(&batch, required_schema.clone(), false).await; + assert!(result.is_err(), "Expected error when schema evolution is disabled"); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("schema evolution is disabled"), + "Error should mention schema evolution: {err_msg}" + ); + + // Same read with schema evolution enabled should succeed + let result = roundtrip_with_schema_evolution(&batch, required_schema, true).await; + assert!(result.is_ok(), "Expected success when schema evolution is enabled"); + } + /// Create a Parquet file containing a single batch and then read the batch back using /// the specified required_schema. This will cause the PhysicalExprAdapter code to be used. async fn roundtrip( batch: &RecordBatch, required_schema: SchemaRef, + ) -> Result { + roundtrip_with_schema_evolution(batch, required_schema, true).await + } + + async fn roundtrip_with_schema_evolution( + batch: &RecordBatch, + required_schema: SchemaRef, + schema_evolution_enabled: bool, ) -> Result { let filename = get_temp_filename(); let filename = filename.as_path().as_os_str().to_str().unwrap().to_string(); @@ -513,6 +611,7 @@ mod test { let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); spark_parquet_options.allow_cast_unsigned_ints = true; + spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled; // Create expression adapter factory for Spark-compatible schema adaptation let expr_adapter_factory: Arc = Arc::new( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 4afc1fefb7..655f5eb498 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -100,6 +100,7 @@ message NativeScanCommon { bool encryption_enabled = 11; string source = 12; repeated spark.spark_expression.DataType fields = 13; + bool schema_evolution_enabled = 14; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index d5d075760f..9354a37d33 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -174,6 +174,8 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava) commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone")) commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE)) + commonBuilder.setSchemaEvolutionEnabled( + CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get(scan.conf)) // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState From 81f258791952a0892eb9c65c4309df1f0f0e570d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 07:50:09 -0600 Subject: [PATCH 2/9] style: apply cargo fmt --- native/core/src/parquet/schema_adapter.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 2e846f6e4c..c4db35976a 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -572,9 +572,11 @@ mod test { Field::new("name", DataType::Utf8, false), ])); - let result = - roundtrip_with_schema_evolution(&batch, required_schema.clone(), false).await; - assert!(result.is_err(), "Expected error when schema evolution is disabled"); + let result = roundtrip_with_schema_evolution(&batch, required_schema.clone(), false).await; + assert!( + result.is_err(), + "Expected error when schema evolution is disabled" + ); let err_msg = result.unwrap_err().to_string(); assert!( err_msg.contains("schema evolution is disabled"), @@ -583,7 +585,10 @@ mod test { // Same read with schema evolution enabled should succeed let result = roundtrip_with_schema_evolution(&batch, required_schema, true).await; - assert!(result.is_ok(), "Expected success when schema evolution is enabled"); + assert!( + result.is_ok(), + "Expected success when schema evolution is enabled" + ); } /// Create a Parquet file containing a single batch and then read the batch back using From 836df2da562f1a0b26e64b73c9b960c34503a7df Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 08:04:57 -0600 Subject: [PATCH 3/9] fix: exclude timestamp timezone differences from schema mismatch check Timestamp timezone-only differences (e.g., Timestamp(us, "UTC") vs Timestamp(us, None)) are metadata-only relabelings handled by the existing CometCastColumnExpr, not real schema evolution. --- native/core/src/parquet/schema_adapter.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index c4db35976a..397c9db9f9 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -95,6 +95,16 @@ fn remap_physical_schema_names( Arc::new(Schema::new(remapped_fields)) } +/// Returns true if the two types differ only in timestamp timezone metadata +/// (e.g., Timestamp(us, Some("UTC")) vs Timestamp(us, None)). This is not a +/// real schema evolution — the adapter handles it as a metadata-only relabeling. +fn is_timestamp_timezone_only_diff(logical: &DataType, physical: &DataType) -> bool { + matches!( + (logical, physical), + (DataType::Timestamp(lu, _), DataType::Timestamp(pu, _)) if lu == pu + ) +} + /// Check if the logical (table) schema and physical (file) schema have type /// mismatches. Returns an error message describing the first mismatch found, /// or None if all types match. @@ -116,7 +126,12 @@ fn detect_schema_mismatch( .find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase()) }; if let Some(physical_field) = physical_field { - if logical_field.data_type() != physical_field.data_type() { + if logical_field.data_type() != physical_field.data_type() + && !is_timestamp_timezone_only_diff( + logical_field.data_type(), + physical_field.data_type(), + ) + { return Some(format!( "Parquet column cannot be converted. \ Column: [{}], Expected: {}, Found: {} \ @@ -572,7 +587,8 @@ mod test { Field::new("name", DataType::Utf8, false), ])); - let result = roundtrip_with_schema_evolution(&batch, required_schema.clone(), false).await; + let result = + roundtrip_with_schema_evolution(&batch, Arc::clone(&required_schema), false).await; assert!( result.is_err(), "Expected error when schema evolution is disabled" From 0aad6999b750a8eb3247d4e30cc9f0dbfd105342 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 09:58:01 -0600 Subject: [PATCH 4/9] fix: only reject real type promotions in schema mismatch check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous check was too strict, rejecting normal Parquet-to-Spark metadata differences (timestamp units/timezones, list/map/struct field names and nullability) that the adapter already handles. Now only flag actual type promotions (Int32→Int64, Float32→Float64, etc.) while allowing the adapter to handle structural metadata differences. --- native/core/src/parquet/schema_adapter.rs | 60 +++++++++++++++++------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 397c9db9f9..02fd1965e8 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -95,19 +95,52 @@ fn remap_physical_schema_names( Arc::new(Schema::new(remapped_fields)) } -/// Returns true if the two types differ only in timestamp timezone metadata -/// (e.g., Timestamp(us, Some("UTC")) vs Timestamp(us, None)). This is not a -/// real schema evolution — the adapter handles it as a metadata-only relabeling. -fn is_timestamp_timezone_only_diff(logical: &DataType, physical: &DataType) -> bool { - matches!( - (logical, physical), - (DataType::Timestamp(lu, _), DataType::Timestamp(pu, _)) if lu == pu - ) +/// Returns true if the two types represent a real type promotion that constitutes +/// schema evolution (e.g., Int32→Int64, Float32→Float64). Returns false for +/// differences the adapter handles natively (timestamp timezone/unit changes, +/// list/map/struct field name or nullability differences). +fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool { + use DataType::*; + match (logical, physical) { + // Same type — no promotion + (a, b) if a == b => false, + // Timestamp differences (unit, timezone) are handled by the adapter + (Timestamp(_, _), Timestamp(_, _)) => false, + // Timestamp to/from Int64 (nanosAsLong) is handled by the adapter + (Timestamp(_, _), Int64) | (Int64, Timestamp(_, _)) => false, + // Complex types: compare element types recursively, ignore field metadata + (List(l), List(p)) | (LargeList(l), LargeList(p)) => { + is_type_promotion(l.data_type(), p.data_type()) + } + (Map(lf, _), Map(pf, _)) => { + // Map entries have key and value fields + let l_entries = lf.data_type(); + let p_entries = pf.data_type(); + if let (Struct(l_fields), Struct(p_fields)) = (l_entries, p_entries) { + l_fields + .iter() + .zip(p_fields.iter()) + .any(|(lf, pf)| is_type_promotion(lf.data_type(), pf.data_type())) + } else { + true + } + } + (Struct(l_fields), Struct(p_fields)) => l_fields.iter().any(|lf| { + p_fields + .iter() + .find(|pf| pf.name() == lf.name()) + .is_some_and(|pf| is_type_promotion(lf.data_type(), pf.data_type())) + }), + // Different base scalar types — this is real type promotion + _ => true, + } } /// Check if the logical (table) schema and physical (file) schema have type -/// mismatches. Returns an error message describing the first mismatch found, -/// or None if all types match. +/// promotions that constitute schema evolution. Returns an error message for +/// the first real type mismatch found, or None if schemas are compatible. +/// Ignores differences that the adapter handles natively (timestamp timezone/unit, +/// list/map/struct field names and nullability). fn detect_schema_mismatch( logical_schema: &SchemaRef, physical_schema: &SchemaRef, @@ -126,12 +159,7 @@ fn detect_schema_mismatch( .find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase()) }; if let Some(physical_field) = physical_field { - if logical_field.data_type() != physical_field.data_type() - && !is_timestamp_timezone_only_diff( - logical_field.data_type(), - physical_field.data_type(), - ) - { + if is_type_promotion(logical_field.data_type(), physical_field.data_type()) { return Some(format!( "Parquet column cannot be converted. \ Column: [{}], Expected: {}, Found: {} \ From 106f868fe12b9ca15639a0dad6a3a1ef5e155555 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 10:33:30 -0600 Subject: [PATCH 5/9] fix: allow unsigned int and FixedSizeBinary Parquet type mappings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unsigned int types (UInt8→Int16, UInt64→Decimal) and FixedSizeBinary→ Binary are Parquet physical type mappings handled by the adapter, not schema evolution. Also simplify the schema evolution test since native_datafusion now properly enforces the config. --- native/core/src/parquet/schema_adapter.rs | 6 ++++++ .../scala/org/apache/comet/parquet/ParquetReadSuite.scala | 4 +--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 02fd1965e8..c85af2aae0 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -108,6 +108,12 @@ fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool { (Timestamp(_, _), Timestamp(_, _)) => false, // Timestamp to/from Int64 (nanosAsLong) is handled by the adapter (Timestamp(_, _), Int64) | (Int64, Timestamp(_, _)) => false, + // Unsigned-to-signed mappings are Parquet physical type conversions, not evolution. + // Parquet UINT_8→Spark ShortType, UINT_16→IntegerType, UINT_32→LongType, + // UINT_64→Decimal(20,0). The adapter handles these via allow_cast_unsigned_ints. + (_, UInt8 | UInt16 | UInt32 | UInt64) => false, + // FixedSizeBinary→Binary is a Parquet FIXED_LEN_BYTE_ARRAY mapping + (Binary, FixedSizeBinary(_)) | (LargeBinary, FixedSizeBinary(_)) => false, // Complex types: compare element types recursively, ignore field metadata (List(l), List(p)) | (LargeList(l), LargeList(p)) => { is_type_promotion(l.data_type(), p.data_type()) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 09a2308e35..63fa32c3ff 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -982,9 +982,7 @@ abstract class ParquetReadSuite extends CometTestBase { Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false))) withParquetDataFrame(data, schema = Some(readSchema)) { df => - // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true' - if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL - .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) { + if (enableSchemaEvolution) { checkAnswer(df, data.map(Row.fromTuple)) } else { assertThrows[SparkException](df.collect()) From 5c02fcf81b8e37f46e509a47e85e131bfd9e8f44 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 11:33:32 -0600 Subject: [PATCH 6/9] feat: convert schema mismatch errors to Spark-compatible exceptions Use the SparkError/SparkErrorConverter pattern to produce proper SparkException with error class _LEGACY_ERROR_TEMP_2063, matching Spark's native schema mismatch error format. --- native/core/src/parquet/schema_adapter.rs | 27 +++++++++---------- native/spark-expr/src/error.rs | 27 +++++++++++++++++++ .../comet/shims/ShimSparkErrorConverter.scala | 12 +++++++++ .../comet/shims/ShimSparkErrorConverter.scala | 12 +++++++++ .../comet/shims/ShimSparkErrorConverter.scala | 12 +++++++++ 5 files changed, 75 insertions(+), 15 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index c85af2aae0..275273ed52 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -24,7 +24,7 @@ use datafusion::physical_expr::expressions::Column; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::ColumnarValue; use datafusion::scalar::ScalarValue; -use datafusion_comet_spark_expr::{Cast, SparkCastOptions}; +use datafusion_comet_spark_expr::{Cast, SparkCastOptions, SparkError}; use datafusion_physical_expr_adapter::{ replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, @@ -143,7 +143,7 @@ fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool { } /// Check if the logical (table) schema and physical (file) schema have type -/// promotions that constitute schema evolution. Returns an error message for +/// promotions that constitute schema evolution. Returns a SparkError for /// the first real type mismatch found, or None if schemas are compatible. /// Ignores differences that the adapter handles natively (timestamp timezone/unit, /// list/map/struct field names and nullability). @@ -151,7 +151,7 @@ fn detect_schema_mismatch( logical_schema: &SchemaRef, physical_schema: &SchemaRef, case_sensitive: bool, -) -> Option { +) -> Option { for logical_field in logical_schema.fields() { let physical_field = if case_sensitive { physical_schema @@ -166,14 +166,11 @@ fn detect_schema_mismatch( }; if let Some(physical_field) = physical_field { if is_type_promotion(logical_field.data_type(), physical_field.data_type()) { - return Some(format!( - "Parquet column cannot be converted. \ - Column: [{}], Expected: {}, Found: {} \ - (schema evolution is disabled)", - logical_field.name(), - logical_field.data_type(), - physical_field.data_type() - )); + return Some(SparkError::SchemaColumnConvertNotSupported { + column: logical_field.name().clone(), + logical_type: logical_field.data_type().to_string(), + physical_type: physical_field.data_type().to_string(), + }); } } } @@ -284,14 +281,14 @@ struct SparkPhysicalExprAdapter { logical_to_physical_names: Option>, /// When schema evolution is disabled and file/table types differ, this /// holds the error message to return from rewrite(). - schema_mismatch_error: Option, + schema_mismatch_error: Option, } impl PhysicalExprAdapter for SparkPhysicalExprAdapter { fn rewrite(&self, expr: Arc) -> DataFusionResult> { // When schema evolution is disabled and types differ, reject the read if let Some(err) = &self.schema_mismatch_error { - return Err(DataFusionError::Plan(err.clone())); + return Err(DataFusionError::External(Box::new(err.clone()))); } // First let the default adapter handle column remapping, missing columns, @@ -629,8 +626,8 @@ mod test { ); let err_msg = result.unwrap_err().to_string(); assert!( - err_msg.contains("schema evolution is disabled"), - "Error should mention schema evolution: {err_msg}" + err_msg.contains("Parquet column cannot be converted"), + "Error should mention column conversion: {err_msg}" ); // Same read with schema evolution enabled should succeed diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs index ae3b5c0eda..bea93c651c 100644 --- a/native/spark-expr/src/error.rs +++ b/native/spark-expr/src/error.rs @@ -166,6 +166,13 @@ pub enum SparkError { #[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")] ScalarSubqueryTooManyRows, + #[error("[_LEGACY_ERROR_TEMP_2063] Parquet column cannot be converted. Column: [{column}], Expected: {logical_type}, Found: {physical_type}")] + SchemaColumnConvertNotSupported { + column: String, + logical_type: String, + physical_type: String, + }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -236,6 +243,7 @@ impl SparkError { SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex", SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder", SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", + SparkError::SchemaColumnConvertNotSupported { .. } => "SchemaColumnConvertNotSupported", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -421,6 +429,17 @@ impl SparkError { "dataType": data_type, }) } + SparkError::SchemaColumnConvertNotSupported { + column, + logical_type, + physical_type, + } => { + serde_json::json!({ + "column": column, + "logicalType": logical_type, + "physicalType": physical_type, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -487,6 +506,11 @@ impl SparkError { SparkError::DatatypeCannotOrder { .. } | SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException", + // Schema mismatch - this becomes SparkException via QueryExecutionErrors + SparkError::SchemaColumnConvertNotSupported { .. } => { + "org/apache/spark/SparkException" + } + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -559,6 +583,9 @@ impl SparkError { // Subquery errors SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"), + // Schema mismatch + SparkError::SchemaColumnConvertNotSupported { .. } => Some("_LEGACY_ERROR_TEMP_2063"), + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 8e6ed1a927..0c4dbeff8f 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -243,6 +243,18 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "SchemaColumnConvertNotSupported" => + val column = params.getOrElse("column", "").toString + val logicalType = params.getOrElse("logicalType", "").toString + val physicalType = params.getOrElse("physicalType", "").toString + Some( + QueryExecutionErrors.unsupportedSchemaColumnConvertError( + "unknown", + column, + logicalType, + physicalType, + null)) + case _ => None } diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 9bd8c7dba1..f5ccfc0b18 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -239,6 +239,18 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "SchemaColumnConvertNotSupported" => + val column = params.getOrElse("column", "").toString + val logicalType = params.getOrElse("logicalType", "").toString + val physicalType = params.getOrElse("physicalType", "").toString + Some( + QueryExecutionErrors.unsupportedSchemaColumnConvertError( + "unknown", + column, + logicalType, + physicalType, + null)) + case _ => None } diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index e49c789a77..716e92f039 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -251,6 +251,18 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError( context.headOption.orNull)) + case "SchemaColumnConvertNotSupported" => + val column = params.getOrElse("column", "").toString + val logicalType = params.getOrElse("logicalType", "").toString + val physicalType = params.getOrElse("physicalType", "").toString + Some( + QueryExecutionErrors.unsupportedSchemaColumnConvertError( + "unknown", + column, + logicalType, + physicalType, + null)) + case _ => // Unknown error type - return None to trigger fallback None From 2c3c51eb42d511cfa111f964e4e9ec211a8b95a0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 11:33:36 -0600 Subject: [PATCH 7/9] fix: unignore SPARK-35640 schema mismatch tests for native_datafusion The schema evolution enforcement now properly rejects type mismatches and produces Spark-compatible error messages, allowing these tests to pass with native_datafusion scan. --- dev/diffs/3.5.8.diff | 26 +++----------------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 3aaecdecb1..6e93ce800e 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644 SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..1d2e215d6a3 100644 +index fa1a64460fc..134f0db1fb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -2184,30 +2184,10 @@ index 8e88049f51e..49f2001dc6b 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..f312174b182 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } From ad14686b1487acfe4af45650a94363197d0596af Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 11:46:27 -0600 Subject: [PATCH 8/9] fix: use correct error class for schema mismatch in Spark 4.0 Spark 4.0 replaced unsupportedSchemaColumnConvertError with FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH error class. --- .../sql/comet/shims/ShimSparkErrorConverter.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 716e92f039..57888f1aec 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -256,12 +256,14 @@ trait ShimSparkErrorConverter { val logicalType = params.getOrElse("logicalType", "").toString val physicalType = params.getOrElse("physicalType", "").toString Some( - QueryExecutionErrors.unsupportedSchemaColumnConvertError( - "unknown", - column, - logicalType, - physicalType, - null)) + new SparkException( + errorClass = "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH", + messageParameters = Map( + "filePath" -> "unknown", + "column" -> column, + "logicalType" -> logicalType, + "physicalType" -> physicalType), + cause = null)) case _ => // Unknown error type - return None to trigger fallback From dd6c4c2047c1e7f333f2b01453d2e7f9b37e4861 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 13:23:45 -0600 Subject: [PATCH 9/9] fix: allow standard Parquet type conversions when schema evolution is disabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The is_type_promotion function was too aggressive, treating standard Parquet type mappings as schema evolution. Spark's vectorized reader natively handles integer family casts (int32→int8/int16/int64), decimal widening, float→double, binary↔string, and int→string conversions without requiring schema evolution. Only truly incompatible conversions (e.g., string→timestamp) should be rejected. Also re-add IgnoreCometNativeDataFusion tag for SPARK-35640 int-as-long test since native_datafusion handles int→long natively (unlike Spark's non-vectorized reader which the test targets). --- dev/diffs/3.5.8.diff | 12 ++++++- native/core/src/parquet/schema_adapter.rs | 41 ++++++++++++++--------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 6e93ce800e..d4544c1544 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2187,7 +2187,17 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: int as long should throw schema incompatible error") { ++ test("SPARK-35640: int as long should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("native_datafusion handles int->long natively")) { + val data = (1 to 4).map(i => Tuple1(i)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) + +@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 275273ed52..1217f10ef6 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -96,14 +96,22 @@ fn remap_physical_schema_names( } /// Returns true if the two types represent a real type promotion that constitutes -/// schema evolution (e.g., Int32→Int64, Float32→Float64). Returns false for -/// differences the adapter handles natively (timestamp timezone/unit changes, -/// list/map/struct field name or nullability differences). +/// schema evolution — i.e., a conversion that Spark's vectorized Parquet reader +/// does NOT support natively (e.g., Binary→Timestamp). Returns false for +/// conversions the reader handles without schema evolution (integer family casts, +/// decimal widening, timestamp timezone/unit changes, etc.). fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool { use DataType::*; match (logical, physical) { // Same type — no promotion (a, b) if a == b => false, + // Integer family: Spark's vectorized reader supports INT32 → byte/short/int/long + // and INT64 → long. These are standard Parquet type mappings, not schema evolution. + (Int8 | Int16 | Int32 | Int64, Int8 | Int16 | Int32 | Int64) => false, + // Float widening: Spark supports FLOAT → double + (Float64, Float32) => false, + // Decimal: Spark supports reading decimals with different precision/scale + (Decimal128(_, _), Decimal128(_, _)) | (Decimal256(_, _), Decimal256(_, _)) => false, // Timestamp differences (unit, timezone) are handled by the adapter (Timestamp(_, _), Timestamp(_, _)) => false, // Timestamp to/from Int64 (nanosAsLong) is handled by the adapter @@ -112,8 +120,14 @@ fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool { // Parquet UINT_8→Spark ShortType, UINT_16→IntegerType, UINT_32→LongType, // UINT_64→Decimal(20,0). The adapter handles these via allow_cast_unsigned_ints. (_, UInt8 | UInt16 | UInt32 | UInt64) => false, - // FixedSizeBinary→Binary is a Parquet FIXED_LEN_BYTE_ARRAY mapping - (Binary, FixedSizeBinary(_)) | (LargeBinary, FixedSizeBinary(_)) => false, + // Binary/String family: Spark supports BINARY → string and vice versa + (Utf8 | LargeUtf8 | Binary | LargeBinary, Utf8 | LargeUtf8 | Binary | LargeBinary) => false, + // FixedSizeBinary→Binary/String is a Parquet FIXED_LEN_BYTE_ARRAY mapping + (Binary | LargeBinary | Utf8 | LargeUtf8, FixedSizeBinary(_)) => false, + // Integer/Float to String: Spark handles this for partition column overlap + (Utf8 | LargeUtf8, Int8 | Int16 | Int32 | Int64 | Float32 | Float64) => false, + // Date ↔ Int32: Parquet stores dates as INT32 + (Date32, Int32) | (Int32, Date32) => false, // Complex types: compare element types recursively, ignore field metadata (List(l), List(p)) | (LargeList(l), LargeList(p)) => { is_type_promotion(l.data_type(), p.data_type()) @@ -612,10 +626,14 @@ mod test { as Arc; let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names]).unwrap(); - // Read as Int64 (widening) with schema evolution disabled + // Read Utf8 as Timestamp (incompatible) with schema evolution disabled let required_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int64, false), - Field::new("name", DataType::Utf8, false), + Field::new("id", DataType::Int32, false), + Field::new( + "name", + DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), + false, + ), ])); let result = @@ -629,13 +647,6 @@ mod test { err_msg.contains("Parquet column cannot be converted"), "Error should mention column conversion: {err_msg}" ); - - // Same read with schema evolution enabled should succeed - let result = roundtrip_with_schema_evolution(&batch, required_schema, true).await; - assert!( - result.is_ok(), - "Expected success when schema evolution is enabled" - ); } /// Create a Parquet file containing a single batch and then read the batch back using