diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 568b33e758..3ed7d9ce18 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1986,7 +1986,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..49f2001dc6b 100644 +index 8e88049f51e..6150a556f9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2075,17 +2075,24 @@ index 8e88049f51e..49f2001dc6b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } +@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + val e = intercept[SparkException] { + sql(s"select a from $tableName where b > 0").collect() + } +- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( +- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) ++ assert(e.getCause.isInstanceOf[RuntimeException]) ++ val msg = e.getCause.getMessage ++ // native_datafusion produces a different error message for duplicate fields ++ assert( ++ msg.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || ++ msg.contains("Unable to get field named"), ++ s"Unexpected error message: $msg") + } -- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { -+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withTempPath { dir => - val count = 10 - val tableName = "spark_25207" -@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2095,7 +2102,7 @@ index 8e88049f51e..49f2001dc6b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2105,7 +2112,7 @@ index 8e88049f51e..49f2001dc6b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2118,7 +2125,7 @@ index 8e88049f51e..49f2001dc6b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index c8e960a15e..2a10bb111d 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -62,6 +62,10 @@ cause Comet to fall back to Spark. - No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` +- No support for duplicate field names in case-insensitive mode. When the required or data schema contains + field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates + in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time, + so DataFusion may produce a different error message than Spark in that case. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 698b68777a..c004d77283 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -222,6 +222,22 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None } + // Case-insensitive mode with duplicate field names produces different errors + // in DataFusion vs Spark, so fall back to avoid incompatible error messages + if (!session.sessionState.conf.caseSensitiveAnalysis) { + val schemas = Seq(scanExec.requiredSchema, r.dataSchema) + for (schema <- schemas) { + val fieldNames = + schema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT)) + if (fieldNames.length != fieldNames.distinct.length) { + withInfo( + scanExec, + "Native DataFusion scan does not support " + + "duplicate field names in case-insensitive mode") + return None + } + } + } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None }