fix: enforce schema evolution config at runtime in native_datafusion scan #3689
Closed
andygrove wants to merge 9 commits into apache:main
Conversation
Pass schema_evolution_enabled config through protobuf to the native Parquet reader. When disabled, the SparkPhysicalExprAdapter rejects type mismatches between file and table schemas at runtime, matching Spark's behavior of throwing errors on incompatible types.
Timestamp timezone-only differences (e.g., Timestamp(us, "UTC") vs Timestamp(us, None)) are metadata-only relabelings handled by the existing CometCastColumnExpr, not real schema evolution.
The previous check was too strict, rejecting normal Parquet-to-Spark metadata differences (timestamp units/timezones, list/map/struct field names and nullability) that the adapter already handles. Now only flag actual type promotions (Int32→Int64, Float32→Float64, etc.) while allowing the adapter to handle structural metadata differences.
Unsigned int types (UInt8→Int16, UInt64→Decimal) and FixedSizeBinary→Binary are Parquet physical type mappings handled by the adapter, not schema evolution. Also simplify the schema evolution test since native_datafusion now properly enforces the config.
Use the SparkError/SparkErrorConverter pattern to produce proper SparkException with error class _LEGACY_ERROR_TEMP_2063, matching Spark's native schema mismatch error format.
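The error pattern described above can be sketched as follows. This is a simplified illustration, not Comet's actual definitions: the variant fields and the message wording are assumptions, and the real enum lives alongside the JVM-side `SparkErrorConverter` that rebuilds the `SparkException`.

```rust
use std::error::Error;
use std::fmt;

// Rough sketch of the SparkError pattern: a dedicated variant carries the
// details the JVM side needs to rebuild Spark's exception with error class
// _LEGACY_ERROR_TEMP_2063. Field names and message format are illustrative.
#[derive(Debug)]
enum SparkError {
    SchemaColumnConvertNotSupported {
        column: String,
        physical_type: String,
        logical_type: String,
    },
}

impl fmt::Display for SparkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Irrefutable destructuring: the sketch has a single variant.
        let SparkError::SchemaColumnConvertNotSupported { column, physical_type, logical_type } =
            self;
        write!(
            f,
            "Parquet column cannot be converted. Column: {column}, Expected: {logical_type}, Found: {physical_type}"
        )
    }
}

impl Error for SparkError {}

fn main() {
    let err = SparkError::SchemaColumnConvertNotSupported {
        column: "c0".to_string(),
        physical_type: "INT64".to_string(),
        logical_type: "INT32".to_string(),
    };
    // In the native code the error would be boxed and wrapped (e.g. as
    // DataFusionError::External) so it reaches the JVM through the
    // existing JSON error path.
    let boxed: Box<dyn Error> = Box::new(err);
    println!("{boxed}");
}
```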
The schema evolution enforcement now properly rejects type mismatches and produces Spark-compatible error messages, allowing these tests to pass with native_datafusion scan.
Spark 4.0 replaced unsupportedSchemaColumnConvertError with FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH error class.
… disabled

The is_type_promotion function was too aggressive, treating standard Parquet type mappings as schema evolution. Spark's vectorized reader natively handles integer family casts (int32→int8/int16/int64), decimal widening, float→double, binary↔string, and int→string conversions without requiring schema evolution. Only truly incompatible conversions (e.g., string→timestamp) should be rejected. Also re-add the IgnoreCometNativeDataFusion tag for the SPARK-35640 int-as-long test, since native_datafusion handles int→long natively (unlike Spark's non-vectorized reader, which the test targets).
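The distinction drawn here can be sketched with a small classification function. The `DataType` enum below is a stripped-down stand-in for Arrow's type enum, and `is_natively_handled_cast` is an illustrative name, not the actual Comet function:

```rust
// Stripped-down stand-in for Arrow's DataType enum, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    Binary,
}

/// True when reading `file_type` as `table_type` is a cast the vectorized
/// reader handles natively (integer-family casts, float -> double,
/// binary <-> string, int -> string), so it must NOT be flagged as
/// schema evolution.
fn is_natively_handled_cast(file_type: &DataType, table_type: &DataType) -> bool {
    use DataType::*;
    match (file_type, table_type) {
        (a, b) if a == b => true,
        // integer family casts handled by the vectorized reader
        (Int32, Int8 | Int16 | Int64) => true,
        (Float32, Float64) => true,
        (Binary, Utf8) | (Utf8, Binary) => true,
        (Int32 | Int64, Utf8) => true,
        _ => false,
    }
}

fn main() {
    // int32 -> int64 is handled natively; utf8 -> int32 is truly incompatible
    assert!(is_natively_handled_cast(&DataType::Int32, &DataType::Int64));
    assert!(!is_natively_handled_cast(&DataType::Utf8, &DataType::Int32));
    println!("ok");
}
```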
Author: I am leaning towards just documenting this is a difference
Which issue does this PR close?
Closes #3311.
Rationale for this change
When `spark.comet.schemaEvolution.enabled` is set to `false` (the default), the `native_datafusion` scan should reject Parquet files whose physical schema differs from the expected logical schema (e.g., int written as long). Previously, `native_datafusion` silently allowed schema widening, producing incorrect results or confusing errors instead of the expected `SchemaColumnConvertNotSupportedException`-style error that Spark produces.

What changes are included in this PR?
Runtime schema mismatch detection in native code:
- `detect_schema_mismatch()` function in `schema_adapter.rs` that compares logical and physical schemas per-file at runtime
- `is_type_promotion()` recursive function to distinguish real type promotions (Int32→Int64) from adapter-handled differences (timestamp tz/unit, list/map/struct metadata, unsigned ints, FixedSizeBinary)
- `schema_evolution_enabled` config flows from JVM through protobuf to `SparkParquetOptions`

Spark-compatible error conversion:
- `SchemaColumnConvertNotSupported` variant added to the `SparkError` enum
- Errors are wrapped as `DataFusionError::External(SparkError)` so they flow through the JSON error path
- `SchemaColumnConvertNotSupported` handler in `ShimSparkErrorConverter` (all 3 Spark versions) that calls `QueryExecutionErrors.unsupportedSchemaColumnConvertError()`, producing the same `SparkException` with error class `_LEGACY_ERROR_TEMP_2063` that Spark natively produces

Spark SQL test updates:
- Tests (`read binary as timestamp should throw schema incompatible error`, `int as long should throw schema incompatible error`) are enabled for the `native_datafusion` scan since the enforcement now produces matching Spark errors

How are these changes tested?
- `parquet_schema_mismatch_rejected_when_evolution_disabled` validates that type mismatches are rejected when schema evolution is disabled and allowed when enabled
- `ParquetReadSuite` schema evolution tests validate end-to-end behavior
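The behavior that the unit test above validates can be sketched as follows. `Field`, `DataType`, and the function signature are simplified stand-ins for the real Arrow types used in `schema_adapter.rs`, not the actual Comet API:

```rust
// Simplified stand-ins for Arrow schema types, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float32,
    Float64,
}

struct Field {
    name: String,
    data_type: DataType,
}

/// With schema evolution disabled, reject any file whose physical type
/// requires a real promotion (e.g. an Int32 file column read as an Int64
/// table column); with it enabled, accept the file.
fn detect_schema_mismatch(
    file_schema: &[Field],
    table_schema: &[Field],
    schema_evolution_enabled: bool,
) -> Result<(), String> {
    if schema_evolution_enabled {
        return Ok(());
    }
    for table_field in table_schema {
        if let Some(file_field) = file_schema.iter().find(|f| f.name == table_field.name) {
            // Flag only real type promotions; metadata-only differences
            // (timestamp tz/unit, nested field names) are left to the adapter.
            let is_promotion = matches!(
                (&file_field.data_type, &table_field.data_type),
                (DataType::Int32, DataType::Int64) | (DataType::Float32, DataType::Float64)
            );
            if is_promotion {
                return Err(format!(
                    "column '{}': cannot read {:?} as {:?} when schema evolution is disabled",
                    table_field.name, file_field.data_type, table_field.data_type
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    let file = vec![Field { name: "c0".into(), data_type: DataType::Int32 }];
    let table = vec![Field { name: "c0".into(), data_type: DataType::Int64 }];
    // rejected when evolution is disabled, allowed when enabled
    assert!(detect_schema_mismatch(&file, &table, false).is_err());
    assert!(detect_schema_mismatch(&file, &table, true).is_ok());
    println!("ok");
}
```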