Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
index fa1a64460fc..134f0db1fb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
Expand Down Expand Up @@ -2184,30 +2184,20 @@ index 8e88049f51e..49f2001dc6b 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 8ed9ef1630e..f312174b182 100644
index 8ed9ef1630e..eed2a6f5ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))

@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}


- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreCometNativeDataFusion("native_datafusion handles int->long natively")) {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ impl PhysicalPlanner {
common.case_sensitive,
self.session_ctx(),
common.encryption_enabled,
common.schema_evolution_enabled,
)?;
Ok((
vec![],
Expand Down
1 change: 1 addition & 0 deletions native/core/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
case_sensitive != JNI_FALSE,
session_ctx,
encryption_enabled,
true, // schema_evolution_enabled (always true for iceberg_compat)
)?;

let partition_index: usize = 0;
Expand Down
4 changes: 4 additions & 0 deletions native/core/src/parquet/parquet_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ pub(crate) fn init_datasource_exec(
case_sensitive: bool,
session_ctx: &Arc<SessionContext>,
encryption_enabled: bool,
schema_evolution_enabled: bool,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let (table_parquet_options, spark_parquet_options) = get_options(
session_timezone,
case_sensitive,
&object_store_url,
encryption_enabled,
schema_evolution_enabled,
);

// Determine the schema and projection to use for ParquetSource.
Expand Down Expand Up @@ -181,6 +183,7 @@ fn get_options(
case_sensitive: bool,
object_store_url: &ObjectStoreUrl,
encryption_enabled: bool,
schema_evolution_enabled: bool,
) -> (TableParquetOptions, SparkParquetOptions) {
let mut table_parquet_options = TableParquetOptions::new();
table_parquet_options.global.pushdown_filters = true;
Expand All @@ -190,6 +193,7 @@ fn get_options(
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
spark_parquet_options.allow_cast_unsigned_ints = true;
spark_parquet_options.case_sensitive = case_sensitive;
spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;

if encryption_enabled {
table_parquet_options.crypto.configure_factory(
Expand Down
5 changes: 5 additions & 0 deletions native/core/src/parquet/parquet_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ pub struct SparkParquetOptions {
pub use_legacy_date_timestamp_or_ntz: bool,
// Whether schema field names are case sensitive
pub case_sensitive: bool,
/// Whether schema evolution (type promotion) is enabled. When false, the adapter
/// should reject type mismatches between file schema and table schema.
pub schema_evolution_enabled: bool,
}

impl SparkParquetOptions {
Expand All @@ -88,6 +91,7 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
schema_evolution_enabled: true,
}
}

Expand All @@ -100,6 +104,7 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
schema_evolution_enabled: true,
}
}
}
Expand Down
166 changes: 164 additions & 2 deletions native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use crate::parquet::cast_column::CometCastColumnExpr;
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::Result as DataFusionResult;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
use datafusion_comet_spark_expr::{Cast, SparkCastOptions, SparkError};
use datafusion_physical_expr_adapter::{
replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
PhysicalExprAdapterFactory,
Expand Down Expand Up @@ -95,12 +95,121 @@ fn remap_physical_schema_names(
Arc::new(Schema::new(remapped_fields))
}

/// Returns true if the two types represent a real type promotion that constitutes
/// schema evolution — i.e., a conversion that Spark's vectorized Parquet reader
/// does NOT support natively (e.g., Binary→Timestamp). Returns false for
/// conversions the reader handles without schema evolution (integer family casts,
/// decimal widening, timestamp timezone/unit changes, etc.).
fn is_type_promotion(logical: &DataType, physical: &DataType) -> bool {
use DataType::*;
match (logical, physical) {
// Same type — no promotion
(a, b) if a == b => false,
// Integer family: Spark's vectorized reader supports INT32 → byte/short/int/long
// and INT64 → long. These are standard Parquet type mappings, not schema evolution.
(Int8 | Int16 | Int32 | Int64, Int8 | Int16 | Int32 | Int64) => false,
// Float widening: Spark supports FLOAT → double
(Float64, Float32) => false,
// Decimal: Spark supports reading decimals with different precision/scale
(Decimal128(_, _), Decimal128(_, _)) | (Decimal256(_, _), Decimal256(_, _)) => false,
// Timestamp differences (unit, timezone) are handled by the adapter
(Timestamp(_, _), Timestamp(_, _)) => false,
// Timestamp to/from Int64 (nanosAsLong) is handled by the adapter
(Timestamp(_, _), Int64) | (Int64, Timestamp(_, _)) => false,
// Unsigned-to-signed mappings are Parquet physical type conversions, not evolution.
// Parquet UINT_8→Spark ShortType, UINT_16→IntegerType, UINT_32→LongType,
// UINT_64→Decimal(20,0). The adapter handles these via allow_cast_unsigned_ints.
(_, UInt8 | UInt16 | UInt32 | UInt64) => false,
// Binary/String family: Spark supports BINARY → string and vice versa
(Utf8 | LargeUtf8 | Binary | LargeBinary, Utf8 | LargeUtf8 | Binary | LargeBinary) => false,
// FixedSizeBinary→Binary/String is a Parquet FIXED_LEN_BYTE_ARRAY mapping
(Binary | LargeBinary | Utf8 | LargeUtf8, FixedSizeBinary(_)) => false,
// Integer/Float to String: Spark handles this for partition column overlap
(Utf8 | LargeUtf8, Int8 | Int16 | Int32 | Int64 | Float32 | Float64) => false,
// Date ↔ Int32: Parquet stores dates as INT32
(Date32, Int32) | (Int32, Date32) => false,
// Complex types: compare element types recursively, ignore field metadata
(List(l), List(p)) | (LargeList(l), LargeList(p)) => {
is_type_promotion(l.data_type(), p.data_type())
}
(Map(lf, _), Map(pf, _)) => {
// Map entries have key and value fields
let l_entries = lf.data_type();
let p_entries = pf.data_type();
if let (Struct(l_fields), Struct(p_fields)) = (l_entries, p_entries) {
l_fields
.iter()
.zip(p_fields.iter())
.any(|(lf, pf)| is_type_promotion(lf.data_type(), pf.data_type()))
} else {
true
}
}
(Struct(l_fields), Struct(p_fields)) => l_fields.iter().any(|lf| {
p_fields
.iter()
.find(|pf| pf.name() == lf.name())
.is_some_and(|pf| is_type_promotion(lf.data_type(), pf.data_type()))
}),
// Different base scalar types — this is real type promotion
_ => true,
}
}

/// Check if the logical (table) schema and physical (file) schema have type
/// promotions that constitute schema evolution. Returns a SparkError for
/// the first real type mismatch found, or None if schemas are compatible.
/// Ignores differences that the adapter handles natively (timestamp timezone/unit,
/// list/map/struct field names and nullability).
fn detect_schema_mismatch(
logical_schema: &SchemaRef,
physical_schema: &SchemaRef,
case_sensitive: bool,
) -> Option<SparkError> {
for logical_field in logical_schema.fields() {
let physical_field = if case_sensitive {
physical_schema
.fields()
.iter()
.find(|f| f.name() == logical_field.name())
} else {
physical_schema
.fields()
.iter()
.find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase())
};
if let Some(physical_field) = physical_field {
if is_type_promotion(logical_field.data_type(), physical_field.data_type()) {
return Some(SparkError::SchemaColumnConvertNotSupported {
column: logical_field.name().clone(),
logical_type: logical_field.data_type().to_string(),
physical_type: physical_field.data_type().to_string(),
});
}
}
}
None
}

impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
fn create(
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
// When schema evolution is disabled, check for type mismatches between the
// logical (table) schema and the physical (file) schema. If any column has
// a different type, store the error to be raised during rewrite().
let schema_mismatch_error = if !self.parquet_options.schema_evolution_enabled {
detect_schema_mismatch(
&logical_file_schema,
&physical_file_schema,
self.parquet_options.case_sensitive,
)
} else {
None
};

// When case-insensitive, remap physical schema field names to match logical
// field names. The DefaultPhysicalExprAdapter uses exact name matching, so
// without this remapping, columns like "a" won't match logical "A" and will
Expand Down Expand Up @@ -154,6 +263,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
default_values: self.default_values.clone(),
default_adapter,
logical_to_physical_names,
schema_mismatch_error,
})
}
}
Expand Down Expand Up @@ -183,10 +293,18 @@ struct SparkPhysicalExprAdapter {
/// physical names so that downstream reassign_expr_columns can find
/// columns in the actual stream schema.
logical_to_physical_names: Option<HashMap<String, String>>,
/// When schema evolution is disabled and file/table types differ, this
/// holds the error message to return from rewrite().
schema_mismatch_error: Option<SparkError>,
}

impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
// When schema evolution is disabled and types differ, reject the read
if let Some(err) = &self.schema_mismatch_error {
return Err(DataFusionError::External(Box::new(err.clone())));
}

// First let the default adapter handle column remapping, missing columns,
// and simple scalar type casts. Then replace DataFusion's CastColumnExpr
// with Spark-compatible equivalents.
Expand Down Expand Up @@ -496,11 +614,54 @@ mod test {
Ok(())
}

#[tokio::test]
async fn parquet_schema_mismatch_rejected_when_evolution_disabled() {
let file_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))
as Arc<dyn arrow::array::Array>;
let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names]).unwrap();

// Read Utf8 as Timestamp (incompatible) with schema evolution disabled
let required_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"name",
DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
false,
),
]));

let result =
roundtrip_with_schema_evolution(&batch, Arc::clone(&required_schema), false).await;
assert!(
result.is_err(),
"Expected error when schema evolution is disabled"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Parquet column cannot be converted"),
"Error should mention column conversion: {err_msg}"
);
}

/// Create a Parquet file containing a single batch and then read the batch back using
/// the specified required_schema. This will cause the PhysicalExprAdapter code to be used.
async fn roundtrip(
batch: &RecordBatch,
required_schema: SchemaRef,
) -> Result<RecordBatch, DataFusionError> {
roundtrip_with_schema_evolution(batch, required_schema, true).await
}

async fn roundtrip_with_schema_evolution(
batch: &RecordBatch,
required_schema: SchemaRef,
schema_evolution_enabled: bool,
) -> Result<RecordBatch, DataFusionError> {
let filename = get_temp_filename();
let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
Expand All @@ -513,6 +674,7 @@ mod test {

let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
spark_parquet_options.allow_cast_unsigned_ints = true;
spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;

// Create expression adapter factory for Spark-compatible schema adaptation
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ message NativeScanCommon {
bool encryption_enabled = 11;
string source = 12;
repeated spark.spark_expression.DataType fields = 13;
bool schema_evolution_enabled = 14;
}

message NativeScan {
Expand Down
Loading
Loading