Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e763507
chore: remove config option for native_iceberg_compat
andygrove May 11, 2026
0b664bd
chore: regenerate TPC-DS plan-stability golden files
andygrove May 11, 2026
a58efe8
Merge remote-tracking branch 'apache/main' into remove-iceberg-compat-v2
andygrove May 11, 2026
a57e8a0
Merge branch 'main' into remove-iceberg-compat
andygrove May 12, 2026
acb3bc1
Merge remote-tracking branch 'apache/main' into remove-iceberg-compat
andygrove May 14, 2026
01ab2bf
Merge branch 'main' into remove-iceberg-compat
andygrove May 17, 2026
c5247ed
Merge remote-tracking branch 'apache/main' into remove-iceberg-compat
andygrove May 18, 2026
d454495
test: restore primitive-type filter pushdown test for native_datafusion
andygrove May 18, 2026
ee1e499
Merge remote-tracking branch 'apache/main' into remove-iceberg-compat
andygrove May 18, 2026
5dae9a8
Merge remote-tracking branch 'apache/main' into remove-iceberg-compat
andygrove May 18, 2026
02226af
chore: remove COMET_NATIVE_SCAN_IMPL and related scan-impl constants
andygrove May 19, 2026
235b928
Merge remote-tracking branch 'apache/main' into followup/remove-scan-…
andygrove May 19, 2026
d03d99a
chore: remove dead native_iceberg_compat JVM Parquet reader chain
andygrove May 19, 2026
4bf5435
chore: remove dead Rust column-reader code
andygrove May 19, 2026
7f7c414
fix: lint
andygrove May 19, 2026
4c62d35
chore: update Spark diffs after scan-impl constant removal
andygrove May 19, 2026
7b725b5
chore: collapse IgnoreCometNativeDataFusion/Scan into IgnoreComet
andygrove May 19, 2026
20cdc47
chore: drop CometScanExec case from ParquetRowIndexSuite Spark diffs
andygrove May 19, 2026
2a02f3e
remove more unused methods
andygrove May 19, 2026
1cc6409
spotless
andygrove May 19, 2026
257c31c
chore: remove dead validateObjectStoreConfig path
andygrove May 19, 2026
30b9d06
chore: remove unused ParquetFilters and CometFileReaderThreadPool
andygrove May 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmarks/tpc/engines/comet-hashjoin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ driver_class_path = ["$COMET_JAR"]
"spark.executor.extraClassPath" = "$COMET_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.scan.impl" = "native_datafusion"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
1 change: 0 additions & 1 deletion benchmarks/tpc/engines/comet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ driver_class_path = ["$COMET_JAR"]
"spark.executor.extraClassPath" = "$COMET_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.scan.impl" = "native_datafusion"
"spark.comet.expression.Cast.allowIncompatible" = "true"
91 changes: 27 additions & 64 deletions dev/diffs/3.4.3.diff

Large diffs are not rendered by default.

119 changes: 35 additions & 84 deletions dev/diffs/3.5.8.diff

Large diffs are not rendered by default.

99 changes: 26 additions & 73 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -739,10 +739,10 @@ index 9c529d14221..ab2850b5d68 100644
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
new file mode 100644
index 00000000000..5691536c114
index 00000000000..4b31bea33de
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
@@ -0,0 +1,45 @@
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -771,9 +771,6 @@ index 00000000000..5691536c114
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
+ */
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
+
+/**
+ * Helper trait that disables Comet for all tests regardless of default config values.
Expand Down Expand Up @@ -1199,7 +1196,7 @@ index 0df7f806272..92390bd819f 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2e33f6505ab..54f5081e10a 100644
index 2e33f6505ab..fc1a2c8f964 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
Expand All @@ -1220,7 +1217,7 @@ index 2e33f6505ab..54f5081e10a 100644
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
+ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
Expand Down Expand Up @@ -2544,14 +2541,14 @@ index cd6f41b4ef4..4b6a17344bc 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 6080a5e8e4b..f5dadef89ae 100644
index 6080a5e8e4b..23a451d5bcf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType

import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
import org.apache.spark.sql._
+import org.apache.spark.sql.IgnoreCometNativeScan
+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
Expand All @@ -2574,7 +2571,7 @@ index 6080a5e8e4b..f5dadef89ae 100644

- test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+ test("Filters should be pushed down for vectorized Parquet reader at row group level",
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
+ IgnoreComet("Native scans do not support the tested accumulator")) {
import testImplicits._

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
Expand Down Expand Up @@ -2609,7 +2606,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
}

- test("filter pushdown - StringPredicate") {
+ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) {
+ test("filter pushdown - StringPredicate", IgnoreComet("cannot be pushed down")) {
import testImplicits._
// keep() should take effect on StartsWith/EndsWith/Contains
Seq(
Expand All @@ -2619,7 +2616,7 @@ index 6080a5e8e4b..f5dadef89ae 100644

- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
+ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
+ IgnoreCometNativeScan("Comet has different push-down behavior")) {
+ IgnoreComet("Comet has different push-down behavior")) {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
Expand Down Expand Up @@ -2666,7 +2663,7 @@ index 6080a5e8e4b..f5dadef89ae 100644

- test("SPARK-34562: Bloom filter push down") {
+ test("SPARK-34562: Bloom filter push down",
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
+ IgnoreComet("Native scans do not support the tested accumulator")) {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
Expand Down Expand Up @@ -2773,32 +2770,21 @@ index 30503af0fab..1491f4bc2d5 100644

import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
index 08fd8a9ecb5..27aee839b8c 100644
index 08fd8a9ecb5..306958da489 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
@@ -20,6 +20,7 @@ import java.io.File

import scala.jdk.CollectionConverters._

+import org.apache.comet.CometConf
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.ParquetProperties._
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
@@ -27,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE

import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -245,6 +247,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
@@ -245,6 +246,14 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
case f: FileSourceScanExec =>
numPartitions += f.inputRDD.partitions.length
numOutputRows += f.metrics("numOutputRows").value
+ case b: CometScanExec =>
+ numPartitions += b.inputRDD.partitions.length
+ numOutputRows += b.metrics("numOutputRows").value
+ case b: CometBatchScanExec =>
+ numPartitions += b.inputRDD.partitions.length
+ numOutputRows += b.metrics("numOutputRows").value
Expand All @@ -2810,16 +2796,13 @@ index 08fd8a9ecb5..27aee839b8c 100644
case _ =>
}
assert(numPartitions > 0)
@@ -303,6 +316,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
@@ -303,6 +312,9 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)

test(s"invalid row index column type - ${conf.desc}") {
+ // https://github.com/apache/datafusion-comet/issues/3886
+ // Comet throws RuntimeException instead of SparkException
+ assume(!Seq(
+ CometConf.SCAN_NATIVE_DATAFUSION,
+ CometConf.SCAN_AUTO
+ ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
+ assume(false)
withSQLConf(conf.sqlConfs: _*) {
withTempPath{ path =>
val df = spark.range(0, 10, 1, 1).toDF("id")
Expand All @@ -2844,15 +2827,15 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0acb21f3e6f..e7c65429119 100644
index 0acb21f3e6f..15bd866d8aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type._

import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
Expand All @@ -2872,7 +2855,7 @@ index 0acb21f3e6f..e7c65429119 100644

- test("schema mismatch failure error message for parquet vectorized reader") {
+ test("schema mismatch failure error message for parquet vectorized reader",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4316")) {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
Expand Down Expand Up @@ -3460,69 +3443,39 @@ index 86c4e49f6f6..2e639e5f38d 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index f0f3f94b811..be5e113c3ed 100644
index f0f3f94b811..b7d18771314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.util.control.NonFatal

+import org.apache.comet.CometConf
import org.apache.hadoop.fs.Path
import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
+import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
@@ -121,6 +122,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with

override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
+ // Check Comet skip tags first, before DisableAdaptiveExecution handling
+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
+ return
+ }
+ if (isCometEnabled) {
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
+ cometScanImpl == CometConf.SCAN_AUTO
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
+ cometScanImpl == CometConf.SCAN_AUTO
+ if (isNativeIcebergCompat &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
+ return
+ }
+ if (isNativeDataFusion &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
+ return
+ }
+ if ((isNativeDataFusion || isNativeIcebergCompat) &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
+ testTags: _*)(testFun)
+ return
+ }
+ }
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
super.test(testName, testTags: _*) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -248,8 +278,15 @@ private[sql] trait SQLTestUtilsBase
@@ -248,8 +254,15 @@ private[sql] trait SQLTestUtilsBase
override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
}

Expand All @@ -3538,7 +3491,7 @@ index f0f3f94b811..be5e113c3ed 100644
super.withSQLConf(pairs: _*)(f)
}

@@ -451,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
@@ -451,6 +464,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
Expand Down
Loading
Loading