From 4bcb25cb3ebabfba8accd3e6372e981d7bd0d6df Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 16 May 2026 20:34:33 -0600 Subject: [PATCH 1/2] fix: configurable fallback when parquet vectorized reader is disabled (#4352) Comet's native_datafusion scan rejects Parquet-to-Spark conversions that Spark's vectorized reader rejects, but Spark's parquet-mr (non-vectorized) path silently overflows / nulls. Disabling spark.sql.parquet.enableVectorizedReader opts into parquet-mr semantics that Comet has no equivalent for, so by default Comet now falls back to Spark in that case. Users who want Comet to handle the scan regardless can opt in. - New config spark.comet.scan.allowDisabledParquetVectorizedReader (default false: fall back to Spark when vectorized reader is disabled). - CometScanRule.nativeDataFusionScan skips itself when the vectorized reader is disabled and the opt-in flag is false. - CometTestBase sets the flag to true so existing Comet tests continue to exercise the native scan. - Re-enables (un-ignores) the affected ParquetTypeWideningSuite tests in the 4.0.2 and 4.1.1 diffs. --- .../scala/org/apache/comet/CometConf.scala | 13 ++++ dev/diffs/4.0.2.diff | 71 +------------------ dev/diffs/4.1.1.diff | 71 +------------------ .../apache/comet/rules/CometScanRule.scala | 13 ++++ .../org/apache/spark/sql/CometTestBase.scala | 1 + 5 files changed, 29 insertions(+), 140 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 5d40bb5823..d2334d49ee 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -783,6 +783,19 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] = + conf("spark.comet.scan.allowDisabledParquetVectorizedReader") + .category(CATEGORY_SCAN) + .doc( + "Whether to allow Comet's native scan to replace the Parquet scan when Spark's " + + s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " + + "(false), Comet falls back to Spark in that case, because Comet's native readers " + + "mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " + + "(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " + + s"that Comet has no equivalent for. $COMPAT_GUIDE.") + .booleanConf + .createWithDefault(false) + val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] = conf("spark.comet.exec.strictFloatingPoint") .category(CATEGORY_EXEC) diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 2ae5a60bff..2614abc979 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2877,18 +2877,9 @@ index 0acb21f3e6f..e7c65429119 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..82924c83eb5 100644 +index 09ed6955a51..236a4e99824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter - import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} - - import org.apache.spark.SparkException --import org.apache.spark.sql.{DataFrame, QueryTest, Row} -+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper - import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException - import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -2919,66 +2910,6 @@ index 09ed6955a51..82924c83eb5 100644 ) } test(s"parquet widening conversion $fromType -> $toType") { -@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, expectError = true) - } - -@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite - (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, - expectError = - // parquet-mr allows reading decimals into a smaller precision decimal type without -@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) - outputTimestampType <- ParquetOutputTimestampType.values - } -- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { -+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString -@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite - Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) - } - test( -- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { -+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), - fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite - Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) - } - test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + -- s"Decimal($toPrecision, $toScale)" -+ s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") - ) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), -@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite - ) - } - -- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { -+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withTempDir { dir => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 10c17a5cf1..2ed8a5a32f 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -3036,18 +3036,9 @@ index 56076175d60..5872d9962cc 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..82924c83eb5 100644 +index 09ed6955a51..236a4e99824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter - import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} - - import org.apache.spark.SparkException --import org.apache.spark.sql.{DataFrame, QueryTest, Row} -+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper - import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException - import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -3078,66 +3069,6 @@ index 09ed6955a51..82924c83eb5 100644 ) } test(s"parquet widening conversion $fromType -> $toType") { -@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, expectError = true) - } - -@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite - (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, - expectError = - // parquet-mr allows reading decimals into a smaller precision decimal type without -@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) - outputTimestampType <- ParquetOutputTimestampType.values - } -- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { -+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString -@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite - Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) - } - test( -- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { -+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), - fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite - Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) - } - test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + -- s"Decimal($toPrecision, $toScale)" -+ s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") - ) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), -@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite - ) - } - -- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { -+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withTempDir { dir => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 1cc6d3afbee..8275727fbb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 64b69be1e9..f19e280291 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession) s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled") return None } + // Disabling the vectorized reader opts into parquet-mr's permissive behavior + // (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent + // backend, so by default fall back to Spark. Users can opt in to letting Comet + // replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER. + if (!conf.parquetVectorizedReaderEnabled && + !COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) { + withInfo( + scanExec, + s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " + + s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " + + s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in") + return None + } if (!CometNativeScan.isSupported(scanExec)) { return None } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 767968b7c1..90ca585630 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -86,6 +86,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false") + conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true") conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") // SortOrder is incompatible for mixed zero and negative zero floating point values, but From 1a437a3c43aadd6bde0988c550ddf9720cc59788 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 16 May 2026 20:37:59 -0600 Subject: [PATCH 2/2] docs: note parquet vectorized-reader fallback in compatibility guide --- docs/source/user-guide/latest/compatibility/scans.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index de5e273df1..f784a4f467 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -79,6 +79,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b - Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns) are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. +- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into + Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader + does not replicate. By default Comet falls back to Spark in this case. Set + `spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the + `native_datafusion` scan regardless. See + [#4352](https://github.com/apache/datafusion-comet/issues/4352). The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0 without falling back to Spark: