Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,19 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

// Opt-in switch that lets Comet's native scan replace the Parquet scan even when Spark's
// vectorized Parquet reader has been turned off. Defaults to false; the description below
// documents that Comet falls back to Spark in that case.
val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] = {
  // User-facing description, assembled up front so the builder chain below stays short.
  val description =
    "Whether to allow Comet's native scan to replace the Parquet scan when Spark's " +
      s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " +
      "(false), Comet falls back to Spark in that case, because Comet's native readers " +
      "mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " +
      "(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " +
      s"that Comet has no equivalent for. $COMPAT_GUIDE."
  // Key, category and description first; the value type and default are applied last.
  val builder = conf("spark.comet.scan.allowDisabledParquetVectorizedReader")
    .category(CATEGORY_SCAN)
    .doc(description)
  builder.booleanConf.createWithDefault(false)
}

val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
conf("spark.comet.exec.strictFloatingPoint")
.category(CATEGORY_EXEC)
Expand Down
71 changes: 1 addition & 70 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2877,18 +2877,9 @@ index 0acb21f3e6f..e7c65429119 100644
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
index 09ed6955a51..82924c83eb5 100644
index 09ed6955a51..236a4e99824 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}

import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.col
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
withClue(
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
Expand Down Expand Up @@ -2919,66 +2910,6 @@ index 09ed6955a51..82924c83eb5 100644
)
}
test(s"parquet widening conversion $fromType -> $toType") {
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(values, fromType, toType, expectError = true)
}

@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(values, fromType, toType,
expectError =
// parquet-mr allows reading decimals into a smaller precision decimal type without
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
outputTimestampType <- ParquetOutputTimestampType.values
}
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
withSQLConf(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
}
test(
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
fromType = DecimalType(fromPrecision, 2),
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
}
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
- s"Decimal($toPrecision, $toScale)"
+ s"Decimal($toPrecision, $toScale)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
)
}

- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
withTempDir { dir =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
writeParquetFiles(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index b8f3ea3c6f3..bbd44221288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
Expand Down
71 changes: 1 addition & 70 deletions dev/diffs/4.1.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -3036,18 +3036,9 @@ index 56076175d60..5872d9962cc 100644
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
index 09ed6955a51..82924c83eb5 100644
index 09ed6955a51..236a4e99824 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}

import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.col
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
withClue(
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
Expand Down Expand Up @@ -3078,66 +3069,6 @@ index 09ed6955a51..82924c83eb5 100644
)
}
test(s"parquet widening conversion $fromType -> $toType") {
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(values, fromType, toType, expectError = true)
}

@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(values, fromType, toType,
expectError =
// parquet-mr allows reading decimals into a smaller precision decimal type without
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
outputTimestampType <- ParquetOutputTimestampType.values
}
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
withSQLConf(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
}
test(
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
fromType = DecimalType(fromPrecision, 2),
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
}
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
- s"Decimal($toPrecision, $toScale)"
+ s"Decimal($toPrecision, $toScale)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
)
}

- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
withTempDir { dir =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
writeParquetFiles(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
index 1cc6d3afbee..8275727fbb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
Expand Down
6 changes: 6 additions & 0 deletions docs/source/user-guide/latest/compatibility/scans.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b
- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns)
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
matching Spark's behavior.
- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into
Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader
does not replicate. By default Comet falls back to Spark in this case. Set
`spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the
`native_datafusion` scan anyway; Comet then applies semantics that mirror Spark's vectorized
reader rather than parquet-mr. See
[#4352](https://github.com/apache/datafusion-comet/issues/4352).

The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
without falling back to Spark:
Expand Down
13 changes: 13 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession)
s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
return None
}
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
// backend, so by default fall back to Spark. Users can opt in to letting Comet
// replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.
if (!conf.parquetVectorizedReaderEnabled &&
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
withInfo(
scanExec,
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
return None
}
if (!CometNativeScan.isSupported(scanExec)) {
return None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true")
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
// SortOrder is incompatible for mixed zero and negative zero floating point values, but
Expand Down
Loading