diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 66531397d2cc1..721e438c62570 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Spark SQL 4.2 to 4.3
 
 - Since Spark 4.3, the configuration key `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` has been renamed to `spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled` to reflect that it now applies to storage-partitioned joins, aggregates, and windows. The old key continues to work as an alias.
+- Since Spark 4.3, `df.write.format(...).mode("overwrite").save()` against a data source that implements `SupportsCatalogOptions` creates the target table when it does not yet exist, matching the long-standing V1 source behavior (parquet/JSON/ORC). Previously this threw `NoSuchTableException`. `SaveMode.Append` on a non-existent table continues to throw, since Append explicitly expects an existing table. To restore the pre-4.3 behavior, set `spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows` to `true`.
 
 ## Upgrading from Spark SQL 4.1 to 4.2
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 270b8aa31a565..6913535b9cd59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7147,6 +7147,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS =
+    buildConf("spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows")
+      .internal()
+      .doc("When set to true, SaveMode.Overwrite against a missing table on a " +
+        "SupportsCatalogOptions source throws NoSuchTableException instead of " +
+        "creating the table. Restores the pre-SPARK-57068 behavior.")
+      .version("4.3.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(false)
+
   val CTE_RELATION_DEF_MAX_ROWS =
     buildConf("spark.sql.cteRelationDefMaxRows.enabled")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index a9f16ffa87be1..c0accf7c0a23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -176,7 +176,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
 
-              (catalog.loadTable(ident), Some(catalog), Some(ident))
+              try {
+                (catalog.loadTable(ident), Some(catalog), Some(ident))
+              } catch {
+                // SPARK-57068: align Overwrite-on-missing with V1 source behavior
+                // (parquet/JSON/ORC) by falling back to CreateTableAsSelect. Append is
+                // intentionally not handled here: it expects an existing table, and
+                // silently creating one would mask user mistakes.
+                case _: NoSuchTableException
+                    if curmode == SaveMode.Overwrite &&
+                      !df.sparkSession.sessionState.conf.getConf(
+                        SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS) =>
+                  return createTableAsSelectForCatalogOptions(
+                    catalog, ident, finalOptions, ignoreIfExists = false)
+              }
             case _: TableProvider =>
               val t = getTable
               if (t.supports(BATCH_WRITE)) {
@@ -203,30 +216,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
         case createMode =>
           provider match {
             case supportsExtract: SupportsCatalogOptions =>
-              if (_withSchemaEvolution) {
-                throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
-              }
               val ident = supportsExtract.extractIdentifier(dsOptions)
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
-
-              val tableSpec = UnresolvedTableSpec(
-                properties = Map.empty,
-                provider = Some(source),
-                optionExpression = OptionList(Seq.empty),
-                location = extraOptions.get("path"),
-                comment = extraOptions.get(TableCatalog.PROP_COMMENT),
-                collation = extraOptions.get(TableCatalog.PROP_COLLATION),
-                serde = None,
-                external = false,
-                constraints = Seq.empty)
-              CreateTableAsSelect(
-                UnresolvedIdentifier(
-                  catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
-                partitioningAsV2,
-                df.queryExecution.analyzed,
-                tableSpec,
-                finalOptions,
+              createTableAsSelectForCatalogOptions(
+                catalog, ident, finalOptions,
                 ignoreIfExists = createMode == SaveMode.Ignore)
             case _: TableProvider =>
               if (getTable.supports(BATCH_WRITE)) {
@@ -248,6 +242,48 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
     }
   }
 
+  /**
+   * Builds the [[CreateTableAsSelect]] plan for a `SupportsCatalogOptions` source whose
+   * target table has to be created by the write: the create-mode branch of the save
+   * path, and `SaveMode.Overwrite` when the table does not exist yet (SPARK-57068).
+   *
+   * Schema evolution is rejected up front, because this helper always produces a
+   * create-table plan.
+   *
+   * @param catalog catalog that will own the new table
+   * @param ident identifier of the table within `catalog`
+   * @param finalOptions write options forwarded to the plan
+   * @param ignoreIfExists forwarded to [[CreateTableAsSelect]]; `true` only for
+   *                       `SaveMode.Ignore`
+   */
+  private def createTableAsSelectForCatalogOptions(
+      catalog: TableCatalog,
+      ident: Identifier,
+      finalOptions: Map[String, String],
+      ignoreIfExists: Boolean): LogicalPlan = {
+    if (_withSchemaEvolution) {
+      throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
+    }
+    val tableSpec = UnresolvedTableSpec(
+      properties = Map.empty,
+      provider = Some(source),
+      optionExpression = OptionList(Seq.empty),
+      location = extraOptions.get("path"),
+      comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+      collation = extraOptions.get(TableCatalog.PROP_COLLATION),
+      serde = None,
+      external = false,
+      constraints = Seq.empty)
+    CreateTableAsSelect(
+      UnresolvedIdentifier(
+        catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
+      partitioningAsV2,
+      df.queryExecution.analyzed,
+      tableSpec,
+      finalOptions,
+      ignoreIfExists = ignoreIfExists)
+  }
+
   private def assertSchemaEvolutionNotEnabledForV1Write(): Unit = {
     if (_withSchemaEvolution) {
       throw QueryCompilationErrors.schemaEvolutionNotSupportedForV1TableWriteError()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 98904e6976074..7e35e76e38a31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.LongType
@@ -199,6 +200,62 @@ class SupportsCatalogOptionsSuite extends SharedSparkSession with BeforeAndAfter
     checkAnswer(load("t1", Some(catalogName)), df2.toDF())
   }
 
+  // SPARK-57068: SaveMode.Overwrite against a missing table now creates it,
+  // matching the long-standing V1 source behavior (parquet/JSON/ORC).
+  test(s"save works with Overwrite - no table, no partitioning, session catalog") {
+    testCreateAndRead(SaveMode.Overwrite, None, Nil)
+  }
+
+  test(s"save works with Overwrite - no table, with partitioning, session catalog") {
+    testCreateAndRead(SaveMode.Overwrite, None, Seq("part"))
+  }
+
+  test(s"save works with Overwrite - no table, no partitioning, testcat catalog") {
+    testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Nil)
+  }
+
+  test(s"save works with Overwrite - no table, with partitioning, testcat catalog") {
+    testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Seq("part"))
+  }
+
+  // SPARK-57068: SaveMode.Append on a missing table is intentionally still
+  // rejected (Append expects an existing table). Pin the strict behavior so a
+  // future broadening is an explicit decision.
+  test("SPARK-57068: Append mode still fails when table is missing - testcat catalog") {
+    val df = spark.range(10)
+    val e = intercept[NoSuchTableException] {
+      df.write.format(format).option("name", "t1").option("catalog", catalogName)
+        .mode(SaveMode.Append).save()
+    }
+    checkErrorTableNotFound(e, "`t1`")
+  }
+
+  // SPARK-57068: legacy flag restores the pre-fix behavior (throw on Overwrite-missing).
+  test("SPARK-57068: legacy flag restores throw on Overwrite-missing") {
+    withSQLConf(SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS.key -> "true") {
+      val e = intercept[NoSuchTableException] {
+        spark.range(10).write.format(format).option("name", "t1")
+          .option("catalog", catalogName)
+          .mode(SaveMode.Overwrite).save()
+      }
+      checkErrorTableNotFound(e, "`t1`")
+    }
+  }
+
+  // SPARK-57068: Overwrite-on-missing routes to CreateTableAsSelect, which does
+  // not support schema evolution. Verify the gate fires with a clear error
+  // rather than silently dropping the withSchemaEvolution() request.
+  test("SPARK-57068: Overwrite + withSchemaEvolution on missing table is rejected") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.range(10).write.format(format).option("name", "t1")
+          .option("catalog", catalogName)
+          .mode(SaveMode.Overwrite).withSchemaEvolution().save()
+      },
+      condition = "UNSUPPORTED_SCHEMA_EVOLUTION.CREATE_TABLE",
+      parameters = Map.empty)
+  }
+
   test("fail on user specified schema when reading - session catalog") {
     sql(s"create table t1 (id bigint) using $format")
     val e = intercept[IllegalArgumentException] {