From e530a209880423de1164d542362b8caccae315c7 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 16 Feb 2026 14:39:45 +0530 Subject: [PATCH 1/3] Fix multi-insert with native writer in Spark 4.x (#3430) --- .../apache/comet/rules/CometExecRule.scala | 19 +++++++++-- .../parquet/CometParquetWriterSuite.scala | 33 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 76e741e3bf..7f2d832791 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -483,7 +483,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]] if (isOperatorEnabled(serde, op)) { // For operators that require native children (like writes), check if all data-producing - // children are CometNativeExec. This prevents runtime failures when the native operator + // children produce Arrow data. This prevents runtime failures when the native operator // expects Arrow arrays but receives non-Arrow data (e.g., OnHeapColumnVector). if (serde.requiresNativeChildren && op.children.nonEmpty) { // Get the actual data-producing children (unwrap WriteFilesExec if present) @@ -491,7 +491,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { case writeFiles: WriteFilesExec => Seq(writeFiles.child) case other => Seq(other) } - if (!dataProducingChildren.forall(_.isInstanceOf[CometNativeExec])) { + if (!dataProducingChildren.forall(producesArrowData)) { withInfo(op, "Cannot perform native operation because input is not in Arrow format") return None } @@ -600,4 +600,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } + /** + * Checks if a plan produces Arrow-formatted data by unwrapping wrapper operators. This handles + * ReusedExchangeExec (used in multi-insert), QueryStageExec (AQE), and checks for CometExec + * (includes CometNativeExec and sink operators like CometUnionExec, CometCoalesceExec, etc.). + */ + private def producesArrowData(plan: SparkPlan): Boolean = { + plan match { + case _: CometExec => true + case r: ReusedExchangeExec => producesArrowData(r.child) + case s: ShuffleQueryStageExec => producesArrowData(s.plan) + case b: BroadcastQueryStageExec => producesArrowData(b.plan) + case _ => false + } + } + } diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index 815f03f213..ecb0bf5397 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -140,6 +140,39 @@ class CometParquetWriterSuite extends CometTestBase { } } + // Test for issue #3430: SPARK-48817 multi-insert with native writer in Spark 4.x + test("parquet write with multi-insert pattern") { + withTempPath { dir => + val output1 = new File(dir, "output1.parquet").getAbsolutePath + val output2 = new File(dir, "output2.parquet").getAbsolutePath + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + + // Create source data with repartition (simulating SPARK-48817 test pattern) + val sourceData = spark.range(1, 10).toDF("id").repartition(3) + + // Write to first output + val plan1 = captureWritePlan(path => sourceData.write.parquet(path), output1) + + // Write to second output (simulating multi-insert reuse pattern) + val plan2 = captureWritePlan(path => sourceData.write.parquet(path), output2) + + // Verify both writes completed correctly + val result1 = spark.read.parquet(output1) + val result2 = spark.read.parquet(output2) + assert(result1.count() == 9) + assert(result2.count() == 9) + + // Verify native write was used for both + assertHasCometNativeWriteExec(plan1) + assertHasCometNativeWriteExec(plan2) + } + } + } + test("parquet write with map type") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath From 06552cf131af5b428a206df4655b2168d2f4e6a5 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 16 Mar 2026 23:43:36 +0530 Subject: [PATCH 2/3] Add ReusedExchangeExec assertion to multi-insert test --- .../parquet/CometParquetWriterSuite.scala | 68 +++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index ecb0bf5397..2d741c08a5 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -141,34 +142,47 @@ class CometParquetWriterSuite extends CometTestBase { } // Test for issue #3430: SPARK-48817 multi-insert with native writer in Spark 4.x + // Uses SQL multi-insert syntax to produce a plan with ReusedExchangeExec, + // which exercises the producesArrowData() path in CometExecRule. test("parquet write with multi-insert pattern") { - withTempPath { dir => - val output1 = new File(dir, "output1.parquet").getAbsolutePath - val output2 = new File(dir, "output2.parquet").getAbsolutePath - - withSQLConf( - CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", - CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true") { - - // Create source data with repartition (simulating SPARK-48817 test pattern) - val sourceData = spark.range(1, 10).toDF("id").repartition(3) - - // Write to first output - val plan1 = captureWritePlan(path => sourceData.write.parquet(path), output1) - - // Write to second output (simulating multi-insert reuse pattern) - val plan2 = captureWritePlan(path => sourceData.write.parquet(path), output2) - - // Verify both writes completed correctly - val result1 = spark.read.parquet(output1) - val result2 = spark.read.parquet(output2) - assert(result1.count() == 9) - assert(result2.count() == 9) - - // Verify native write was used for both - assertHasCometNativeWriteExec(plan1) - assertHasCometNativeWriteExec(plan2) + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + + withTable("src", "dst1", "dst2") { + sql("CREATE TABLE src(c1 INT) USING PARQUET") + sql("INSERT INTO src VALUES (1), (2), (3)") + sql("CREATE TABLE dst1(c1 INT) USING PARQUET") + sql("CREATE TABLE dst2(c1 INT) USING PARQUET") + + // Multi-insert: single plan inserts from one source into two tables. + // The REPARTITION hint forces a shuffle exchange that Spark reuses + // via ReusedExchangeExec for the second insert. + val multiInsertDf = sql(""" + |FROM (SELECT /*+ REPARTITION(3) */ c1 FROM src) + |INSERT OVERWRITE TABLE dst1 SELECT c1 + |INSERT OVERWRITE TABLE dst2 SELECT c1 + """.stripMargin) + val plan = multiInsertDf.queryExecution.executedPlan + + // Assert that the plan contains ReusedExchangeExec, proving + // we are exercising the multi-insert reuse path + val reusedExchanges = plan.collect { case r: ReusedExchangeExec => r } + assert( + reusedExchanges.nonEmpty, + s"Expected ReusedExchangeExec in the multi-insert plan, but found none:\n${plan.treeString}") + + // Assert native write was used + val nativeWrites = plan.collect { case n: CometNativeWriteExec => n } + assert( + nativeWrites.nonEmpty, + s"Expected CometNativeWriteExec in the plan, but found none:\n${plan.treeString}") + + // Verify data correctness + checkAnswer(sql("SELECT c1 FROM dst1 ORDER BY c1"), Seq(Row(1), Row(2), Row(3))) + checkAnswer(sql("SELECT c1 FROM dst2 ORDER BY c1"), Seq(Row(1), Row(2), Row(3))) } } } From 77bb427d511302a4aa960f01a60e6709f6e5fa4b Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 17 Mar 2026 00:03:42 +0530 Subject: [PATCH 3/3] Add SQL file test for multi-insert with native writer --- .../sql-tests/write/multi_insert.sql | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 spark/src/test/resources/sql-tests/write/multi_insert.sql diff --git a/spark/src/test/resources/sql-tests/write/multi_insert.sql b/spark/src/test/resources/sql-tests/write/multi_insert.sql new file mode 100644 index 0000000000..7e27bb2ba6 --- /dev/null +++ b/spark/src/test/resources/sql-tests/write/multi_insert.sql @@ -0,0 +1,75 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Test multi-insert with Comet native writer (issue #3430, SPARK-48817) +-- Validates that data written via multi-insert SQL is correct when +-- the native writer handles ReusedExchangeExec in the plan. + +-- Config: spark.comet.parquet.write.enabled=true +-- Config: spark.comet.operator.DataWritingCommandExec.allowIncompatible=true +-- Config: spark.comet.exec.enabled=true +-- Config: spark.sql.adaptive.enabled=false + +statement +CREATE TABLE multi_src(c1 INT) USING PARQUET + +statement +INSERT INTO multi_src VALUES (1), (2), (3), (4), (5) + +statement +CREATE TABLE multi_dst1(c1 INT) USING PARQUET + +statement +CREATE TABLE multi_dst2(c1 INT) USING PARQUET + +-- Multi-insert: single plan with ReusedExchangeExec +statement +FROM (SELECT /*+ REPARTITION(3) */ c1 FROM multi_src) +INSERT OVERWRITE TABLE multi_dst1 SELECT c1 +INSERT OVERWRITE TABLE multi_dst2 SELECT c1 + +-- Validate data in both destination tables +query +SELECT c1 FROM multi_dst1 ORDER BY c1 + +query +SELECT c1 FROM multi_dst2 ORDER BY c1 + +-- Verify both tables have equal content +query +SELECT count(*) FROM multi_dst1 + +query +SELECT count(*) FROM multi_dst2 + +-- Multi-insert with filtered inserts into different targets +statement +CREATE TABLE multi_dst3(c1 INT) USING PARQUET + +statement +CREATE TABLE multi_dst4(c1 INT) USING PARQUET + +statement +FROM (SELECT /*+ REPARTITION(2) */ c1 FROM multi_src) +INSERT OVERWRITE TABLE multi_dst3 SELECT c1 WHERE c1 <= 3 +INSERT OVERWRITE TABLE multi_dst4 SELECT c1 WHERE c1 > 3 + +query +SELECT c1 FROM multi_dst3 ORDER BY c1 + +query +SELECT c1 FROM multi_dst4 ORDER BY c1