diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java index f8385f41b6..bb5f33fa68 100644 --- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java @@ -128,7 +128,7 @@ public void close() { protected void initNative() { LOG.debug("initializing the native column reader"); - DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null; + DataType readType = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED() ? type : null; boolean useLegacyDateTimestampOrNTZ = useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$; nativeHandle = diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index 87cecdc65d..eaa1ecb3ce 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -130,7 +130,7 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); - boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get(); + boolean allowTypePromotion = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED(); if (sparkType instanceof NullType) { return; @@ -150,8 +150,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) { // fallbacks. We read them as long values. return; } else if (sparkType == DataTypes.LongType && allowTypePromotion) { - // In Comet we allow schema evolution from int to long, if - // `spark.comet.schemaEvolution.enabled` is enabled. 
+ // INT32 -> LONG widening is allowed when Comet's per-Spark-version + // type-promotion default permits it (Spark 4.x). See ShimCometConf. return; } else if (sparkType == DataTypes.ByteType || sparkType == DataTypes.ShortType) { return; @@ -198,8 +198,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) { break; case FLOAT: if (sparkType == DataTypes.FloatType) return; - // In Comet we allow schema evolution from float to double, if - // `spark.comet.schemaEvolution.enabled` is enabled. + // FLOAT -> DOUBLE widening is allowed when Comet's per-Spark-version + // type-promotion default permits it (Spark 4.x). See ShimCometConf. if (sparkType == DataTypes.DoubleType && allowTypePromotion) return; break; case DOUBLE: diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9b376837f7..5d40bb5823 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -727,16 +727,6 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) - val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.schemaEvolution.enabled") - .internal() - .category(CATEGORY_SCAN) - .doc("Whether to enable schema evolution in Comet. For instance, promoting a integer " + - "column to a long column, a float column to a double column, etc. 
This is automatically" + - "enabled when reading from Iceberg tables.") - .booleanConf - .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT) - val COMET_ENABLE_PARTIAL_HASH_AGGREGATE: ConfigEntry[Boolean] = conf("spark.comet.testing.aggregate.partialMode.enabled") .internal() diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala index 6147c18ddb..a113893c84 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala @@ -20,5 +20,12 @@ package org.apache.comet.shims trait ShimCometConf { - protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false + + /** + * Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT + * → DOUBLE). Spark 3.x's vectorized reader rejects these on read, so Comet matches by + * defaulting to false on 3.x. Reads from the deprecated `spark.comet.schemaEvolution.enabled` + * SQL conf were removed in favor of this per-version constant; see #4298. + */ + val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = false } diff --git a/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala index 0eb57c52b4..e89b37d604 100644 --- a/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala +++ b/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala @@ -20,5 +20,12 @@ package org.apache.comet.shims trait ShimCometConf { - protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true + + /** + * Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT + * → DOUBLE, INT32 → DOUBLE). Spark 4.x's vectorized reader accepts these by default. Reads from + * the deprecated `spark.comet.schemaEvolution.enabled` SQL conf were removed in favor of this + * per-version constant; see #4298. 
+ */ + val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = true } diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index ebe53f49dd..79a945add3 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..d075572c5b3 100644 +index d3544881af1..1126f287096 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -1969,7 +1969,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..b8af360fa14 100644 +index 104b4e416cd..4adb273170a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2121,28 +2121,10 @@ index 104b4e416cd..b8af360fa14 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..9411af57a26 100644 +index 8670d95c65e..b624c3811dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import 
org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1335,7 +1337,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2153,7 +2135,7 @@ index 8670d95c65e..9411af57a26 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..ee5a87fa200 100644 +index 29cb224c878..dcb8a0e9bef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2190,7 +2172,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2220,7 +2202,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2309,7 +2291,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..dc3aac281c3 100644 +index bf5c51b89bb..7e143a0e0f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2336,7 +2318,7 @@ index bf5c51b89bb..dc3aac281c3 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2882,7 +2864,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..99bc018008a 100644 +index dd55fcfe42c..cd18a23d4de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ @@ -2948,7 +2930,7 @@ index dd55fcfe42c..99bc018008a 100644 protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { SparkSession.setActiveSession(spark) super.withSQLConf(pairs: _*)(f) -@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase +@@ -434,6 +469,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = 
df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -2958,7 +2940,7 @@ index dd55fcfe42c..99bc018008a 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..a5ea58146ad 100644 +index ed2e309fa07..25b798d2c1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -74,6 +74,20 @@ trait SharedSparkSessionBase @@ -3071,7 +3053,7 @@ index a902cb3a69e..800a3acbe99 100644 test("SPARK-4963 DataFrame sample on mutable row return wrong result") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 07361cfdce9..97dab2a3506 100644 +index 07361cfdce9..4fdbcd18656 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -55,25 +55,41 @@ object TestHive diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 76ed210d31..a72e44fc4f 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index edd2ad57880..d5273840330 100644 +index edd2ad57880..15a0947abf4 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -1958,7 +1958,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..20d7ef7b1bc 100644 +index 8e88049f51e..097c518a19a 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2104,20 +2104,10 @@ index 8e88049f51e..20d7ef7b1bc 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..71e22972a47 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2128,7 +2118,7 @@ index 8ed9ef1630e..71e22972a47 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..5ea2d938664 100644 +index f6472ba3d9d..b62ff2975e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2157,7 +2147,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2187,7 +2177,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2276,7 +2266,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 3f47c5e506f..2ac0868407e 100644 +index 3f47c5e506f..80b7ef6c46a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2303,7 +2293,7 @@ index 3f47c5e506f..2ac0868407e 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2834,7 +2824,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index e937173a590..7d20538bc68 100644 +index e937173a590..3134078a122 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ @@ -2900,7 +2890,7 @@ index e937173a590..7d20538bc68 100644 protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { SparkSession.setActiveSession(spark) super.withSQLConf(pairs: _*)(f) -@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase +@@ -435,6 +470,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = 
df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -2910,7 +2900,7 @@ index e937173a590..7d20538bc68 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..a5ea58146ad 100644 +index ed2e309fa07..25b798d2c1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -74,6 +74,20 @@ trait SharedSparkSessionBase @@ -3023,7 +3013,7 @@ index 6160c3e5f6c..0956d7d9edc 100644 test("SPARK-4963 DataFrame sample on mutable row return wrong result") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 1d646f40b3e..5babe505301 100644 +index 1d646f40b3e..df108c17c42 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -53,25 +53,41 @@ object TestHive diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 34deaa5825..2ae5a60bff 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644 withSpark(sc) { sc => TestUtils.waitUntilExecutorsUp(sc, 2, 60000) diff --git a/pom.xml b/pom.xml -index 252cfdf9073..cc878eb3cd9 100644 +index 252cfdf9073..64e899efe6b 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -2544,7 +2544,7 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..ea058d57b4b 100644 +index 6080a5e8e4b..f5dadef89ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2697,28 +2697,10 @@ index 6080a5e8e4b..ea058d57b4b 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 4474ec1fd42..05fa0257c82 100644 +index 4474ec1fd42..97910c4fc3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = 
StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2729,18 +2711,10 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..5a111a937a9 100644 +index bba71f1c48d..38c60ee2584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat - - import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} - import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow - import org.apache.spark.sql.catalyst.util.ArrayData -@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2753,17 +2727,7 @@ index bba71f1c48d..5a111a937a9 100644 } } } -@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals 
correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } -@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2773,7 +2737,7 @@ index bba71f1c48d..5a111a937a9 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2783,16 +2747,6 @@ index bba71f1c48d..5a111a937a9 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1139,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2890,7 +2844,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 0acb21f3e6f..8d60dfb686d 100644 +index 0acb21f3e6f..e7c65429119 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2918,12 +2872,12 @@ index 0acb21f3e6f..8d60dfb686d 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..82924c83eb5 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -2971,7 +2925,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet conversion $fromType -> $toType") { + test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders(values, fromType, toType, expectError = true) } @@ -2981,7 +2935,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet conversion $fromType -> $toType") { + test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders(values, fromType, toType, expectError = // parquet-mr allows reading decimals into a smaller precision decimal type without @@ -2991,7 +2945,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { + test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { withSQLConf( SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString @@ -3001,7 +2955,7 @@ index 09ed6955a51..5cd856ff7b6 100644 test( - s"parquet decimal precision change 
Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), @@ -3011,10 +2965,20 @@ index 09ed6955a51..5cd856ff7b6 100644 test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") ) { checkAllParquetReaders( values = Seq("1.23", "10.34"), +@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite + ) + } + +- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { ++ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { + withTempDir { dir => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -3565,7 +3529,7 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index 
f0f3f94b811..f77b54dcef9 100644 +index f0f3f94b811..be5e113c3ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ @@ -3643,7 +3607,7 @@ index f0f3f94b811..f77b54dcef9 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +497,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +488,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -3653,7 +3617,7 @@ index f0f3f94b811..f77b54dcef9 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index 245219c1756..a611836f086 100644 +index 245219c1756..b566f970ccd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -75,6 +75,21 @@ trait SharedSparkSessionBase @@ -3796,7 +3760,7 @@ index b67370f6eb9..746b3974b29 100644 override def beforeEach(): Unit = { super.beforeEach() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index a394d0b7393..a4bc3d3fd8e 100644 +index a394d0b7393..3e1f0404a37 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -53,24 +53,34 @@ object TestHive diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index f685e6146a..10c17a5cf1 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -2838,14 +2838,14 @@ index 6b73cc8618d..81a58f43784 100644 case _ => assert(false, "Can not match ParquetTable in the 
query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 3072657a095..6b5b9103363 100644 +index 3072657a095..599d169cf8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} @@ -2859,17 +2859,7 @@ index 3072657a095..6b5b9103363 100644 val data = Seq( Tuple1((null, null)), Tuple1((null, null)), -@@ -1282,7 +1284,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1567,7 +1570,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1567,7 +1569,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2880,18 +2870,10 @@ index 3072657a095..6b5b9103363 100644 checkAnswer( // "fruit" column in this 
file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index c530dc0d3df..7e1dd663873 100644 +index c530dc0d3df..d23069689da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat - - import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} - import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow - import org.apache.spark.sql.catalyst.util.ArrayData -@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2904,17 +2886,7 @@ index c530dc0d3df..7e1dd663873 100644 } } } -@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } -@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest 
with SharedS +@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2924,7 +2896,7 @@ index c530dc0d3df..7e1dd663873 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2934,16 +2906,6 @@ index c530dc0d3df..7e1dd663873 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1139,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -3041,7 +3003,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found 
${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 56076175d60..78c4a9755c0 100644 +index 56076175d60..5872d9962cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -3069,22 +3031,12 @@ index 56076175d60..78c4a9755c0 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) -@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..82924c83eb5 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -3132,7 +3084,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet conversion $fromType -> $toType") { + test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders(values, fromType, toType, expectError = true) } @@ -3142,7 +3094,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet conversion $fromType -> $toType") { + test(s"unsupported parquet conversion $fromType -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders(values, fromType, toType, expectError = // parquet-mr allows reading decimals into a smaller precision decimal type without @@ -3152,7 +3104,7 @@ index 09ed6955a51..5cd856ff7b6 100644 } - test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { + test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { withSQLConf( SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString @@ -3162,7 +3114,7 @@ index 09ed6955a51..5cd856ff7b6 100644 test( - s"parquet decimal precision change 
Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), @@ -3172,10 +3124,20 @@ index 09ed6955a51..5cd856ff7b6 100644 test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352") ) { checkAllParquetReaders( values = Seq("1.23", "10.34"), +@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite + ) + } + +- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") { ++ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) { + withTempDir { dir => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + writeParquetFiles( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 1cc6d3afbee..8275727fbb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index d713dd4e8f..de5e273df1 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ 
b/docs/source/user-guide/latest/compatibility/scans.md @@ -91,10 +91,45 @@ without falling back to Spark: ([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior. See [#4219](https://github.com/apache/datafusion-comet/issues/4219). -- Unsupported Parquet type conversions. Spark raises schema incompatibility errors for certain conversions - (e.g., reading INT32 as BIGINT, reading BINARY as TIMESTAMP, unsupported decimal precision changes). - The `native_datafusion` scan may not detect these mismatches and could return unexpected values instead - of raising an error. See [#3720](https://github.com/apache/datafusion-comet/issues/3720). +### Schema Mismatch Handling + +The issues in this subsection apply only when the requested read schema differs from the schema written +to the Parquet file. They do **not** affect a plain `spark.read.parquet(path)` that infers the schema +from file metadata, because in that case the requested schema and file schema match by construction. +Schema mismatch happens in two real-world scenarios: + +1. The user provides an explicit read schema: `spark.read.schema(schema).parquet(path)` (or the + equivalent DataFrame API). +2. **Schema evolution / partitioned reads** where files in a single dataset were written at different + times with different types, or a table-format catalog (Iceberg, Delta) records a logical schema + that has evolved past one or more underlying Parquet files. Spark coerces the file types to the + table types at read time. + +Spark's vectorized Parquet reader fully validates these conversions in `ParquetVectorUpdaterFactory.getUpdater` +and throws `SchemaColumnConvertNotSupportedException` for unsupported pairs. `native_datafusion` mirrors +that validation in its schema adapter; the entries below are the remaining gaps. 
+ +Note that the exact set of accepted conversions has changed between Spark versions +(for example, Spark 3.x's vectorized reader rejects the `INT32 → INT64`, `FLOAT → DOUBLE`, +and `INT32 → DOUBLE` widenings that Spark 4.0+ accepts unconditionally; `TimestampLTZ → TimestampNTZ` +is rejected by Spark 3.x but accepted by Spark 4.0+). Comet aims to follow the per-version Spark +behavior. + +- **`ParquetSchemaConvert` errors do not include the file path**. The mismatch itself is detected and + rejected correctly, but the resulting Spark error message reads + `Encountered error while reading file . Data type mismatches…` (note the empty path). Behavior is + consistent across Spark versions. See + [#4316](https://github.com/apache/datafusion-comet/issues/4316). +- **Spark 3.x: extra `SparkException` layer in the cause chain**. The native error is translated to a + `SparkException` whose cause is `SchemaColumnConvertNotSupportedException` (matching what Spark would + throw); on Spark 3.x the executor / task error handling re-wraps this once more on the way back to + the driver, producing a two-level chain (`SparkException → SparkException → +SchemaColumnConvertNotSupportedException`) instead of the one-level chain Spark's own vectorized + reader produces. Code that catches `SparkException` and inspects only the immediate cause via + `e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]` will see the inner + `SparkException` instead. Walk the cause chain to recover the + `SchemaColumnConvertNotSupportedException`. Spark 4.0+ produces a single-level chain, matching + vanilla Spark. See [#4354](https://github.com/apache/datafusion-comet/issues/4354). 
## `native_iceberg_compat` Limitations diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b00f140026..debe47ba04 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1372,6 +1372,7 @@ impl PhysicalPlanner { common.session_timezone.as_str(), common.case_sensitive, common.return_null_struct_if_all_fields_missing, + common.allow_type_promotion, self.session_ctx(), common.encryption_enabled, common.use_field_id, diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 5de14aa610..c522b3a14e 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -513,6 +513,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat session_timezone.as_str(), case_sensitive != JNI_FALSE, return_null_struct_if_all_fields_missing != JNI_FALSE, + true, // allow_type_promotion: JVM side already validated via TypeUtil.checkParquetType session_ctx, encryption_enabled, // The iceberg-compat path resolves IDs in the JVM via NativeBatchReader, diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 38a0755658..5379c02058 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -70,6 +70,7 @@ pub(crate) fn init_datasource_exec( session_timezone: &str, case_sensitive: bool, return_null_struct_if_all_fields_missing: bool, + allow_type_promotion: bool, session_ctx: &Arc, encryption_enabled: bool, use_field_id: bool, @@ -79,6 +80,7 @@ pub(crate) fn init_datasource_exec( session_timezone, case_sensitive, return_null_struct_if_all_fields_missing, + allow_type_promotion, &object_store_url, encryption_enabled, ); @@ -197,6 +199,7 @@ fn get_options( session_timezone: &str, case_sensitive: bool, return_null_struct_if_all_fields_missing: bool, + allow_type_promotion: bool, object_store_url: &ObjectStoreUrl, encryption_enabled: bool, ) -> 
(TableParquetOptions, SparkParquetOptions) { @@ -210,6 +213,7 @@ fn get_options( spark_parquet_options.case_sensitive = case_sensitive; spark_parquet_options.return_null_struct_if_all_fields_missing = return_null_struct_if_all_fields_missing; + spark_parquet_options.allow_type_promotion = allow_type_promotion; if encryption_enabled { table_parquet_options.crypto.configure_factory( diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 4a48aaca28..0e0e7c2a6e 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -93,6 +93,9 @@ pub struct SparkParquetOptions { /// requested schema does carry ids raises a runtime error rather than silently /// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`). pub ignore_missing_field_id: bool, + /// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64, + /// FLOAT -> DOUBLE. Tracks Comet's per-Spark-version constant in ShimCometConf. 
+ pub allow_type_promotion: bool, } impl SparkParquetOptions { @@ -108,6 +111,7 @@ impl SparkParquetOptions { return_null_struct_if_all_fields_missing: true, use_field_id: false, ignore_missing_field_id: false, + allow_type_promotion: false, } } @@ -123,6 +127,7 @@ impl SparkParquetOptions { return_null_struct_if_all_fields_missing: true, use_field_id: false, ignore_missing_field_id: false, + allow_type_promotion: false, } } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 40cea642ac..9a348e5b62 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -17,7 +17,9 @@ use crate::parquet::cast_column::CometCastColumnExpr; use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; +use arrow::array::new_empty_array; use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::common::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_expr::expressions::Column; @@ -31,7 +33,10 @@ use datafusion_physical_expr_adapter::{ PhysicalExprAdapterFactory, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use std::any::Any; use std::collections::HashMap; +use std::fmt::{self, Display}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// Factory for creating Spark-compatible physical expression adapters. @@ -232,6 +237,102 @@ fn remap_physical_schema( Ok((Arc::new(Schema::new(remapped_fields)), name_map)) } +/// Format an Arrow `DataType` as Spark's catalog string (e.g. `Int64` -> `bigint`), +/// so SchemaColumnConvertNotSupportedException messages match Spark's vectorized reader. 
+fn spark_catalog_name(dt: &DataType) -> String { + match dt { + DataType::Boolean => "boolean".to_string(), + DataType::Int8 => "tinyint".to_string(), + DataType::Int16 => "smallint".to_string(), + DataType::Int32 => "int".to_string(), + DataType::Int64 => "bigint".to_string(), + DataType::Float32 => "float".to_string(), + DataType::Float64 => "double".to_string(), + DataType::Utf8 | DataType::LargeUtf8 => "string".to_string(), + DataType::Binary | DataType::LargeBinary => "binary".to_string(), + DataType::Date32 => "date".to_string(), + DataType::Timestamp(_, Some(_)) => "timestamp".to_string(), + DataType::Timestamp(_, None) => "timestamp_ntz".to_string(), + DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => { + format!("decimal({p},{s})") + } + _ => "unknown".to_string(), + } +} + +/// Format an Arrow `DataType` as the Parquet primitive type name +/// (e.g. `Int64` -> `INT64`, matching `PrimitiveTypeName.toString()` in parquet-mr). +fn parquet_primitive_name(dt: &DataType) -> &'static str { + match dt { + DataType::Boolean => "BOOLEAN", + DataType::Int8 | DataType::Int16 | DataType::Int32 => "INT32", + DataType::Int64 => "INT64", + DataType::Float32 => "FLOAT", + DataType::Float64 => "DOUBLE", + DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => "BINARY", + // Spark stores DATE as INT32 with a DATE logical-type annotation. + DataType::Date32 => "INT32", + // Spark stores TIMESTAMP as INT64 with a timestamp annotation, or as + // INT96 (legacy nanos). arrow-rs surfaces both as `Timestamp`; without + // the original physical name we report INT64, which matches the + // common case. + DataType::Timestamp(_, _) => "INT64", + // Mirror Spark's `SparkToParquetSchemaConverter` decimal mapping: + // precision 1-9 -> INT32, 10-18 -> INT64, 19+ -> FIXED_LEN_BYTE_ARRAY. 
+ DataType::Decimal128(p, _) | DataType::Decimal256(p, _) => { + if *p <= 9 { + "INT32" + } else if *p <= 18 { + "INT64" + } else { + "FIXED_LEN_BYTE_ARRAY" + } + } + _ => "UNKNOWN", + } +} + +fn is_string_or_binary(dt: &DataType) -> bool { + matches!( + dt, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary + ) +} + +/// Build a Spark-shaped `SchemaColumnConvertNotSupportedException` carrier for a +/// rejected Parquet -> Spark conversion. The bracketed column wrapping mirrors +/// `Arrays.toString(descriptor.getPath())` in Spark's vectorized reader. +fn parquet_schema_convert_err( + field_name: &str, + physical_type: &DataType, + target_type: &DataType, +) -> DataFusionError { + DataFusionError::External(Box::new(SparkError::ParquetSchemaConvert { + file_path: String::new(), + column: format!("[{}]", field_name), + physical_type: parquet_primitive_name(physical_type).to_string(), + spark_type: spark_catalog_name(target_type), + })) +} + +/// Build a `RejectOnNonEmpty` expr wrapping `child`. The rejection fires only +/// when the input batch is non-empty (mirrors Spark's per-row-group check). +fn reject_on_non_empty_expr( + child: Arc, + target_field: &FieldRef, + field_name: &str, + physical_type: &DataType, + target_type: &DataType, +) -> Arc { + Arc::new(RejectOnNonEmpty { + child, + target_field: Arc::clone(target_field), + column: format!("[{}]", field_name), + physical_type: parquet_primitive_name(physical_type).to_string(), + spark_type: spark_catalog_name(target_type), + }) +} + /// Check if a specific column name has duplicate matches in the physical schema /// (case-insensitive). Returns the error info if so. 
fn check_column_duplicate(col_name: &str, physical_schema: &SchemaRef) -> Option<(String, String)> { @@ -487,6 +588,21 @@ impl SparkPhysicalExprAdapter { }; if logical_field.data_type() != physical_field.data_type() { + // Mirror the same string/binary -> non-string/binary rejection in + // `replace_with_spark_cast`; this branch is reached when the default + // adapter rejected the cast and we'd otherwise build a CometCastColumnExpr + // that can't actually convert (e.g. BINARY -> DECIMAL with no + // `DecimalLogicalTypeAnnotation`). See #4088 and #4351. + let physical_type = physical_field.data_type(); + let target_type = logical_field.data_type(); + if is_string_or_binary(physical_type) && !is_string_or_binary(target_type) { + return Err(parquet_schema_convert_err( + physical_field.name(), + physical_type, + target_type, + )); + } + let cast_expr: Arc = Arc::new( CometCastColumnExpr::new( remapped, @@ -522,101 +638,180 @@ impl SparkPhysicalExprAdapter { let physical_type = cast.input_field().data_type(); let target_type = cast.target_field().data_type(); - // Reject reading a string/binary Parquet column as anything other - // than string, binary, or a binary-encoded decimal. This mirrors - // Spark's TypeUtil.checkParquetType for the BINARY case (lines - // 208-221): a BINARY (or UTF8-annotated BINARY) physical column is - // only readable as StringType, BinaryType, or a binary-encoded - // decimal; every other target type (numeric, boolean, date, - // timestamp, ...) raises SchemaColumnConvertNotSupportedException. - // - // Without this guard, Spark's Cast below (in is_adapting_schema - // mode) falls through to DataFusion's cast, which silently parses - // the bytes (returning nulls for non-numeric strings, parsing - // date/timestamp/boolean strings, or in some paths reinterpreting - // raw bytes). See issue #4088. - if matches!( + // Reject reading a string/binary Parquet column as anything else. 
Spark's + // `ParquetVectorUpdaterFactory.getUpdater` BINARY case allows StringType / + // BinaryType, or DecimalType only when the column carries a + // `DecimalLogicalTypeAnnotation` (which arrow-rs surfaces as `Decimal128`, + // not `Binary`). Without this guard, runtime cast paths silently return + // nulls, parse strings, or surface as a generic Arrow type-mismatch error. + // See #4088 and #4351. + if is_string_or_binary(physical_type) && !is_string_or_binary(target_type) { + return Err(parquet_schema_convert_err( + cast.input_field().name(), + physical_type, + target_type, + )); + } + + // Reject reading a primitive numeric Parquet column as StringType / + // BinaryType. Spark has no `int -> string` etc. updater. Defer to + // runtime via `RejectOnNonEmpty` so empty Parquet files (SPARK-26709) + // pass and the JVM shim translates to + // `SchemaColumnConvertNotSupportedException`. + let physical_is_primitive_numeric = matches!( physical_type, - DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary - ) && !matches!( - target_type, - DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Binary - | DataType::LargeBinary - | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) - ) { - return Err(DataFusionError::External(Box::new( - SparkError::ParquetSchemaConvert { - file_path: String::new(), - column: cast.input_field().name().to_string(), - physical_type: physical_type.to_string(), - spark_type: target_type.to_string(), - }, - ))); + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + ); + if physical_is_primitive_numeric && is_string_or_binary(target_type) { + let rejection = reject_on_non_empty_expr( + child, + cast.target_field(), + cast.input_field().name(), + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); } - // Decimal-to-decimal scale-narrowing check. 
- // Reject reads where the read schema has a smaller scale than the - // file's, because Spark's Cast below would silently truncate - // fractional digits, producing wrong values. This matches the - // unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2) read - // as Decimal(5,0)). - // - // Other decimal mismatches are intentionally NOT rejected here, - // even though Spark's vectorized reader would reject them via - // `ParquetVectorUpdaterFactory#isDecimalTypeMatched` (which requires - // exact precision and scale): - // - // - Precision-only changes with the same scale (e.g. Decimal(5,2) - // read as Decimal(3,2)): Spark 4.0's parquet-mr fallback path - // (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized - // type-widening path produce null on per-value overflow, which - // DataFusion's cast already does in the adapting-schema path. - // - // - Scale widening (e.g. Decimal(10,2) read as Decimal(10,4)): the - // cast is lossless (no truncation, no overflow), so allowing it - // here is strictly more permissive than Spark's vectorized reader - // without risking wrong values. - if let (DataType::Decimal128(_src_p, src_s), DataType::Decimal128(_dst_p, dst_s)) = + // Decimal-to-decimal narrowing. Spark's `isDecimalTypeMatched` (the + // `DecimalLogicalTypeAnnotation` branch) allows the read only when + // `dst_scale >= src_scale` AND + // `dst_precision - dst_scale >= src_precision - src_scale`. + // Either failure means silently dropping fractional digits or losing + // integer-side magnitude. See #4089 and #4343. 
+ if let (DataType::Decimal128(src_p, src_s), DataType::Decimal128(dst_p, dst_s)) = (physical_type, target_type) { - if dst_s < src_s { - return Err(DataFusionError::External(Box::new( - SparkError::ParquetSchemaConvert { - file_path: String::new(), - column: cast.input_field().name().to_string(), - physical_type: physical_type.to_string(), - spark_type: target_type.to_string(), - }, - ))); + let src_int_precision = i32::from(*src_p) - i32::from(*src_s); + let dst_int_precision = i32::from(*dst_p) - i32::from(*dst_s); + if dst_s < src_s || dst_int_precision < src_int_precision { + return Err(parquet_schema_convert_err( + cast.input_field().name(), + physical_type, + target_type, + )); } } - // For complex nested types (Struct, List, Map), Timestamp timezone - // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr - // with spark_parquet_convert which handles field-name-based selection, - // reordering, nested type casting, metadata-only timestamp timezone - // relabeling, and raw value reinterpretation correctly. + // Integer-to-decimal narrowing. Spark's `canReadAsDecimal` requires + // `precision - scale >= 10` for an INT32 source and `>= 20` for INT64. + // Unconditional in all Spark versions, so reject at plan time. See #4344. + let int_decimal_min_int_precision = match physical_type { + DataType::Int8 | DataType::Int16 | DataType::Int32 => Some(10i32), + DataType::Int64 => Some(20i32), + _ => None, + }; + if let Some(min_int_precision) = int_decimal_min_int_precision { + let dst_precision_scale = match target_type { + DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => Some((*p, *s)), + _ => None, + }; + if let Some((dst_p, dst_s)) = dst_precision_scale { + let dst_int_precision = i32::from(dst_p) - i32::from(dst_s); + if dst_int_precision < min_int_precision { + return Err(parquet_schema_convert_err( + cast.input_field().name(), + physical_type, + target_type, + )); + } + } + } + + // Type promotion (widening). 
When `allow_type_promotion` is false, + // reject the three widenings (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) + // that Spark 3.x's vectorized reader rejects. The flag tracks Comet's + // per-Spark-version constant in ShimCometConf. Deferred to runtime so + // empty files (SPARK-26709) pass. + if !self.parquet_options.allow_type_promotion { + let is_disallowed_promotion = matches!( + (physical_type, target_type), + (DataType::Int32, DataType::Int64) + | (DataType::Float32, DataType::Float64) + | (DataType::Int32, DataType::Float64) + ); + if is_disallowed_promotion { + let rejection = reject_on_non_empty_expr( + Arc::clone(&child), + cast.target_field(), + cast.input_field().name(), + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); + } + } + + // Reject primitive Parquet conversions Spark's vectorized reader rejects + // on every supported version (no matching branch in + // `ParquetVectorUpdaterFactory.getUpdater`): // - // Timestamp mismatches (e.g., Timestamp(us, None) -> Timestamp(us, Some("UTC"))) - // occur when INT96 Parquet timestamps are coerced to Timestamp(us, None) by - // DataFusion but the logical schema expects Timestamp(us, Some("UTC")). - // Using Spark's Cast here would incorrectly treat the None-timezone values as - // local time (TimestampNTZ) and apply a timezone conversion, but the values are - // already in UTC. spark_parquet_convert handles this as a metadata-only change. + // - `INT64 -> Int*` truncates lower bits. + // - `INT64 -> Float*` and `INT32 -> Float32` lose precision. + // - `Float* -> Int*` and `Float64 -> Float32` truncate / overflow. + // - `INT32 -> Timestamp` / `INT64 -> Date32` / `INT64 -> Timestamp`: + // date/timestamp-annotated columns surface as Date32 / Timestamp, + // so reaching this branch means the column was un-annotated. + // - `Date32 -> Timestamp(LTZ)`: Spark only allows Date -> TimestampNTZ. + // - `Timestamp -> Date32`: no Timestamp updater branches into Date. 
// - // Timestamp→Int64 occurs when Spark's `nanosAsLong` config converts - // TIMESTAMP(NANOS) to LongType. Spark's Cast would divide by MICROS_PER_SECOND - // (assuming microseconds), but the values are nanoseconds. Arrow cast correctly - // reinterprets the raw i64 value without conversion. - // Reject scalar/complex mismatches at planning time. Spark's - // vectorized reader rejects e.g. reading a TIMESTAMP column as - // ARRAY with SchemaColumnConvertNotSupportedException - // (see SPARK-45604). Without this guard the runtime cast would - // raise a less-specific Arrow CastError. Same-complex-type pairs - // and timestamp→timestamp / timestamp→int64 are handled below. + // Deferred to runtime (SPARK-26709). See #4297. + let is_spark_rejected_conversion = matches!( + (physical_type, target_type), + // Long -> narrower int. + ( + DataType::Int64, + DataType::Int8 | DataType::Int16 | DataType::Int32, + ) + // Long -> floating point. + | (DataType::Int64, DataType::Float32 | DataType::Float64) + // Long -> date / timestamp (raw INT64; annotated columns surface as Date32/Timestamp). + | (DataType::Int64, DataType::Date32) + | (DataType::Int64, DataType::Timestamp(_, _)) + // Int -> float (DoubleType is allowed via IntegerToDoubleUpdater; FloatType is not). + | ( + DataType::Int8 | DataType::Int16 | DataType::Int32, + DataType::Float32, + ) + // Int -> timestamp (raw INT32; DATE-annotated columns surface as Date32). + | ( + DataType::Int8 | DataType::Int16 | DataType::Int32, + DataType::Timestamp(_, _), + ) + // Float -> int / Double -> int (no integer branches under FLOAT/DOUBLE). + | ( + DataType::Float32 | DataType::Float64, + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64, + ) + // Double -> float (narrowing). + | (DataType::Float64, DataType::Float32) + // Date -> Timestamp(LTZ). Spark allows Date -> TimestampNTZ only. + | (DataType::Date32, DataType::Timestamp(_, Some(_))) + // Timestamp -> Date. 
+ | (DataType::Timestamp(_, _), DataType::Date32) + ); + if is_spark_rejected_conversion { + let rejection = reject_on_non_empty_expr( + child, + cast.target_field(), + cast.input_field().name(), + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); + } + + // Scalar/complex mismatch (e.g. TIMESTAMP read as ARRAY): + // Spark's vectorized reader rejects with + // SchemaColumnConvertNotSupportedException (SPARK-45604). Same-shape + // complex pairs and timestamp→timestamp / timestamp→int64 fall through + // to CometCastColumnExpr below. let is_complex = |t: &DataType| { matches!( t, @@ -624,16 +819,18 @@ impl SparkPhysicalExprAdapter { ) }; if is_complex(physical_type) != is_complex(target_type) { - return Err(DataFusionError::External(Box::new( - SparkError::ParquetSchemaConvert { - file_path: String::new(), - column: cast.input_field().name().to_string(), - physical_type: physical_type.to_string(), - spark_type: target_type.to_string(), - }, - ))); + return Err(parquet_schema_convert_err( + cast.input_field().name(), + physical_type, + target_type, + )); } + // Same-shape complex casts, timestamp tz relabel (e.g. Timestamp(us, None) + // -> Timestamp(us, Some("UTC")) for INT96 reads), and Timestamp -> Int64 + // (Spark's `nanosAsLong`) need spark_parquet_convert: it handles nested + // field selection, metadata-only tz changes, and raw-value reinterpretation + // that Spark's Cast would otherwise convert incorrectly. if matches!( (physical_type, target_type), (DataType::Struct(_), DataType::Struct(_)) @@ -750,12 +947,119 @@ impl SparkPhysicalExprAdapter { } } +/// Defers a Parquet type-promotion rejection to runtime: returns an empty array +/// when the input batch has no rows, and raises `ParquetSchemaConvert` otherwise. +/// +/// Mirrors Spark's vectorized reader, which only invokes +/// `ParquetVectorUpdaterFactory.getUpdater` while decoding a row group. A +/// Parquet file with no row groups (e.g. 
one written from an empty DataFrame) +/// never triggers the per-row-group check, so a partition mixing such a file +/// with another whose schema would otherwise fail the type-promotion check +/// (SPARK-26709) is still readable. +#[derive(Debug, Eq)] +struct RejectOnNonEmpty { + child: Arc, + target_field: FieldRef, + column: String, + physical_type: String, + spark_type: String, +} + +impl PartialEq for RejectOnNonEmpty { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.target_field.eq(&other.target_field) + && self.column == other.column + && self.physical_type == other.physical_type + && self.spark_type == other.spark_type + } +} + +impl Hash for RejectOnNonEmpty { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.target_field.hash(state); + self.column.hash(state); + self.physical_type.hash(state); + self.spark_type.hash(state); + } +} + +impl Display for RejectOnNonEmpty { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "REJECT_PARQUET_TYPE_PROMOTION({} AS {})", + self.column, self.spark_type + ) + } +} + +impl PhysicalExpr for RejectOnNonEmpty { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _input_schema: &Schema) -> DataFusionResult { + Ok(self.target_field.data_type().clone()) + } + + fn nullable(&self, _input_schema: &Schema) -> DataFusionResult { + Ok(self.target_field.is_nullable()) + } + + fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { + if batch.num_rows() == 0 { + return Ok(ColumnarValue::Array(new_empty_array( + self.target_field.data_type(), + ))); + } + Err(DataFusionError::External(Box::new( + SparkError::ParquetSchemaConvert { + file_path: String::new(), + column: self.column.clone(), + physical_type: self.physical_type.clone(), + spark_type: self.spark_type.clone(), + }, + ))) + } + + fn return_field(&self, _input_schema: &Schema) -> DataFusionResult { + Ok(Arc::clone(&self.target_field)) + } + + fn children(&self) -> Vec<&Arc> { + 
vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> DataFusionResult> { + assert_eq!(children.len(), 1); + Ok(Arc::new(RejectOnNonEmpty { + child: children.pop().expect("child"), + target_field: Arc::clone(&self.target_field), + column: self.column.clone(), + physical_type: self.physical_type.clone(), + spark_type: self.spark_type.clone(), + })) + } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} + #[cfg(test)] mod test { use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory; use arrow::array::UInt32Array; - use arrow::array::{Int32Array, StringArray}; + use arrow::array::{ + BinaryArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int32Array, + Int64Array, StringArray, TimestampMicrosecondArray, + }; use arrow::datatypes::SchemaRef; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; @@ -774,25 +1078,542 @@ mod test { use std::fs::File; use std::sync::Arc; + /// Reading a non-BINARY Parquet column as `StringType` must raise the same + /// `_LEGACY_ERROR_TEMP_2063`-shaped error as Spark's vectorized reader + /// (`ParquetVectorUpdaterFactory.getUpdater` has no INT32 -> string updater). 
#[tokio::test] - async fn parquet_roundtrip_int_as_string() -> Result<(), DataFusionError> { - let file_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - ])); + async fn parquet_int_read_as_string_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Utf8, + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: string") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } - let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; - let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) - as Arc; - let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names])?; + /// Companion: BINARY (string physical) read as IntegerType must raise the + /// same Spark-compatible error. + #[tokio::test] + async fn parquet_string_read_as_int_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(StringArray::from(vec!["bcd", "efg"])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Utf8, false), + values, + DataType::Int32, + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: int") + && msg.contains("Found: BINARY"), + "unexpected error: {msg}" + ); + Ok(()) + } - let required_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Utf8, false), - Field::new("name", DataType::Utf8, false), - ])); + /// Reading a plain BINARY Parquet column (no `DecimalLogicalTypeAnnotation`) + /// as `DecimalType` must raise a Spark-compatible `ParquetSchemaConvert` + /// error. Spark's `canReadAsDecimal` / `canReadAsBinaryDecimal` both require + /// the column to carry a `DecimalLogicalTypeAnnotation`. See #4351. 
+ #[tokio::test] + async fn parquet_binary_read_as_decimal_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(BinaryArray::from_vec(vec![b"1.2", b"3.4"])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Binary, false), + values, + DataType::Decimal128(37, 1), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal(37,1)") + && msg.contains("Found: BINARY"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// INT32 -> Decimal where `precision - scale < 10` (the minimum that can + /// represent the full INT32 range). See #4344. + #[tokio::test] + async fn parquet_int32_read_as_narrow_decimal_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Decimal128(9, 0), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } + /// INT32 -> Decimal where `precision - scale >= 10` is allowed. + #[tokio::test] + async fn parquet_int32_read_as_wide_decimal_succeeds() -> Result<(), DataFusionError> { + let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(10, 0), + false, + )])); let _ = roundtrip(&batch, required_schema).await?; + Ok(()) + } + + /// INT64 -> Decimal where `precision - scale < 20`. See #4344. 
+ #[tokio::test] + async fn parquet_int64_read_as_narrow_decimal_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Decimal128(19, 0), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal") + && msg.contains("Found: INT64"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Non-zero scale that pushes `precision - scale` below the integer minimum + /// (INT32 -> Decimal(10, 1) leaves int-precision 9). + #[tokio::test] + async fn parquet_int32_read_as_decimal_with_scale_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Decimal128(10, 1), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Helper to build a tiny decimal Parquet batch for the decimal-to-decimal tests. + fn decimal_batch(precision: u8, scale: i8) -> Result { + let file_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(precision, scale), + false, + )])); + let values = Arc::new( + Decimal128Array::from(vec![123i128, 456]) + .with_precision_and_scale(precision, scale) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?, + ) as Arc; + Ok(RecordBatch::try_new(file_schema, vec![values])?) + } + + /// Reading Decimal(P, S) as Decimal(P', S) where P' < P (precision-only + /// narrowing, equal scale) must raise the Spark-compatible error. Spark's + /// `isDecimalTypeMatched` rejects this because `precisionIncrease < 0` + /// while `scaleIncrease == 0`. See #4343. 
+ #[tokio::test] + async fn parquet_decimal_precision_narrowing_errors() -> Result<(), DataFusionError> { + let batch = decimal_batch(10, 2)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(5, 2), + false, + )])); + + let err = roundtrip(&batch, required_schema) + .await + .expect_err("expected ParquetSchemaConvert for Decimal(10, 2) -> Decimal(5, 2)"); + let msg = err.to_string(); + assert!( + msg.contains("Column: [[a]]") && msg.contains("Expected: decimal(5,2)"), + "unexpected error: {msg}" + ); + Ok(()) + } + /// Reading Decimal(P, S) as Decimal(P', S') where the integer-precision + /// `P - S` shrinks must raise the Spark-compatible error. Example: + /// Decimal(10, 4) (int-precision 6) -> Decimal(5, 2) (int-precision 3). + /// See #4343. + #[tokio::test] + async fn parquet_decimal_int_precision_narrowing_errors() -> Result<(), DataFusionError> { + let batch = decimal_batch(10, 4)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(5, 2), + false, + )])); + + let err = roundtrip(&batch, required_schema) + .await + .expect_err("expected ParquetSchemaConvert for Decimal(10, 4) -> Decimal(5, 2)"); + let msg = err.to_string(); + assert!(msg.contains("Column: [[a]]"), "unexpected error: {msg}"); + Ok(()) + } + + /// Reading Decimal(P, S) as Decimal(P, S') where S' > S but `P - S` did + /// not grow means the cast would shift integer digits into the fractional + /// part and lose the most-significant digit. Example: Decimal(5, 2) -> + /// Decimal(5, 3): scaleIncrease=1, precisionIncrease=0. See #4343. 
+ #[tokio::test] + async fn parquet_decimal_scale_widening_without_precision_errors() -> Result<(), DataFusionError> + { + let batch = decimal_batch(5, 2)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(5, 3), + false, + )])); + + let err = roundtrip(&batch, required_schema) + .await + .expect_err("expected ParquetSchemaConvert for Decimal(5, 2) -> Decimal(5, 3)"); + let msg = err.to_string(); + assert!(msg.contains("Column: [[a]]"), "unexpected error: {msg}"); + Ok(()) + } + + /// Sanity check: widening both precision and scale by the same amount is + /// allowed (the cast is lossless). Decimal(5, 2) -> Decimal(7, 4) gives + /// scaleIncrease=2, precisionIncrease=2, so `precisionIncrease >= scaleIncrease`. + #[tokio::test] + async fn parquet_decimal_widening_succeeds() -> Result<(), DataFusionError> { + let batch = decimal_batch(5, 2)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(7, 4), + false, + )])); + + let _ = roundtrip(&batch, required_schema).await?; + Ok(()) + } + + /// Helper for the #4297 rejection tests: write a 1-row batch and assert + /// that reading it under `read_type` raises `ParquetSchemaConvert`. + async fn assert_rejected_conversion( + file_field: Field, + values: Arc, + read_type: DataType, + ) -> Result { + let file_schema = Arc::new(Schema::new(vec![file_field])); + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + let read_field_name = file_schema.field(0).name(); + let required_schema = Arc::new(Schema::new(vec![Field::new( + read_field_name, + read_type, + false, + )])); + let err = roundtrip(&batch, required_schema) + .await + .expect_err("expected ParquetSchemaConvert"); + Ok(err.to_string()) + } + + /// `INT64 -> INT32` truncates to the lower 32 bits in DataFusion's cast. + /// Spark's vectorized reader rejects this. See #4297. 
+ #[tokio::test] + async fn parquet_long_read_as_int_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Int64Array::from(vec![1i64, 1 << 33])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Int32, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: int"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `INT64 -> Float64` loses precision for large values; Spark rejects. + #[tokio::test] + async fn parquet_long_read_as_double_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int64Array::from(vec![1i64, (1i64 << 54) + 1])) + as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Float64, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: double"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Float64 -> Float32` overflows / loses precision; Spark rejects. + #[tokio::test] + async fn parquet_double_read_as_float_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Float64Array::from(vec![1.5_f64, 1e40])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Float64, false), + values, + DataType::Float32, + ) + .await?; + assert!( + msg.contains("Found: DOUBLE") && msg.contains("Expected: float"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Float32 -> Int64` truncates the fractional part; Spark rejects. + #[tokio::test] + async fn parquet_float_read_as_long_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Float32Array::from(vec![1.5_f32, 2.5])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Float32, false), + values, + DataType::Int64, + ) + .await?; + assert!( + msg.contains("Found: FLOAT") && msg.contains("Expected: bigint"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Float64 -> Int64` similarly. 
+ #[tokio::test] + async fn parquet_double_read_as_long_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Float64Array::from(vec![1.5_f64, 2.5])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Float64, false), + values, + DataType::Int64, + ) + .await?; + assert!( + msg.contains("Found: DOUBLE") && msg.contains("Expected: bigint"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int32 -> Float32` loses precision for values past `2^24`. Spark + /// allows `Int32 -> Float64` but rejects `Int32 -> Float32`. + #[tokio::test] + async fn parquet_int_read_as_float_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Int32Array::from(vec![1, (1 << 25) + 1])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Float32, + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: float"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int32 -> Timestamp(_, None)`: raw INT32 reinterpreted as epoch seconds + /// produces dates near the Unix epoch. Only DATE-annotated INT32 columns + /// (which surface as `Date32`) are allowed to read as `TimestampNTZ`. + #[tokio::test] + async fn parquet_int_read_as_timestamp_ntz_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: timestamp"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int64 -> Date32` similarly: raw INT64 (no DATE annotation, otherwise + /// the file would surface as `Date32`). 
+ #[tokio::test] + async fn parquet_long_read_as_date_errors() -> Result<(), DataFusionError> { + let values = Arc::new(Int64Array::from(vec![1i64, 2])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Date32, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: date"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Date32 -> Timestamp(_, Some(_))` (LTZ). Spark's vectorized reader + /// allows `Date -> TimestampNTZ` but not `Date -> Timestamp(LTZ)`. + #[tokio::test] + async fn parquet_date_read_as_ltz_timestamp_errors() -> Result<(), DataFusionError> { + let values = + Arc::new(Date32Array::from(vec![18262, 18263])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Date32, false), + values, + DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some("UTC".into())), + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: timestamp"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Timestamp(_, _) -> Date32`: no Timestamp updater branches into + /// `DateType`, so Spark rejects. + #[tokio::test] + async fn parquet_timestamp_read_as_date_errors() -> Result<(), DataFusionError> { + let values = Arc::new(TimestampMicrosecondArray::from(vec![0i64, 1_000_000])) + as Arc; + let msg = assert_rejected_conversion( + Field::new( + "a", + DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), + false, + ), + values, + DataType::Date32, + ) + .await?; + assert!(msg.contains("Expected: date"), "unexpected error: {msg}"); + Ok(()) + } + + /// SPARK-26709: an empty Parquet file with a column that would otherwise fail + /// the type-promotion check (INT32 read as INT64 when allow_type_promotion is + /// false) must still be readable. Spark's vectorized reader only enforces the + /// check per row group, so a file with no row groups passes silently. 
The + /// adapter's plan-time rejection must not fire for the empty-file case. + #[tokio::test] + async fn parquet_empty_file_disallowed_widening() -> Result<(), DataFusionError> { + let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let filename = get_temp_filename(); + let filename = filename.as_path().as_os_str().to_str().unwrap().to_string(); + let file = File::create(&filename)?; + let writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None)?; + writer.close()?; + + let required_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); + + let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + spark_parquet_options.allow_type_promotion = false; + + let expr_adapter_factory: Arc = Arc::new( + SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None), + ); + + let object_store_url = ObjectStoreUrl::local_filesystem(); + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(filename)?]); + let file_scan_config = + FileScanConfigBuilder::new(object_store_url, Arc::new(parquet_source)) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(expr_adapter_factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + while let Some(batch) = stream.next().await { + let batch = batch?; + assert_eq!(batch.num_rows(), 0); + } + Ok(()) + } + + /// Companion to `parquet_empty_file_disallowed_widening`: a file with rows + /// must still raise `ParquetSchemaConvert` when the same widening is + /// rejected. Verifies the runtime check fires on non-empty input, + /// matching Spark's per-row-group behavior. 
+ #[tokio::test] + async fn parquet_non_empty_file_disallowed_widening_errors() -> Result<(), DataFusionError> { + let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + + let filename = get_temp_filename(); + let filename = filename.as_path().as_os_str().to_str().unwrap().to_string(); + let file = File::create(&filename)?; + let mut writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None)?; + writer.write(&batch)?; + writer.close()?; + + let required_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); + + let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + spark_parquet_options.allow_type_promotion = false; + + let expr_adapter_factory: Arc = Arc::new( + SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None), + ); + + let object_store_url = ObjectStoreUrl::local_filesystem(); + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(filename)?]); + let file_scan_config = + FileScanConfigBuilder::new(object_store_url, Arc::new(parquet_source)) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(expr_adapter_factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let first = stream.next().await.unwrap(); + let err = first.expect_err("expected ParquetSchemaConvert error on non-empty file"); + let msg = err.to_string(); + // The JVM shim sees the inner "[col]" via the JSON `column` field, matching + // Spark's `Arrays.toString(descriptor.getPath())` format. The Rust display + // wraps with another `[...]` from the error template. 
+ assert!( + msg.contains("Column: [[col]]") + && msg.contains("Expected: bigint") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); Ok(()) } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 7cefe06da7..7a33d46282 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -121,6 +121,12 @@ message NativeScanCommon { bool use_field_id = 15; // True when spark.sql.parquet.fieldId.read.ignoreMissing is set. bool ignore_missing_field_id = 16; + // Whether widening type promotion is allowed (e.g. INT32 -> INT64, + // FLOAT -> DOUBLE). Set from Comet's per-Spark-version constant in + // ShimCometConf (false on 3.x, true on 4.x). When false, reading a column + // with a disallowed promoted type throws an error matching Spark's + // SchemaColumnConvertNotSupportedException behavior. + bool allow_type_promotion = 17; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 4c5c8e8fcc..5009232a46 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -211,6 +211,8 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setIgnoreMissingFieldId( scan.conf.getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)) + commonBuilder.setAllowTypePromotion(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED) + // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 3a634bb2b0..d4d19dfcc9 100644 --- 
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -967,32 +967,21 @@ abstract class ParquetReadSuite extends CometTestBase { } test("schema evolution") { - Seq(true, false).foreach { enableSchemaEvolution => - Seq(true, false).foreach { useDictionary => - { - withSQLConf( - CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> enableSchemaEvolution.toString) { - val data = (0 until 100).map(i => { - val v = if (useDictionary) i % 5 else i - (v, v.toFloat) - }) - val readSchema = - StructType( - Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false))) - - withParquetDataFrame(data, schema = Some(readSchema)) { df => - val scan = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) - val isNativeDataFusionScan = - scan == CometConf.SCAN_NATIVE_DATAFUSION || scan == CometConf.SCAN_AUTO - if (enableSchemaEvolution || isNativeDataFusionScan) { - // native_datafusion has more permissive schema evolution - // https://github.com/apache/datafusion-comet/issues/3720 - checkAnswer(df, data.map(Row.fromTuple)) - } else { - assertThrows[SparkException](df.collect()) - } - } - } + // Comet's widening behavior tracks the Spark version (see ShimCometConf): + // 3.x rejects INT32 -> LONG and FLOAT -> DOUBLE on read, 4.x accepts. 
+ Seq(true, false).foreach { useDictionary => + val data = (0 until 100).map(i => { + val v = if (useDictionary) i % 5 else i + (v, v.toFloat) + }) + val readSchema = + StructType(Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false))) + + withParquetDataFrame(data, schema = Some(readSchema)) { df => + if (CometConf.COMET_SCHEMA_EVOLUTION_ENABLED) { + checkAnswer(df, data.map(Row.fromTuple)) + } else { + assertThrows[SparkException](df.collect()) } } } @@ -1022,70 +1011,164 @@ abstract class ParquetReadSuite extends CometTestBase { } } + test("native_datafusion rejects BINARY (no decimal annotation) read as DecimalType") { + // Regression guard for https://github.com/apache/datafusion-comet/issues/4351, + // mirroring the BINARY -> DECIMAL(37, 1) iteration in SPARK-34212. + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + // CAST('1.2' AS BINARY) writes BYTE_ARRAY with no decimal annotation. + sql("SELECT CAST('1.2' AS BINARY) c").write.parquet(path) + Seq("DECIMAL(3, 2)", "DECIMAL(18, 1)", "DECIMAL(37, 1)").foreach { schema => + val outer = intercept[SparkException] { + spark.read.schema(s"c $schema").parquet(path).collect() + } + // Walk the cause chain: Comet's shim adds an extra SparkException + // wrap on Spark 3.x compared to vanilla Spark. + val chain = Iterator + .iterate[Throwable](outer)(_.getCause) + .takeWhile(_ != null) + .toSeq + assert( + chain.exists(_.isInstanceOf[ + org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException]), + s"expected SchemaColumnConvertNotSupportedException for $schema; chain was:\n" + + chain.map(t => s" ${t.getClass.getName}: ${t.getMessage}").mkString("\n")) + } + } + } + } + test("native_datafusion rejects incompatible decimal precision/scale") { - // Regression guard for https://github.com/apache/datafusion-comet/issues/4089. 
- // Reading Decimal(10,2) under a Decimal(5,0) read schema is unconditionally - // lossy: target precision is smaller than source precision and scales differ. - // Spark's vectorized reader throws SchemaColumnConvertNotSupportedException - // here on all versions. The native_datafusion scan must reject this in its - // schema adapter rather than letting Spark Cast silently rescale/truncate. + // Regression guard for #4089 and #4343. Spark's `isDecimalTypeMatched` + // accepts decimal-to-decimal only when `scaleIncrease >= 0` AND + // `precisionIncrease >= scaleIncrease`. + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + val cases = Seq( + // (file_p, file_s, read_p, read_s) + (10, 2, 5, 0), // #4089: scale narrows. + (10, 2, 5, 2), // precision-only narrowing. + (10, 4, 5, 2), // integer-precision narrowing (int-prec 6 -> 3). + (5, 2, 5, 3) + ) // scale widening overflows the integer side. + cases.foreach { case (filePrec, fileScale, readPrec, readScale) => + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .sql(s"select cast('1.23' as decimal($filePrec,$fileScale)) as d " + + s"union all select cast('4.56' as decimal($filePrec,$fileScale))") + .write + .parquet(path) + val df = spark.read.schema(s"d decimal($readPrec,$readScale)").parquet(path) + assertThrows[SparkException](df.collect()) + } + } + } + } + + test("native_datafusion rejects integer read as too-narrow decimal") { + // Regression guard for #4344. Spark's `canReadAsDecimal` requires + // `precision - scale >= 10` for INT32 sources and `>= 20` for INT64. withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + // INT32 source (Byte/Short/Int all written as INT32 by Spark). 
+ Seq("byte", "short", "int").foreach { writeType => + withTempPath { dir => + val path = dir.getCanonicalPath + spark.range(1, 4).selectExpr(s"cast(id as $writeType) as c").write.parquet(path) + val df = spark.read.schema("c decimal(9, 0)").parquet(path) + assertThrows[SparkException](df.collect()) + } + } + // INT64 source. withTempPath { dir => val path = dir.getCanonicalPath - spark - .sql("select cast('123.45' as decimal(10,2)) as d " + - "union all select cast('67.89' as decimal(10,2))") - .write - .parquet(path) - val df = spark.read.schema("d decimal(5,0)").parquet(path) + spark.range(1, 4).selectExpr("cast(id as long) as c").write.parquet(path) + val df = spark.read.schema("c decimal(19, 0)").parquet(path) assertThrows[SparkException](df.collect()) } } } + test("native_datafusion rejects primitive Parquet conversions Spark rejects") { + // Regression guard for #4297. `getUpdater` has no branch for these + // (write_type, read_type) pairs. + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + val cases = Seq( + ("bigint", "8589934592", "int"), + ("double", "1e40", "float"), + ("float", "1.5", "bigint"), + ("bigint", "1", "double"), + ("int", "33554433", "float"), + ("int", "1", "timestamp"), + ("int", "1", "timestamp_ntz"), + ("bigint", "1", "date"), + ("double", "1.0", "bigint"), + ("date", "DATE'2020-01-01'", "timestamp"), + ("timestamp", "TIMESTAMP'2020-01-01 00:00:00'", "date"), + ("timestamp_ntz", "TIMESTAMP_NTZ'2020-01-01 00:00:00'", "date")) + cases.foreach { case (writeType, sourceLiteral, readType) => + withTempPath { dir => + val path = dir.getCanonicalPath + spark.sql(s"select cast($sourceLiteral as $writeType) as c").write.parquet(path) + val df = spark.read.schema(s"c $readType").parquet(path) + withClue(s"$writeType -> $readType: ") { + assertThrows[SparkException](df.collect()) + } + } + } + } + } + test("type widening: byte → short/int/long, short → 
int/long, int → long") { - withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") { - withTempPath { dir => - val path = dir.getCanonicalPath - val values = 1 to 10 - val options: Map[String, String] = Map.empty[String, String] - - // Input types and corresponding DataFrames - val inputDFs = Seq( - "byte" -> values.map(_.toByte).toDF("col1"), - "short" -> values.map(_.toShort).toDF("col1"), - "int" -> values.map(_.toInt).toDF("col1")) - - // Target Spark read schemas for widening - val widenTargets = Seq( - "short" -> values.map(_.toShort).toDF("col1"), - "int" -> values.map(_.toInt).toDF("col1"), - "long" -> values.map(_.toLong).toDF("col1")) - - for ((inputType, inputDF) <- inputDFs) { - val writePath = s"$path/$inputType" - inputDF.write.format("parquet").options(options).save(writePath) - - for ((targetType, targetDF) <- widenTargets) { - // Only test valid widenings (e.g., don't test int → short) - val wideningValid = (inputType, targetType) match { - case ("byte", "short" | "int" | "long") => true - case ("short", "int" | "long") => true - case ("int", "long") => true - case _ => false - } + // Widening of INT32 -> LONG is only allowed when Comet's type-promotion + // default permits it (Spark 4.x). See ShimCometConf. 
+ assume(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED) + withTempPath { dir => + val path = dir.getCanonicalPath + val values = 1 to 10 + val options: Map[String, String] = Map.empty[String, String] + + // Input types and corresponding DataFrames + val inputDFs = Seq( + "byte" -> values.map(_.toByte).toDF("col1"), + "short" -> values.map(_.toShort).toDF("col1"), + "int" -> values.map(_.toInt).toDF("col1")) + + // Target Spark read schemas for widening + val widenTargets = Seq( + "short" -> values.map(_.toShort).toDF("col1"), + "int" -> values.map(_.toInt).toDF("col1"), + "long" -> values.map(_.toLong).toDF("col1")) + + for ((inputType, inputDF) <- inputDFs) { + val writePath = s"$path/$inputType" + inputDF.write.format("parquet").options(options).save(writePath) + + for ((targetType, targetDF) <- widenTargets) { + // Only test valid widenings (e.g., don't test int → short) + val wideningValid = (inputType, targetType) match { + case ("byte", "short" | "int" | "long") => true + case ("short", "int" | "long") => true + case ("int", "long") => true + case _ => false + } - if (wideningValid) { - val reader = spark.read - .schema(s"col1 $targetType") - .format("parquet") - .options(options) - .load(writePath) + if (wideningValid) { + val reader = spark.read + .schema(s"col1 $targetType") + .format("parquet") + .options(options) + .load(writePath) - checkAnswer(reader, targetDF) - } + checkAnswer(reader, targetDF) } } } @@ -1093,37 +1176,38 @@ abstract class ParquetReadSuite extends CometTestBase { } test("read byte, int, short, long together") { - withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") { - withTempPath { dir => - val path = dir.getCanonicalPath + // Reading INT32-encoded files under a LONG schema only succeeds when Comet's + // type-promotion default permits it (Spark 4.x). See ShimCometConf. 
+ assume(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED) + withTempPath { dir => + val path = dir.getCanonicalPath - val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1") - val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1") - val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1") - val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1") - val unionDF = byteDF.union(shortDF).union(intDF).union(longDF) + val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1") + val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1") + val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1") + val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1") + val unionDF = byteDF.union(shortDF).union(intDF).union(longDF) - val byteDir = s"$path${File.separator}part=byte" - val shortDir = s"$path${File.separator}part=short" - val intDir = s"$path${File.separator}part=int" - val longDir = s"$path${File.separator}part=long" + val byteDir = s"$path${File.separator}part=byte" + val shortDir = s"$path${File.separator}part=short" + val intDir = s"$path${File.separator}part=int" + val longDir = s"$path${File.separator}part=long" - val options: Map[String, String] = Map.empty[String, String] + val options: Map[String, String] = Map.empty[String, String] - byteDF.write.format("parquet").options(options).save(byteDir) - shortDF.write.format("parquet").options(options).save(shortDir) - intDF.write.format("parquet").options(options).save(intDir) - longDF.write.format("parquet").options(options).save(longDir) + byteDF.write.format("parquet").options(options).save(byteDir) + shortDF.write.format("parquet").options(options).save(shortDir) + intDF.write.format("parquet").options(options).save(intDir) + longDF.write.format("parquet").options(options).save(longDir) - val df = spark.read - .schema(unionDF.schema) - .format("parquet") - .options(options) - .load(path) - .select("col1") + val df = 
spark.read + .schema(unionDF.schema) + .format("parquet") + .options(options) + .load(path) + .select("col1") - checkAnswer(df, unionDF) - } + checkAnswer(df, unionDF) } }