Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ license: |
## Upgrading from Spark SQL 4.2 to 4.3

- Since Spark 4.3, the configuration key `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` has been renamed to `spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled` to reflect that it now applies to storage-partitioned joins, aggregates, and windows. The old key continues to work as an alias.
- Since Spark 4.3, `df.write.format(<V2 provider with SupportsCatalogOptions>).mode("overwrite").save(<new identifier>)` creates the table when it does not yet exist, matching the long-standing V1 source behavior (Parquet/JSON/ORC). Previously, this threw `NoSuchTableException`. `SaveMode.Append` on a non-existent table continues to throw, because Append explicitly expects an existing table. To restore the pre-4.3 behavior, set `spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows` to `true`.

## Upgrading from Spark SQL 4.1 to 4.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7147,6 +7147,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Legacy switch for SPARK-57068. Internal, boolean, defaults to false; per its doc text,
// setting it to true makes SaveMode.Overwrite on a missing table throw
// NoSuchTableException instead of creating the table.
val LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS =
buildConf("spark.sql.legacy.dataFrameWriter.overwriteOnMissingTableThrows")
.internal()
.doc("When set to true, SaveMode.Overwrite against a missing table on a " +
"SupportsCatalogOptions source throws NoSuchTableException instead of " +
"creating the table. Restores the pre-SPARK-57068 behavior.")
.version("4.3.0")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.3.0 or 4.2.0?

.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

val CTE_RELATION_DEF_MAX_ROWS =
buildConf("spark.sql.cteRelationDefMaxRows.enabled")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)

(catalog.loadTable(ident), Some(catalog), Some(ident))
try {
(catalog.loadTable(ident), Some(catalog), Some(ident))
} catch {
// SPARK-57068: align Overwrite-on-missing with V1 source behavior
// (parquet/JSON/ORC) by falling back to CreateTableAsSelect. Append is
// intentionally not handled here: it expects an existing table, and
// silently creating one would mask user mistakes.
case _: NoSuchTableException
if curmode == SaveMode.Overwrite &&
!df.sparkSession.sessionState.conf.getConf(
SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS) =>
return createTableAsSelectForCatalogOptions(
catalog, ident, finalOptions, ignoreIfExists = false)
}
case _: TableProvider =>
val t = getTable
if (t.supports(BATCH_WRITE)) {
Expand All @@ -203,30 +216,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
case createMode =>
provider match {
case supportsExtract: SupportsCatalogOptions =>
if (_withSchemaEvolution) {
throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
}
val ident = supportsExtract.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)

val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
collation = extraOptions.get(TableCatalog.PROP_COLLATION),
serde = None,
external = false,
constraints = Seq.empty)
CreateTableAsSelect(
UnresolvedIdentifier(
catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
finalOptions,
createTableAsSelectForCatalogOptions(
catalog, ident, finalOptions,
ignoreIfExists = createMode == SaveMode.Ignore)
case _: TableProvider =>
if (getTable.supports(BATCH_WRITE)) {
Expand All @@ -248,6 +242,34 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
}
}

/**
 * Builds a [[CreateTableAsSelect]] plan that creates the table identified by `catalog` and
 * `ident` from this writer's analyzed query.
 *
 * @param catalog        catalog whose name becomes the first part of the target identifier
 * @param ident          namespace and name of the table to create
 * @param finalOptions   write options passed through to the plan unchanged
 * @param ignoreIfExists forwarded to [[CreateTableAsSelect]] as its `ignoreIfExists` flag
 * @return the unresolved create-table-as-select plan
 * @throws AnalysisException presumably, via `QueryCompilationErrors`, when schema evolution
 *                           was requested on this writer (exact type not visible here)
 */
private def createTableAsSelectForCatalogOptions(
    catalog: TableCatalog,
    ident: Identifier,
    finalOptions: Map[String, String],
    ignoreIfExists: Boolean): LogicalPlan = {
  // Schema evolution is rejected before any plan node is built.
  if (_withSchemaEvolution) {
    throw QueryCompilationErrors.schemaEvolutionNotSupportedForCreateTableWriteError()
  }

  // Only location, comment and collation are taken from the writer's extra options;
  // every other table-spec field is left empty or at a fixed value.
  val tableLocation = extraOptions.get("path")
  val tableComment = extraOptions.get(TableCatalog.PROP_COMMENT)
  val tableCollation = extraOptions.get(TableCatalog.PROP_COLLATION)
  val spec = UnresolvedTableSpec(
    properties = Map.empty,
    provider = Some(source),
    optionExpression = OptionList(Seq.empty),
    location = tableLocation,
    comment = tableComment,
    collation = tableCollation,
    serde = None,
    external = false,
    constraints = Seq.empty)

  // Multi-part name: <catalog>.<namespace parts...>.<table>
  val qualifiedName = catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name
  val target = UnresolvedIdentifier(qualifiedName)

  CreateTableAsSelect(
    target,
    partitioningAsV2,
    df.queryExecution.analyzed,
    spec,
    finalOptions,
    ignoreIfExists = ignoreIfExists)
}

private def assertSchemaEvolutionNotEnabledForV1Write(): Unit = {
if (_withSchemaEvolution) {
throw QueryCompilationErrors.schemaEvolutionNotSupportedForV1TableWriteError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.LongType
Expand Down Expand Up @@ -199,6 +200,62 @@ class SupportsCatalogOptionsSuite extends SharedSparkSession with BeforeAndAfter
checkAnswer(load("t1", Some(catalogName)), df2.toDF())
}

// SPARK-57068: SaveMode.Overwrite against a missing table now creates it,
// matching the long-standing V1 source behavior (parquet/JSON/ORC).
test("save works with Overwrite - no table, no partitioning, session catalog") {
  // No catalog argument (the "session catalog" case per the test name), no partition columns.
  testCreateAndRead(SaveMode.Overwrite, None, Nil)
}

test("save works with Overwrite - no table, with partitioning, session catalog") {
  // No catalog argument (the "session catalog" case per the test name), partitioned by "part".
  testCreateAndRead(SaveMode.Overwrite, None, Seq("part"))
}

test("save works with Overwrite - no table, no partitioning, testcat catalog") {
  // Explicit catalog argument, no partition columns.
  testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Nil)
}

test("save works with Overwrite - no table, with partitioning, testcat catalog") {
  // Explicit catalog argument, partitioned by "part".
  testCreateAndRead(SaveMode.Overwrite, Some(catalogName), Seq("part"))
}

// SPARK-57068: SaveMode.Append on a missing table is intentionally still
// rejected (Append expects an existing table). Pin the strict behavior so a
// future broadening is an explicit decision.
test("SPARK-57068: Append mode still fails when table is missing - testcat catalog") {
  val input = spark.range(10)
  // The whole writer chain stays inside intercept so the assertion scope is unchanged.
  val notFound = intercept[NoSuchTableException] {
    input.write
      .format(format)
      .option("name", "t1")
      .option("catalog", catalogName)
      .mode(SaveMode.Append)
      .save()
  }
  checkErrorTableNotFound(notFound, "`t1`")
}

// SPARK-57068: legacy flag restores the pre-fix behavior (throw on Overwrite-missing).
test("SPARK-57068: legacy flag restores throw on Overwrite-missing") {
  val legacyFlag = SQLConf.LEGACY_DF_WRITER_OVERWRITE_MISSING_TABLE_THROWS.key
  // With the legacy flag on, Overwrite against the missing table "t1" is expected to throw.
  withSQLConf(legacyFlag -> "true") {
    val notFound = intercept[NoSuchTableException] {
      spark.range(10).write
        .format(format)
        .option("name", "t1")
        .option("catalog", catalogName)
        .mode(SaveMode.Overwrite)
        .save()
    }
    checkErrorTableNotFound(notFound, "`t1`")
  }
}

// SPARK-57068: Overwrite-on-missing routes to CreateTableAsSelect, which does
// not support schema evolution. Verify the gate fires with a clear error
// rather than silently dropping the withSchemaEvolution() request.
test("SPARK-57068: Overwrite + withSchemaEvolution on missing table is rejected") {
  // Capture the failure first, then assert on its error condition.
  val rejected = intercept[AnalysisException] {
    spark.range(10).write
      .format(format)
      .option("name", "t1")
      .option("catalog", catalogName)
      .mode(SaveMode.Overwrite)
      .withSchemaEvolution()
      .save()
  }
  checkError(
    exception = rejected,
    condition = "UNSUPPORTED_SCHEMA_EVOLUTION.CREATE_TABLE",
    parameters = Map.empty)
}

test("fail on user specified schema when reading - session catalog") {
sql(s"create table t1 (id bigint) using $format")
val e = intercept[IllegalArgumentException] {
Expand Down