From 0b3a0efca41f1a49c39e28c6ac2b25451d0dc9a4 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Tue, 26 May 2026 15:14:28 +0800
Subject: [PATCH] [SPARK-57068][SQL] Make SaveMode.Overwrite create the table when missing for SupportsCatalogOptions sources
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

In `DataFrameWriter.saveCommand`, the `SaveMode.Append | SaveMode.Overwrite` branch calls `catalog.loadTable(ident)` without catching `NoSuchTableException` when the V2 source implements `SupportsCatalogOptions`. The exception propagates straight to the user, even though `SaveMode.ErrorIfExists` and `SaveMode.Ignore` on the same call succeed by routing to `CreateTableAsSelect`.

This change catches `NoSuchTableException` for `SaveMode.Overwrite` only and routes to `CreateTableAsSelect(ignoreIfExists = false)`, mirroring the `createMode` arm immediately below. `SaveMode.Append` on a non-existent identifier intentionally continues to throw, because Append explicitly expects an existing table and silently creating would mask user mistakes.

A new internal SQL conf `spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows` restores the pre-fix behavior for users who depend on it.

The `CreateTableAsSelect` construction shared between the new fall-back path and the existing `createMode` arm is extracted into a private helper `createTableAsSelectForCatalogOptions` to keep both sites in sync.

### Why are the changes needed?

The most idiomatic write call for any V2 connector,

    df.write.format(provider).mode("overwrite").save(newPath)

fails with `NoSuchTableException` when `newPath` does not yet exist, whereas the equivalent V1 call (e.g. `format("parquet")`) succeeds by creating the table. V2 sources that implement `SupportsCatalogOptions` (Iceberg, Lance, and custom connectors) all hit this asymmetry.
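
The mode dispatch described under "What changes were proposed" can be modelled in isolation. The sketch below is a simplified stand-in, not the Spark code: `Mode`, `Plan`, `MissingTable`, and `SaveModeSketch.plan` are hypothetical names introduced for illustration, and the boolean `tableExists` replaces the real `catalog.loadTable(ident)` lookup.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark classes.
sealed trait Mode
case object Append extends Mode
case object Overwrite extends Mode
case object ErrorIfExists extends Mode
case object Ignore extends Mode

sealed trait Plan
case object WriteToExistingTable extends Plan
final case class CreateTable(ignoreIfExists: Boolean) extends Plan

// Plays the role of NoSuchTableException in this model.
final class MissingTable(name: String)
  extends RuntimeException(s"Table not found: $name")

object SaveModeSketch {
  // legacyThrows models the new legacy conf: when true, Overwrite on a
  // missing table throws as it did before the change.
  def plan(mode: Mode, tableExists: Boolean, legacyThrows: Boolean = false): Plan =
    mode match {
      case Append | Overwrite =>
        if (tableExists) WriteToExistingTable
        else if (mode == Overwrite && !legacyThrows) CreateTable(ignoreIfExists = false)
        else throw new MissingTable("t1") // Append, or Overwrite with the legacy flag
      case createMode =>
        // ErrorIfExists and Ignore always route to table creation.
        CreateTable(ignoreIfExists = createMode == Ignore)
    }

  def main(args: Array[String]): Unit = {
    println(plan(Overwrite, tableExists = false)) // CreateTable(false)
    println(plan(Overwrite, tableExists = true))  // WriteToExistingTable
    println(plan(Ignore, tableExists = false))    // CreateTable(true)
  }
}
```

In this model, `plan(Append, tableExists = false)` and `plan(Overwrite, tableExists = false, legacyThrows = true)` both throw, which corresponds to the two cases where the patch deliberately keeps the exception.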

The fix aligns V2 `SaveMode.Overwrite` semantics with V1: overwrite-on-missing creates the table, overwrite-on-existing truncates and writes.

Behavior matrix after this change:

| Mode × Target           | V1             | V2 before  | V2 after  |
|-------------------------|----------------|------------|-----------|
| Overwrite, missing      | creates        | **throws** | creates   |
| Overwrite, existing     | truncate+write | overwrite  | unchanged |
| Append, missing         | creates        | throws     | throws*   |
| Append, existing        | append         | append     | unchanged |
| ErrorIfExists, missing  | creates        | creates    | unchanged |
| ErrorIfExists, existing | throws         | throws     | unchanged |
| Ignore, missing         | creates        | creates    | unchanged |
| Ignore, existing        | no-op          | no-op      | unchanged |

\* Intentional divergence from V1; see "What changes were proposed" above.

There is an inherent race window between `loadTable` (which throws) and `CreateTableAsSelect`: a concurrent writer that creates the table in between causes `TableAlreadyExistsException` rather than an overwrite. This is acceptable; V1's filesystem-atomic path doesn't expose it because V1 never consults a catalog. Affected users can retry the write: on retry, `loadTable` succeeds and the write takes the normal overwrite path.

### Does this PR introduce _any_ user-facing change?

Yes. `df.write.format().mode("overwrite").save()` now creates the table instead of throwing `NoSuchTableException`. No behavior change for paths that already exist. The migration guide has been updated. The legacy flag `spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows` restores the prior behavior.

### How was this patch tested?

New tests in `SupportsCatalogOptionsSuite`:

- `save works with Overwrite - no table, no partitioning, session catalog`
- `save works with Overwrite - no table, with partitioning, session catalog`
- `save works with Overwrite - no table, no partitioning, testcat catalog`
- `save works with Overwrite - no table, with partitioning, testcat catalog`

These reuse the existing `testCreateAndRead` helper, which verifies catalog state (table identity, partitioning, columns) in addition to data.

Plus three behavior-pinning tests:

- `Append mode still fails when table is missing - testcat catalog` (pins the intentional Append divergence)
- `legacy flag restores throw on Overwrite-missing` (verifies the new conf)
- `Overwrite + withSchemaEvolution on missing table is rejected` (verifies the schema-evolution gate fires with the expected error class)

Existing tests continue to pass.

### Was this patch authored or co-authored using generative AI tooling?

No.
---
 docs/sql-migration-guide.md                   |  1 +
 .../apache/spark/sql/internal/SQLConf.scala   | 11 ++++
 .../spark/sql/classic/DataFrameWriter.scala   | 66 ++++++++++++-------
 .../SupportsCatalogOptionsSuite.scala         | 57 ++++++++++++++++
 4 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 66531397d2cc1..721e438c62570 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Spark SQL 4.2 to 4.3
 
 - Since Spark 4.3, the configuration key `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` has been renamed to `spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled` to reflect that it now applies to storage-partitioned joins, aggregates, and windows. The old key continues to work as an alias.
+- Since Spark 4.3, `df.write.format().mode("overwrite").save()` creates the table when it does not yet exist, matching the long-standing V1 source behavior (parquet/JSON/ORC). Previously this surfaced `NoSuchTableException`. `SaveMode.Append` on a non-existent table continues to throw, since Append explicitly expects an existing table. To restore the pre-4.3 behavior, set `spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows` to `true`.
 
 ## Upgrading from Spark SQL 4.1 to 4.2
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 270b8aa31a565..6913535b9cd59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7147,6 +7147,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS =
+    buildConf("spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows")
+      .internal()
+      .doc("When set to true, SaveMode.Overwrite against a missing table on a " +
+        "SupportsCatalogOptions source throws NoSuchTableException instead of " +
+        "creating the table. Restores the pre-SPARK-57068 behavior.")
+      .version("4.3.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(false)
+
   val CTE_RELATION_DEF_MAX_ROWS =
     buildConf("spark.sql.cteRelationDefMaxRows.enabled")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index a9f16ffa87be1..c0accf7c0a23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -176,7 +176,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
 
-              (catalog.loadTable(ident), Some(catalog), Some(ident))
+              try {
+                (catalog.loadTable(ident), Some(catalog), Some(ident))
+              } catch {
+                // SPARK-57068: align Overwrite-on-missing with V1 source behavior
+                // (parquet/JSON/ORC) by falling back to CreateTableAsSelect.
+                // Append is intentionally not handled here: it expects an existing table,
+                // and silently creating one would mask user mistakes.
+                case _: NoSuchTableException
+                    if curmode == SaveMode.Overwrite &&
+                      !df.sparkSession.sessionState.conf.getConf(
+                        SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS) =>
+                  return createTableAsSelectForCatalogOptions(
+                    catalog, ident, finalOptions, ignoreIfExists = false)
+              }
             case _: TableProvider =>
               val t = getTable
               if (t.supports(BATCH_WRITE)) {
@@ -203,30 +216,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
         case createMode =>
           provider match {
             case supportsExtract: SupportsCatalogOptions =>
-              if (_withSchemaEvolution) {
-                throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
-              }
               val ident = supportsExtract.extractIdentifier(dsOptions)
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
-
-              val tableSpec = UnresolvedTableSpec(
-                properties = Map.empty,
-                provider = Some(source),
-                optionExpression = OptionList(Seq.empty),
-                location = extraOptions.get("path"),
-                comment = extraOptions.get(TableCatalog.PROP_COMMENT),
-                collation = extraOptions.get(TableCatalog.PROP_COLLATION),
-                serde = None,
-                external = false,
-                constraints = Seq.empty)
-              CreateTableAsSelect(
-                UnresolvedIdentifier(
-                  catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
-                partitioningAsV2,
-                df.queryExecution.analyzed,
-                tableSpec,
-                finalOptions,
+              createTableAsSelectForCatalogOptions(
+                catalog, ident, finalOptions,
                 ignoreIfExists = createMode == SaveMode.Ignore)
             case _: TableProvider =>
               if (getTable.supports(BATCH_WRITE)) {
@@ -248,6 +242,34 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
     }
   }
 
+  private def createTableAsSelectForCatalogOptions(
+      catalog: TableCatalog,
+      ident: Identifier,
+      finalOptions: Map[String, String],
+      ignoreIfExists: Boolean): LogicalPlan = {
+    if (_withSchemaEvolution) {
+      throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
+    }
+    val tableSpec = UnresolvedTableSpec(
+      properties = Map.empty,
+      provider = Some(source),
+      optionExpression = OptionList(Seq.empty),
+      location = extraOptions.get("path"),
+      comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+      collation = extraOptions.get(TableCatalog.PROP_COLLATION),
+      serde = None,
+      external = false,
+      constraints = Seq.empty)
+    CreateTableAsSelect(
+      UnresolvedIdentifier(
+        catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
+      partitioningAsV2,
+      df.queryExecution.analyzed,
+      tableSpec,
+      finalOptions,
+      ignoreIfExists = ignoreIfExists)
+  }
+
   private def assertSchemaEvolutionNotEnabledForV1Write(): Unit = {
     if (_withSchemaEvolution) {
       throw QueryCompilationErrors.schemaEvolutionNotSupportedForV1TableWriteError()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 98904e6976074..7e35e76e38a31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.LongType
@@ -199,6 +200,62 @@ class SupportsCatalogOptionsSuite extends SharedSparkSession with BeforeAndAfter
     checkAnswer(load("t1", Some(catalogName)), df2.toDF())
   }
 
+  // SPARK-57068: SaveMode.Overwrite against a missing table now creates it,
+  // matching the long-standing V1 source behavior (parquet/JSON/ORC).
+  test(s"save works with Overwrite - no table, no partitioning, session catalog") {
+    testCreateAndRead(SaveMode.Overwrite, None, Nil)
+  }
+
+  test(s"save works with Overwrite - no table, with partitioning, session catalog") {
+    testCreateAndRead(SaveMode.Overwrite, None, Seq("part"))
+  }
+
+  test(s"save works with Overwrite - no table, no partitioning, testcat catalog") {
+    testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Nil)
+  }
+
+  test(s"save works with Overwrite - no table, with partitioning, testcat catalog") {
+    testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Seq("part"))
+  }
+
+  // SPARK-57068: SaveMode.Append on a missing table is intentionally still
+  // rejected (Append expects an existing table). Pin the strict behavior so a
+  // future broadening is an explicit decision.
+  test("SPARK-57068: Append mode still fails when table is missing - testcat catalog") {
+    val df = spark.range(10)
+    val e = intercept[NoSuchTableException] {
+      df.write.format(format).option("name", "t1").option("catalog", catalogName)
+        .mode(SaveMode.Append).save()
+    }
+    checkErrorTableNotFound(e, "`t1`")
+  }
+
+  // SPARK-57068: legacy flag restores the pre-fix behavior (throw on Overwrite-missing).
+  test("SPARK-57068: legacy flag restores throw on Overwrite-missing") {
+    withSQLConf(SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS.key -> "true") {
+      val e = intercept[NoSuchTableException] {
+        spark.range(10).write.format(format).option("name", "t1")
+          .option("catalog", catalogName)
+          .mode(SaveMode.Overwrite).save()
+      }
+      checkErrorTableNotFound(e, "`t1`")
+    }
+  }
+
+  // SPARK-57068: Overwrite-on-missing routes to CreateTableAsSelect, which does
+  // not support schema evolution. Verify the gate fires with a clear error
+  // rather than silently dropping the withSchemaEvolution() request.
+  test("SPARK-57068: Overwrite + withSchemaEvolution on missing table is rejected") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.range(10).write.format(format).option("name", "t1")
+          .option("catalog", catalogName)
+          .mode(SaveMode.Overwrite).withSchemaEvolution().save()
+      },
+      condition = "UNSUPPORTED_SCHEMA_EVOLUTION.CREATE_TABLE",
+      parameters = Map.empty)
+  }
+
   test("fail on user specified schema when reading - session catalog") {
     sql(s"create table t1 (id bigint) using $format")
     val e = intercept[IllegalArgumentException] {