From eacc8651fa43618c76bd2a4bad3ee02e92a4c728 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 16:23:41 -0600 Subject: [PATCH 1/7] fix: detect all DPP forms in native_datafusion scan fallback Update CometNativeScan.isDynamicPruningFilter to check for DynamicPruningExpression in addition to PlanExpression. The previous check only caught dynamic DPP (with subqueries) but missed static DPP where Spark resolves the pruning expression to a literal wrapped in DynamicPruningExpression. Closes #3313 --- dev/diffs/3.5.8.diff | 32 ++++------------- .../serde/operator/CometNativeScan.scala | 10 +++--- .../apache/comet/exec/CometExecSuite.scala | 36 +++++++++++++++++++ 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 3aaecdecb1..c3449beb7e 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -396,7 +396,7 @@ index c4fb4fa943c..a04b23870a8 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..42eb9fd1cb7 100644 +index f33432ddb6f..0e1499a24ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -447,17 +447,7 @@ index f33432ddb6f..42eb9fd1cb7 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat - * Check the static scan metrics with and without DPP - */ - test("static scan metrics", -- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { -+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", - SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { -@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -467,20 +457,10 @@ index f33432ddb6f..42eb9fd1cb7 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a206e97c353..79813d8e259 100644 +index a206e97c353..fea1149b67d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite - } - } - -- test("explain formatted - check presence of subquery in case of DPP") { -+ test("explain formatted - check presence of subquery in case of DPP", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { - withTable("df1", "df2") { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", -@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -490,7 +470,7 @@ index a206e97c353..79813d8e259 100644 withTempDir { dir => Seq("parquet", "orc", "csv", "json").foreach { fmt => val basePath = dir.getCanonicalPath + "/" + fmt -@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -965,7 +945,7 @@ index 3cf2bfd17ab..49728c35c42 100644 SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..1d2e215d6a3 100644 +index fa1a64460fc..134f0db1fb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index d5d075760f..799f3eb1fc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression, Literal, PlanExpression} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec @@ -57,9 +57,11 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled") } - // Native DataFusion doesn't support subqueries/dynamic pruning + // Native DataFusion doesn't support dynamic partition pruning. + // Check for both PlanExpression (dynamic DPP with subqueries) and + // DynamicPruningExpression (covers static DPP resolved to literals). if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) { - withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning") + withInfo(scanExec, "Native DataFusion scan does not support dynamic partition pruning") } if (SQLConf.get.ignoreCorruptFiles || @@ -82,7 +84,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } private def isDynamicPruningFilter(e: Expression): Boolean = - e.exists(_.isInstanceOf[PlanExpression[_]]) + e.isInstanceOf[DynamicPruningExpression] || e.exists(_.isInstanceOf[PlanExpression[_]]) override def enabledConfig: Option[ConfigEntry[Boolean]] = None diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 0bf9bbc95b..0b291bb5f3 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -146,6 +146,42 @@ class CometExecSuite extends CometTestBase { } } + test("DPP fallback with native_datafusion scan") { + withTempDir { path => + val factPath = s"${path.getAbsolutePath}/fact.parquet" + val dimPath = s"${path.getAbsolutePath}/dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val one_day = 24 * 60 * 60000 + val fact = Range(0, 100) + .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + fact.write.partitionBy("fact_date").parquet(factPath) + val dim = Range(0, 10) + .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + dim.write.parquet(dimPath) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false") { + spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact") + spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim") + val df = + spark.sql( + "select * from dpp_fact join dpp_dim on fact_date = dim_date where dim_id > 7") + val (_, cometPlan) = checkSparkAnswer(df) + val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) + assert( + infos.contains("dynamic partition pruning"), + s"Expected native_datafusion to fall back for DPP but got:\n$infos") + } + } + } + } + test("ShuffleQueryStageExec could be direct child node of CometBroadcastExchangeExec") { withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val table = "src" From 06c828d1ed5d9a7c310813f0abae8f3588c6dafd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 17:38:23 -0600 Subject: [PATCH 2/7] fix: revert fallback message to preserve golden file compatibility --- .../scala/org/apache/comet/serde/operator/CometNativeScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 799f3eb1fc..85fe317b9d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -61,7 +61,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { // Check for both PlanExpression (dynamic DPP with subqueries) and // DynamicPruningExpression (covers static DPP resolved to literals). if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) { - withInfo(scanExec, "Native DataFusion scan does not support dynamic partition pruning") + withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning") } if (SQLConf.get.ignoreCorruptFiles || From 85ca500972d3fde5acd2853dceb4b73d5f61f017 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 18:03:20 -0600 Subject: [PATCH 3/7] fix: update DPP test assertion to match reverted fallback message --- spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 0b291bb5f3..9cbfa6ae85 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -175,7 +175,7 @@ class CometExecSuite extends CometTestBase { val (_, cometPlan) = checkSparkAnswer(df) val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) assert( - infos.contains("dynamic partition pruning"), + infos.contains("subqueries/dynamic pruning"), s"Expected native_datafusion to fall back for DPP but got:\n$infos") } } From 0fcc413705441f40be2aa6308ed1aa80950ca2dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 06:41:35 -0600 Subject: [PATCH 4/7] fix: add driver metrics and explain output to CometNativeScanExec - Add verboseStringWithOperatorId() with Location abbreviation so EXPLAIN FORMATTED shows scan metadata correctly - Share driver metrics (numFiles, filesSize, numPartitions, etc.) from the underlying CometScanExec so Spark scan metric tests pass - Add CometNativeScanExec to SparkPlanInfo metadata extraction and DPP test scan pattern matching in the Spark diff --- dev/diffs/3.5.8.diff | 15 ++++--- .../spark/sql/comet/CometNativeScanExec.scala | 40 ++++++++++++++++++- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index c3449beb7e..60550614b9 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -93,22 +93,23 @@ index 27ae10b3d59..78e69902dfd 100644 + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala -index db587dd9868..aac7295a53d 100644 +index db587dd9868..33802f29253 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -+import org.apache.spark.sql.comet.CometScanExec ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec -@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo { +@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo { // dump the file scan metadata (e.g file path) to event log val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata + case cometScan: CometScanExec => cometScan.metadata ++ case nativeScan: CometNativeScanExec => nativeScan.metadata case _ => Map[String, String]() } new SparkPlanInfo( @@ -396,14 +397,14 @@ index c4fb4fa943c..a04b23870a8 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..0e1499a24ca 100644 +index f33432ddb6f..4acdf7e9cfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin -+import org.apache.spark.sql.comet.CometScanExec ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ @@ -447,11 +448,13 @@ index f33432ddb6f..0e1499a24ca 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) + case s: CometScanExec => ++ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid"))) ++ case s: CometNativeScanExec => + s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid"))) case _ => false } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 4e68a423ad..32523dc755 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -78,6 +78,33 @@ case class CometNativeScanExec( override val nodeName: String = s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" + override def verboseStringWithOperatorId(): String = { + val metadataStr = metadata.toSeq.sorted + .filterNot { + case (_, value) if (value.isEmpty || value.equals("[]")) => true + case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true + case (_, _) => false + } + .map { + case (key, _) if (key.equals("Location")) => + val location = relation.location + val numPaths = location.rootPaths.length + val abbreviatedLocation = if (numPaths <= 1) { + location.rootPaths.mkString("[", ", ", "]") + } else { + "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]" + } + s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}" + case (key, value) => s"$key: ${redact(value)}" + } + + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${metadataStr.mkString("\n")} + |""".stripMargin + } + // exposed for testing lazy val bucketedScan: Boolean = originalPlan.bucketedScan && !disableBucketedScan @@ -202,8 +229,19 @@ case class CometNativeScanExec( override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt) + private val driverMetricKeys = + Set( + "numFiles", + "filesSize", + "numPartitions", + "metadataTime", + "staticFilesNum", + "staticFilesSize", + "pruningTime") + override lazy val metrics: Map[String, SQLMetric] = - CometMetricNode.nativeScanMetrics(session.sparkContext) + CometMetricNode.nativeScanMetrics(session.sparkContext) ++ + scan.metrics.filterKeys(driverMetricKeys) /** * See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests. From 89e2235c575d254d546a14199a2395c36175f9f1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 06:45:00 -0600 Subject: [PATCH 5/7] refactor: revert unnecessary DynamicPruningExpression check The isDynamicPruningFilter check in CometNativeScan only needs to check for PlanExpression since DPP subqueries always contain one. The DynamicPruningExpression wrapper check is not needed. --- .../org/apache/comet/serde/operator/CometNativeScan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 85fe317b9d..bf3cd34e03 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpression} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec @@ -84,7 +84,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } private def isDynamicPruningFilter(e: Expression): Boolean = - e.isInstanceOf[DynamicPruningExpression] || e.exists(_.isInstanceOf[PlanExpression[_]]) + e.exists(_.isInstanceOf[PlanExpression[_]]) override def enabledConfig: Option[ConfigEntry[Boolean]] = None From 560b6a5d6e03a933397fbb1c1b6428c60cee3cc4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 06:45:27 -0600 Subject: [PATCH 6/7] refactor: remove redundant DPP fallback test This test is now covered by the Spark SQL test suite with native_datafusion scan mode enabled. --- .../apache/comet/exec/CometExecSuite.scala | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 9cbfa6ae85..0bf9bbc95b 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -146,42 +146,6 @@ class CometExecSuite extends CometTestBase { } } - test("DPP fallback with native_datafusion scan") { - withTempDir { path => - val factPath = s"${path.getAbsolutePath}/fact.parquet" - val dimPath = s"${path.getAbsolutePath}/dim.parquet" - withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { - val one_day = 24 * 60 * 60000 - val fact = Range(0, 100) - .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) - .toDF("fact_id", "fact_date", "fact_str") - fact.write.partitionBy("fact_date").parquet(factPath) - val dim = Range(0, 10) - .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) - .toDF("dim_id", "dim_date", "dim_str") - dim.write.parquet(dimPath) - } - - Seq("parquet").foreach { v1List => - withSQLConf( - SQLConf.USE_V1_SOURCE_LIST.key -> v1List, - CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, - CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false") { - spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact") - spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim") - val df = - spark.sql( - "select * from dpp_fact join dpp_dim on fact_date = dim_date where dim_id > 7") - val (_, cometPlan) = checkSparkAnswer(df) - val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) - assert( - infos.contains("subqueries/dynamic pruning"), - s"Expected native_datafusion to fall back for DPP but got:\n$infos") - } - } - } - } - test("ShuffleQueryStageExec could be direct child node of CometBroadcastExchangeExec") { withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val table = "src" From c8579aecb85a82f93ab164e6f7eb4702f2e54b8e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 06:46:55 -0600 Subject: [PATCH 7/7] refactor: revert all CometNativeScan.scala changes No changes to CometNativeScan.scala are needed for this PR. --- .../org/apache/comet/serde/operator/CometNativeScan.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index bf3cd34e03..d5d075760f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -57,9 +57,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled") } - // Native DataFusion doesn't support dynamic partition pruning. - // Check for both PlanExpression (dynamic DPP with subqueries) and - // DynamicPruningExpression (covers static DPP resolved to literals). + // Native DataFusion doesn't support subqueries/dynamic pruning if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) { withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning") }