diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 5d40bb5823..d2334d49ee 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -783,6 +783,19 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] = + conf("spark.comet.scan.allowDisabledParquetVectorizedReader") + .category(CATEGORY_SCAN) + .doc( + "Whether to allow Comet's native scan to replace the Parquet scan when Spark's " + + s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " + + "(false), Comet falls back to Spark in that case, because Comet's native readers " + + "mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " + + "(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " + + s"that Comet has no equivalent for. $COMPAT_GUIDE.") + .booleanConf + .createWithDefault(false) + val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] = conf("spark.comet.exec.strictFloatingPoint") .category(CATEGORY_EXEC) diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 2ae5a60bff..2614abc979 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2877,18 +2877,9 @@ index 0acb21f3e6f..e7c65429119 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..82924c83eb5 100644 +index 09ed6955a51..236a4e99824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter - import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} - - import org.apache.spark.SparkException --import org.apache.spark.sql.{DataFrame, QueryTest, Row} -+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper - import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException - import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -2919,66 +2910,6 @@ index 09ed6955a51..82924c83eb5 100644 ) } test(s"parquet widening conversion $fromType -> $toType") { -@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, expectError = true) - } - -@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite - (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, - expectError = - // parquet-mr allows reading decimals into a smaller precision decimal type without -@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) - outputTimestampType <- ParquetOutputTimestampType.values - } -- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { -+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString -@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite - Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) - } - test( -- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { -+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), - fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite - Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) - } - test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + -- s"Decimal($toPrecision, $toScale)" -+ s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") - ) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), -@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite - ) - } - -- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { -+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withTempDir { dir => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 10c17a5cf1..2ed8a5a32f 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -3036,18 +3036,9 @@ index 56076175d60..5872d9962cc 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..82924c83eb5 100644 +index 09ed6955a51..236a4e99824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter - import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} - - import org.apache.spark.SparkException --import org.apache.spark.sql.{DataFrame, QueryTest, Row} -+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper - import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException - import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -3078,66 +3069,6 @@ index 09ed6955a51..82924c83eb5 100644 ) } test(s"parquet widening conversion $fromType -> $toType") { -@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, expectError = true) - } - -@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite - (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) - ) - } -- test(s"unsupported parquet conversion $fromType -> $toType") { -+ test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders(values, fromType, toType, - expectError = - // parquet-mr allows reading decimals into a smaller precision decimal type without -@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite - (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) - outputTimestampType <- ParquetOutputTimestampType.values - } -- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { -+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString -@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite - Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) - } - test( -- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { -+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), - fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite - Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) - } - test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + -- s"Decimal($toPrecision, $toScale)" -+ s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") - ) { - checkAllParquetReaders( - values = Seq("1.23", "10.34"), -@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite - ) - } - -- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { -+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { - withTempDir { dir => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 1cc6d3afbee..8275727fbb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index de5e273df1..f784a4f467 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -79,6 +79,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b - Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns) are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. +- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into + Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader + does not replicate. By default Comet falls back to Spark in this case. Set + `spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the + `native_datafusion` scan regardless. See + [#4352](https://github.com/apache/datafusion-comet/issues/4352). The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0 without falling back to Spark: diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 64b69be1e9..f19e280291 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession) s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled") return None } + // Disabling the vectorized reader opts into parquet-mr's permissive behavior + // (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent + // backend, so by default fall back to Spark. Users can opt in to letting Comet + // replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER. + if (!conf.parquetVectorizedReaderEnabled && + !COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) { + withInfo( + scanExec, + s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " + + s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " + + s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in") + return None + } if (!CometNativeScan.isSupported(scanExec)) { return None } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 767968b7c1..90ca585630 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -86,6 +86,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false") + conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true") conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") // SortOrder is incompatible for mixed zero and negative zero floating point values, but