Skip to content
Open
43 changes: 13 additions & 30 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,23 @@ index 27ae10b3d59..78e69902dfd 100644
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index db587dd9868..aac7295a53d 100644
index db587dd9868..33802f29253 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
// dump the file scan metadata (e.g file path) to event log
val metadata = plan match {
case fileScan: FileSourceScanExec => fileScan.metadata
+ case cometScan: CometScanExec => cometScan.metadata
+ case nativeScan: CometNativeScanExec => nativeScan.metadata
case _ => Map[String, String]()
}
new SparkPlanInfo(
Expand Down Expand Up @@ -396,14 +397,14 @@ index c4fb4fa943c..a04b23870a8 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..42eb9fd1cb7 100644
index f33432ddb6f..4acdf7e9cfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
+import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
Expand Down Expand Up @@ -447,40 +448,22 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
* Check the static scan metrics with and without DPP
*/
test("static scan metrics",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
+ case s: CometScanExec =>
+ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
+ case s: CometNativeScanExec =>
+ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
case _ => false
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index a206e97c353..79813d8e259 100644
index a206e97c353..fea1149b67d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}

- test("explain formatted - check presence of subquery in case of DPP") {
+ test("explain formatted - check presence of subquery in case of DPP",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
withTable("df1", "df2") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}

Expand All @@ -490,7 +473,7 @@ index a206e97c353..79813d8e259 100644
withTempDir { dir =>
Seq("parquet", "orc", "csv", "json").foreach { fmt =>
val basePath = dir.getCanonicalPath + "/" + fmt
@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}

Expand Down Expand Up @@ -965,7 +948,7 @@ index 3cf2bfd17ab..49728c35c42 100644
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
index fa1a64460fc..134f0db1fb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ case class CometNativeScanExec(
// Human-readable operator name shown in EXPLAIN output and the Spark UI;
// includes the scanned relation and, when present, the table identifier.
override val nodeName: String =
s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"

override def verboseStringWithOperatorId(): String = {
  // Metadata entries that should not appear in the formatted EXPLAIN output:
  // empty values, empty-list placeholders, and keys that are always hidden.
  def isHidden(entry: (String, String)): Boolean = {
    val (key, value) = entry
    value.isEmpty || value == "[]" || key == "DataFilters" || key == "Format"
  }

  // Abbreviate the Location entry to at most one root path so plans over
  // heavily partitioned tables stay readable; output is redacted like the
  // other metadata values.
  def describeLocation(): String = {
    val location = relation.location
    val rootPaths = location.rootPaths
    val abbreviated =
      if (rootPaths.length <= 1) rootPaths.mkString("[", ", ", "]")
      else "[" + rootPaths.head + s", ... ${rootPaths.length - 1} entries]"
    s"${location.getClass.getSimpleName} ${redact(abbreviated)}"
  }

  val metadataStr = metadata.toSeq.sorted.filterNot(isHidden).map {
    case ("Location", _) => s"Location: ${describeLocation()}"
    case (key, value) => s"$key: ${redact(value)}"
  }

  s"""
     |$formattedNodeName
     |${ExplainUtils.generateFieldString("Output", output)}
     |${metadataStr.mkString("\n")}
     |""".stripMargin
}

// exposed for testing
// True only when the wrapped Spark scan planned a bucketed read and bucketed
// scanning was not explicitly disabled for this operator.
lazy val bucketedScan: Boolean = originalPlan.bucketedScan && !disableBucketedScan

Expand Down Expand Up @@ -202,8 +229,19 @@ case class CometNativeScanExec(

override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)

// Names of metrics that are produced on the driver side (file listing and
// partition-pruning statistics) rather than by native execution; used to
// select which of the underlying scan's metrics to forward.
// NOTE(review): key names assumed to match those registered by Spark's file
// scan — confirm against the underlying scan's metric definitions.
private val driverMetricKeys =
Set(
"numFiles",
"filesSize",
"numPartitions",
"metadataTime",
"staticFilesNum",
"staticFilesSize",
"pruningTime")

override lazy val metrics: Map[String, SQLMetric] =
CometMetricNode.nativeScanMetrics(session.sparkContext)
CometMetricNode.nativeScanMetrics(session.sparkContext) ++
scan.metrics.filterKeys(driverMetricKeys)

/**
* See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests.
Expand Down
Loading