diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 3aaecdecb1..d4544c1544 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644
        SQLConf.ANSI_ENABLED.key -> "true") {
        withTable("t") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index fa1a64460fc..1d2e215d6a3 100644
+index fa1a64460fc..134f0db1fb8 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 @@ -17,6 +17,8 @@
@@ -2184,30 +2184,20 @@ index 8e88049f51e..49f2001dc6b 100644
          case _ => throw new AnalysisException("Can not match ParquetTable in the query.")
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..f312174b182 100644
+index 8ed9ef1630e..eed2a6f5ad5 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      }
    }
- 
--  test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
-+  test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     val data = (1 to 4).map(i => Tuple1(i.toString))
-     val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))
- 
-@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
+ 
 -  test("SPARK-35640: int as long should throw schema incompatible error") {
 +  test("SPARK-35640: int as long should throw schema incompatible error",
 +    IgnoreCometNativeDataFusion("native_datafusion handles int->long natively")) {
      val data = (1 to 4).map(i => Tuple1(i))
      val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
- 
+ 
 @@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      }
    }
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 15bbabe883..7d3b6af660 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1213,6 +1213,7 @@ impl PhysicalPlanner {
                     common.case_sensitive,
                     self.session_ctx(),
                     common.encryption_enabled,
+                    common.schema_evolution_enabled,
                 )?;
                 Ok((
                     vec![],
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index f2b0e80ab2..b0588d8737 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -775,6 +775,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         case_sensitive != JNI_FALSE,
         session_ctx,
         encryption_enabled,
+        true, // schema_evolution_enabled (always true for iceberg_compat)
     )?;
 
     let partition_index: usize = 0;
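[Reviewer note, not part of the patch] The new flag reaches the native reader by two routes: `native_datafusion` scans carry `CometConf.COMET_SCHEMA_EVOLUTION_ENABLED` through the protobuf plan (`CometNativeScan.scala` → `operator.proto` → `planner.rs`), while the JNI entry point above pins it to `true` for the iceberg-compat path. A minimal, self-contained sketch of that split; `PlanSource` and `ReaderOptions` are hypothetical stand-ins for the real `NativeScanCommon` and `SparkParquetOptions`:

```rust
// Toy illustration of how schema_evolution_enabled is sourced; not patch code.
#[derive(Debug)]
enum PlanSource {
    // native_datafusion: the flag travels in the serialized plan.
    NativeDataFusion { schema_evolution_enabled: bool },
    // iceberg_compat JNI entry point: the flag is pinned on the Rust side.
    IcebergCompat,
}

#[derive(Debug)]
struct ReaderOptions {
    schema_evolution_enabled: bool,
}

fn reader_options(source: &PlanSource) -> ReaderOptions {
    let schema_evolution_enabled = match source {
        PlanSource::NativeDataFusion {
            schema_evolution_enabled,
        } => *schema_evolution_enabled,
        // Mirrors the hardcoded `true` in the JNI reader init above.
        PlanSource::IcebergCompat => true,
    };
    ReaderOptions {
        schema_evolution_enabled,
    }
}

fn main() {
    let from_plan = PlanSource::NativeDataFusion {
        schema_evolution_enabled: false,
    };
    assert!(!reader_options(&from_plan).schema_evolution_enabled);
    assert!(reader_options(&PlanSource::IcebergCompat).schema_evolution_enabled);
}
```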
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 2d970734bb..37f970c5c0 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -72,12 +72,14 @@ pub(crate) fn init_datasource_exec(
     case_sensitive: bool,
     session_ctx: &Arc<SessionContext>,
     encryption_enabled: bool,
+    schema_evolution_enabled: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
     let (table_parquet_options, spark_parquet_options) = get_options(
         session_timezone,
         case_sensitive,
         &object_store_url,
         encryption_enabled,
+        schema_evolution_enabled,
     );
 
     // Determine the schema and projection to use for ParquetSource.
@@ -181,6 +183,7 @@ fn get_options(
     case_sensitive: bool,
     object_store_url: &ObjectStoreUrl,
     encryption_enabled: bool,
+    schema_evolution_enabled: bool,
 ) -> (TableParquetOptions, SparkParquetOptions) {
     let mut table_parquet_options = TableParquetOptions::new();
     table_parquet_options.global.pushdown_filters = true;
@@ -190,6 +193,7 @@
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
     spark_parquet_options.case_sensitive = case_sensitive;
+    spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;
 
     if encryption_enabled {
         table_parquet_options.crypto.configure_factory(
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index e7ff5630f1..e46d1bf233 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -76,6 +76,9 @@ pub struct SparkParquetOptions {
     pub use_legacy_date_timestamp_or_ntz: bool,
     // Whether schema field names are case sensitive
     pub case_sensitive: bool,
+    /// Whether schema evolution (type promotion) is enabled. When false, the adapter
+    /// should reject type mismatches between file schema and table schema.
+    pub schema_evolution_enabled: bool,
 }
 
 impl SparkParquetOptions {
@@ -88,6 +91,7 @@
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            schema_evolution_enabled: true,
         }
     }
 
@@ -100,6 +104,7 @@
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            schema_evolution_enabled: true,
         }
     }
 }
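[Reviewer note, not part of the patch] The `schema_adapter.rs` change below hinges on classifying logical/physical type pairs as either conversions the reader already handles or "real" promotions that must be rejected when evolution is off. A condensed, self-contained approximation of that decision table using arrow-rs types; this is a toy subset of the actual `is_type_promotion` in the next hunk, not a replacement for it:

```rust
use arrow::datatypes::{DataType, TimeUnit};

// Toy subset: returns true only for conversions that should fail
// when schema evolution is disabled.
fn is_real_promotion(logical: &DataType, physical: &DataType) -> bool {
    use DataType::*;
    match (logical, physical) {
        (a, b) if a == b => false,
        // Integer widening is a standard Parquet mapping, not evolution.
        (Int8 | Int16 | Int32 | Int64, Int8 | Int16 | Int32 | Int64) => false,
        // Everything else counts as a real promotion in this toy version.
        _ => true,
    }
}

fn main() {
    // int32 file column read as a long table column: handled natively,
    // which is why the "int as long" Spark test stays ignored in dev/diffs above.
    assert!(!is_real_promotion(&DataType::Int64, &DataType::Int32));
    // string/binary file column read as timestamp: a real promotion that
    // must raise SchemaColumnConvertNotSupported.
    assert!(is_real_promotion(
        &DataType::Timestamp(TimeUnit::Microsecond, None),
        &DataType::Utf8
    ));
}
```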
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 0ad61df426..1217f10ef6 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -19,12 +19,12 @@ use crate::parquet::cast_column::CometCastColumnExpr;
 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion::common::Result as DataFusionResult;
+use datafusion::common::{DataFusionError, Result as DataFusionResult};
 use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion::scalar::ScalarValue;
-use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
+use datafusion_comet_spark_expr::{Cast, SparkCastOptions, SparkError};
 use datafusion_physical_expr_adapter::{
     replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
     PhysicalExprAdapterFactory,
@@ -95,12 +95,121 @@ fn remap_physical_schema_names(
     Arc::new(Schema::new(remapped_fields))
 }
 
+/// Returns true if the two types represent a real type promotion that constitutes
+/// schema evolution — i.e., a conversion that Spark's vectorized Parquet reader
+/// does NOT support natively (e.g., Binary→Timestamp). Returns false for
+/// conversions the reader handles without schema evolution (integer family casts,
+/// decimal widening, timestamp timezone/unit changes, etc.).
+fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool {
+    use DataType::*;
+    match (logical, physical) {
+        // Same type — no promotion
+        (a, b) if a == b => false,
+        // Integer family: Spark's vectorized reader supports INT32 → byte/short/int/long
+        // and INT64 → long. These are standard Parquet type mappings, not schema evolution.
+        (Int8 | Int16 | Int32 | Int64, Int8 | Int16 | Int32 | Int64) => false,
+        // Float widening: Spark supports FLOAT → double
+        (Float64, Float32) => false,
+        // Decimal: Spark supports reading decimals with different precision/scale
+        (Decimal128(_, _), Decimal128(_, _)) | (Decimal256(_, _), Decimal256(_, _)) => false,
+        // Timestamp differences (unit, timezone) are handled by the adapter
+        (Timestamp(_, _), Timestamp(_, _)) => false,
+        // Timestamp to/from Int64 (nanosAsLong) is handled by the adapter
+        (Timestamp(_, _), Int64) | (Int64, Timestamp(_, _)) => false,
+        // Unsigned-to-signed mappings are Parquet physical type conversions, not evolution.
+        // Parquet UINT_8→Spark ShortType, UINT_16→IntegerType, UINT_32→LongType,
+        // UINT_64→Decimal(20,0). The adapter handles these via allow_cast_unsigned_ints.
+        (_, UInt8 | UInt16 | UInt32 | UInt64) => false,
+        // Binary/String family: Spark supports BINARY → string and vice versa
+        (Utf8 | LargeUtf8 | Binary | LargeBinary, Utf8 | LargeUtf8 | Binary | LargeBinary) => false,
+        // FixedSizeBinary→Binary/String is a Parquet FIXED_LEN_BYTE_ARRAY mapping
+        (Binary | LargeBinary | Utf8 | LargeUtf8, FixedSizeBinary(_)) => false,
+        // Integer/Float to String: Spark handles this for partition column overlap
+        (Utf8 | LargeUtf8, Int8 | Int16 | Int32 | Int64 | Float32 | Float64) => false,
+        // Date ↔ Int32: Parquet stores dates as INT32
+        (Date32, Int32) | (Int32, Date32) => false,
+        // Complex types: compare element types recursively, ignore field metadata
+        (List(l), List(p)) | (LargeList(l), LargeList(p)) => {
+            is_type_promotion(l.data_type(), p.data_type())
+        }
+        (Map(lf, _), Map(pf, _)) => {
+            // Map entries have key and value fields
+            let l_entries = lf.data_type();
+            let p_entries = pf.data_type();
+            if let (Struct(l_fields), Struct(p_fields)) = (l_entries, p_entries) {
+                l_fields
+                    .iter()
+                    .zip(p_fields.iter())
+                    .any(|(lf, pf)| is_type_promotion(lf.data_type(), pf.data_type()))
+            } else {
+                true
+            }
+        }
+        (Struct(l_fields), Struct(p_fields)) => l_fields.iter().any(|lf| {
+            p_fields
+                .iter()
+                .find(|pf| pf.name() == lf.name())
+                .is_some_and(|pf| is_type_promotion(lf.data_type(), pf.data_type()))
+        }),
+        // Different base scalar types — this is real type promotion
+        _ => true,
+    }
+}
+
+/// Check if the logical (table) schema and physical (file) schema have type
+/// promotions that constitute schema evolution. Returns a SparkError for
+/// the first real type mismatch found, or None if schemas are compatible.
+/// Ignores differences that the adapter handles natively (timestamp timezone/unit,
+/// list/map/struct field names and nullability).
+fn detect_schema_mismatch(
+    logical_schema: &SchemaRef,
+    physical_schema: &SchemaRef,
+    case_sensitive: bool,
+) -> Option<SparkError> {
+    for logical_field in logical_schema.fields() {
+        let physical_field = if case_sensitive {
+            physical_schema
+                .fields()
+                .iter()
+                .find(|f| f.name() == logical_field.name())
+        } else {
+            physical_schema
+                .fields()
+                .iter()
+                .find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase())
+        };
+        if let Some(physical_field) = physical_field {
+            if is_type_promotion(logical_field.data_type(), physical_field.data_type()) {
+                return Some(SparkError::SchemaColumnConvertNotSupported {
+                    column: logical_field.name().clone(),
+                    logical_type: logical_field.data_type().to_string(),
+                    physical_type: physical_field.data_type().to_string(),
+                });
+            }
+        }
+    }
+    None
+}
+
 impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
     fn create(
         &self,
         logical_file_schema: SchemaRef,
         physical_file_schema: SchemaRef,
     ) -> Arc<dyn PhysicalExprAdapter> {
+        // When schema evolution is disabled, check for type mismatches between the
+        // logical (table) schema and the physical (file) schema. If any column has
+        // a different type, store the error to be raised during rewrite().
+        let schema_mismatch_error = if !self.parquet_options.schema_evolution_enabled {
+            detect_schema_mismatch(
+                &logical_file_schema,
+                &physical_file_schema,
+                self.parquet_options.case_sensitive,
+            )
+        } else {
+            None
+        };
+
         // When case-insensitive, remap physical schema field names to match logical
         // field names. The DefaultPhysicalExprAdapter uses exact name matching, so
         // without this remapping, columns like "a" won't match logical "A" and will
@@ -154,6 +263,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
             default_values: self.default_values.clone(),
             default_adapter,
             logical_to_physical_names,
+            schema_mismatch_error,
         })
     }
 }
@@ -183,10 +293,18 @@ struct SparkPhysicalExprAdapter {
     /// physical names so that downstream reassign_expr_columns can find
     /// columns in the actual stream schema.
     logical_to_physical_names: Option<HashMap<String, String>>,
+    /// When schema evolution is disabled and file/table types differ, this
+    /// holds the error message to return from rewrite().
+    schema_mismatch_error: Option<SparkError>,
 }
 
 impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        // When schema evolution is disabled and types differ, reject the read
+        if let Some(err) = &self.schema_mismatch_error {
+            return Err(DataFusionError::External(Box::new(err.clone())));
+        }
+
         // First let the default adapter handle column remapping, missing columns,
         // and simple scalar type casts. Then replace DataFusion's CastColumnExpr
         // with Spark-compatible equivalents.
@@ -496,11 +614,54 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_schema_mismatch_rejected_when_evolution_disabled() {
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn Array>;
+        let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))
+            as Arc<dyn Array>;
+        let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names]).unwrap();
+
+        // Read Utf8 as Timestamp (incompatible) with schema evolution disabled
+        let required_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "name",
+                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
+                false,
+            ),
+        ]));
+
+        let result =
+            roundtrip_with_schema_evolution(&batch, Arc::clone(&required_schema), false).await;
+        assert!(
+            result.is_err(),
+            "Expected error when schema evolution is disabled"
+        );
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("Parquet column cannot be converted"),
+            "Error should mention column conversion: {err_msg}"
+        );
+    }
+
     /// Create a Parquet file containing a single batch and then read the batch back using
     /// the specified required_schema. This will cause the PhysicalExprAdapter code to be used.
     async fn roundtrip(
         batch: &RecordBatch,
         required_schema: SchemaRef,
+    ) -> Result<RecordBatch> {
+        roundtrip_with_schema_evolution(batch, required_schema, true).await
+    }
+
+    async fn roundtrip_with_schema_evolution(
+        batch: &RecordBatch,
+        required_schema: SchemaRef,
+        schema_evolution_enabled: bool,
     ) -> Result<RecordBatch> {
         let filename = get_temp_filename();
         let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
@@ -513,6 +674,7 @@ mod test {
         let mut spark_parquet_options =
             SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
         spark_parquet_options.allow_cast_unsigned_ints = true;
+        spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;
 
         // Create expression adapter factory for Spark-compatible schema adaptation
         let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 4afc1fefb7..655f5eb498 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -100,6 +100,7 @@ message NativeScanCommon {
   bool encryption_enabled = 11;
   string source = 12;
   repeated spark.spark_expression.DataType fields = 13;
+  bool schema_evolution_enabled = 14;
 }
 
 message NativeScan {
diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs
index ae3b5c0eda..bea93c651c 100644
--- a/native/spark-expr/src/error.rs
+++ b/native/spark-expr/src/error.rs
@@ -166,6 +166,13 @@ pub enum SparkError {
     #[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")]
     ScalarSubqueryTooManyRows,
 
+    #[error("[_LEGACY_ERROR_TEMP_2063] Parquet column cannot be converted. Column: [{column}], Expected: {logical_type}, Found: {physical_type}")]
+    SchemaColumnConvertNotSupported {
+        column: String,
+        logical_type: String,
+        physical_type: String,
+    },
+
     #[error("ArrowError: {0}.")]
     Arrow(Arc<ArrowError>),
 
@@ -236,6 +243,7 @@ impl SparkError {
             SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex",
             SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
             SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
+            SparkError::SchemaColumnConvertNotSupported { .. } => "SchemaColumnConvertNotSupported",
             SparkError::Arrow(_) => "Arrow",
             SparkError::Internal(_) => "Internal",
         }
@@ -421,6 +429,17 @@ impl SparkError {
                     "dataType": data_type,
                 })
             }
+            SparkError::SchemaColumnConvertNotSupported {
+                column,
+                logical_type,
+                physical_type,
+            } => {
+                serde_json::json!({
+                    "column": column,
+                    "logicalType": logical_type,
+                    "physicalType": physical_type,
+                })
+            }
             SparkError::Arrow(e) => {
                 serde_json::json!({
                     "message": e.to_string(),
@@ -487,6 +506,11 @@ impl SparkError {
             SparkError::DatatypeCannotOrder { .. } | SparkError::InvalidUtf8String { .. } =>
                 "org/apache/spark/SparkIllegalArgumentException",
 
+            // Schema mismatch - this becomes SparkException via QueryExecutionErrors
+            SparkError::SchemaColumnConvertNotSupported { .. } => {
+                "org/apache/spark/SparkException"
+            }
+
             // Generic errors
             SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
         }
@@ -559,6 +583,9 @@ impl SparkError {
             // Subquery errors
             SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),
 
+            // Schema mismatch
+            SparkError::SchemaColumnConvertNotSupported { .. } => Some("_LEGACY_ERROR_TEMP_2063"),
+
             // Generic errors (no error class)
             SparkError::Arrow(_) | SparkError::Internal(_) => None,
         }
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index d5d075760f..9354a37d33 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -174,6 +174,8 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
     commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
     commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
     commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
+    commonBuilder.setSchemaEvolutionEnabled(
+      CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get(scan.conf))
 
     // Collect S3/cloud storage configurations
     val hadoopConf = scan.relation.sparkSession.sessionState
diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 8e6ed1a927..0c4dbeff8f 100644
--- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -243,6 +243,18 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))
 
+      case "SchemaColumnConvertNotSupported" =>
+        val column = params.getOrElse("column", "").toString
+        val logicalType = params.getOrElse("logicalType", "").toString
+        val physicalType = params.getOrElse("physicalType", "").toString
+        Some(
+          QueryExecutionErrors.unsupportedSchemaColumnConvertError(
+            "unknown",
+            column,
+            logicalType,
+            physicalType,
+            null))
+
       case _ =>
         None
     }
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 9bd8c7dba1..f5ccfc0b18 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -239,6 +239,18 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))
 
+      case "SchemaColumnConvertNotSupported" =>
+        val column = params.getOrElse("column", "").toString
+        val logicalType = params.getOrElse("logicalType", "").toString
+        val physicalType = params.getOrElse("physicalType", "").toString
+        Some(
+          QueryExecutionErrors.unsupportedSchemaColumnConvertError(
+            "unknown",
+            column,
+            logicalType,
+            physicalType,
+            null))
+
       case _ =>
         None
     }
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index e49c789a77..57888f1aec 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -251,6 +251,20 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
             context.headOption.orNull))
 
+      case "SchemaColumnConvertNotSupported" =>
+        val column = params.getOrElse("column", "").toString
+        val logicalType = params.getOrElse("logicalType", "").toString
+        val physicalType = params.getOrElse("physicalType", "").toString
+        Some(
+          new SparkException(
+            errorClass = "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH",
+            messageParameters = Map(
+              "filePath" -> "unknown",
+              "column" -> column,
+              "logicalType" -> logicalType,
+              "physicalType" -> physicalType),
+            cause = null))
+
       case _ =>
         // Unknown error type - return None to trigger fallback
         None
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 09a2308e35..63fa32c3ff 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -982,9 +982,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
 
     withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-      // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
-      if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
-          .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
+      if (enableSchemaEvolution) {
         checkAnswer(df, data.map(Row.fromTuple))
       } else {
        assertThrows[SparkException](df.collect())