Skip to content
Closed
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@
],
"sqlState" : "0A000"
},
"AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE" : {
Comment thread
AnishMahto marked this conversation as resolved.
"message" : [
"Cannot start AutoCDC flow: the target table <tableName> (format: <format>) does not support row-level operations. AutoCDC requires a target backed by a connector that supports MERGE."
],
"sqlState" : "0A000"
},
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/pipelines/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ def create_auto_cdc_flow(
Note that for keys, sequence_by, column_list, and except_column_list the arguments have to
be column identifiers without qualifiers, e.g. they cannot be col("sourceTable.keyId").

The set and types of `keys` are part of the Auto CDC flow's persisted state. Changing keys
across incremental runs (renaming, swapping, growing, shrinking, or changing the type of a
key column) is not supported and will produce undefined behavior. To change the key set,
fully refresh the target table.

:param target: The name of the target table that receives the Auto CDC flow.
:param source: The name of the CDC source to stream from.
:param keys: The column or combination of columns that uniquely identify a row in the source \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.pipelines.autocdc

/**
 * Names that AutoCDC reserves for its own use, both for internal columns it inserts during
 * reconciliation (e.g. `${prefix}metadata`, `${prefix}winning_row`) and for internal tables it
 * manages alongside user-defined targets (e.g. the per-target auxiliary state table).
 *
 * A single recognizable prefix gives a single auditable answer to "what does AutoCDC own", and
 * lets user-defined columns and tables be unambiguously distinguished from AutoCDC-managed ones.
 *
 * Visibility is limited to the `pipelines` package: this is an internal naming convention, not
 * a user-facing API.
 */
private[pipelines] object AutoCdcReservedNames {

  /**
   * Common reserved-name prefix shared by AutoCDC internal columns and internal tables.
   *
   * Internal names are formed by appending a suffix to this value, e.g. `${prefix}metadata`.
   *
   * NOTE(review): names derived from this prefix presumably end up in persisted table schemas
   * and catalog table names, so changing the literal is likely not backward compatible with
   * tables written by earlier runs -- confirm before editing.
   */
  val prefix: String = "__spark_autocdc_"
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,23 @@ private[pipelines] object CaseSensitivityLabels {
}

/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
sealed trait ScdType
sealed trait ScdType {
/**
* Short, stable label for this SCD type. Persisted as a table property on AutoCDC flow
* auxiliary tables.
*/
def label: String
}

object ScdType {
/** Representation for the standard SCD1 strategy. */
case object Type1 extends ScdType
case object Type1 extends ScdType {
override val label: String = "SCD1"
}
/** Representation for the standard SCD2 strategy. */
case object Type2 extends ScdType
case object Type2 extends ScdType {
override val label: String = "SCD2"
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,19 +367,29 @@ case class Scd1BatchProcessor(
val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
microbatchDeleteVersionField > destinationUpsertVersionField

// When the incoming upsert wins against an existing record, the entire row (all columns)
// will be overwritten, including the CDC metadata column. We only exclude keys because
// most merge implementations require that join columns are not being mutated, even if
// the mutation is a no-op.
val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
val keyNames = changeArgs.keys.map(_.name)

def constructTargetColumnAssignmentsFromMicrobatch(columnName: String): (String, Column) = {
// Map a column in the target table to its direct equivalent in the microbatch. Note that
// because of target-table schema evolution during SDP dataset materialization, the
// microbatch's columns are always a subset of (or equal to) the target's columns.
val quotedCol = QuotingUtils.quoteIdentifier(columnName)
s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
}

// Most merge implementations require that join columns are not mutated, even when the
// mutation would be a no-op. The remaining microbatch columns (including the CDC metadata
// column) are overwritten outright when the incoming upsert wins.
val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
microbatchDf.columns
.filterNot(c => keyNames.exists(resolver(_, c)))
.map { c =>
val quotedCol = QuotingUtils.quoteIdentifier(c)
s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
}
.map(constructTargetColumnAssignmentsFromMicrobatch)
.toMap

val columnsToInsertOnNewKey: Map[String, Column] =
microbatchDf.columns
.map(constructTargetColumnAssignmentsFromMicrobatch)
.toMap
Comment on lines +387 to 393
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes were needed to support schema evolution, which I could only test now that we've integrated flow execution with the rest of SDP.

Over multiple pipeline executions, the source microbatch's schema could ultimately become a subset of the target table's schema - we should take care to construct the column mappings appropriately.


microbatchDf
Expand All @@ -391,7 +401,12 @@ case class Scd1BatchProcessor(
// New key: only insert upserts; deletes for absent keys are no-ops for the target table
// merge, and instead would have been inserted as tombstones into the auxiliary table.
.whenNotMatched(microbatchDeleteVersionField.isNull)
.insertAll()
// When inserting a brand new row for a new key, construct column mappings from microbatch.
// The microbatch's columns may be a strict subset of the target's columns -- e.g. the user
// narrowed `column_list` between runs, or the source DF dropped a column. The target's
// columns can never be a strict subset of the microbatch's, however, because SDP's schema
// evolution always unions old and new schemas onto the target.
.insert(columnsToInsertOnNewKey)
.merge()
}

Expand All @@ -417,17 +432,15 @@ case class Scd1BatchProcessor(

object Scd1BatchProcessor {
/**
* Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed
* dataframes must not contain any columns starting with this prefix; the invariant is
* Internal columns inserted by AutoCDC reconciliation. Source change-data-feed dataframes must
* not contain any columns starting with [[AutoCdcReservedNames.prefix]]; the invariant is
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"

private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata"
private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata"

private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"

/** Project the delete sequence out of the CDC metadata column. */
private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,20 @@ object DatasetManager extends Logging {
context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
}

if (isFullRefresh) {
// On full refresh, drop the AutoCDC auxiliary state associated with this table (if any) so
// that stale delete-tracking data and table properties are not carried forward into the new
// table generation. We unconditionally issue the DROP for every fully-refreshed target.

// Intentionally DROP and not TRUNCATE: the auxiliary table is an internal state store
// that is not part of the dataflow graph, so it does not participate in regular schema
// evolution like user tables do. On a full refresh we want a clean recreation against
// the new target schema rather than carrying forward the previous generation's layout.

val auxiliaryTableId = AutoCdcAuxiliaryTable.identifier(table.identifier)
context.spark.sql(s"DROP TABLE IF EXISTS ${auxiliaryTableId.quotedString}")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Layer-mixing / asymmetry. DatasetManager is otherwise flow-type-agnostic, but this block reaches into AutoCDC's identifier scheme to drop a feature-specific internal table. Meanwhile, aux-table creation lives in Scd1MergeStreamingWrite.startStream. The asymmetry will compound when another internal-state table (SCD2, future formats) lands — each will need a matching drop sprinkled here.

Worth considering a per-flow onFullRefresh hook (or a registry of "managed internal tables per target") so the create/drop pair stays in the feature module. Acceptable in this PR if the team explicitly accepts this as the single exception for now, but please leave that decision visible in the comment or PR description.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a pretty fair concern, it's something I've been thinking about too.

The AutoCDC flow is the first type of flow to introduce the concept of an internal state table.

Streaming queries/flows have internal state, but that's managed in the form of checkpoint directories, not catalog table entities. And while it's an intentional decision to make the AutoCDC auxiliary tables real catalog tables (e.g. to inherit the catalog-based permission model and other functionality), it also means the pipeline needs to manage those internal tables in a similar fashion to actual destination tables.

I'm not too worried about compounding behavior with SCD2, but I agree there's probably a better data model here that we should eventually refactor to. Either an onFullRefresh hook like you mentioned, or maybe just add something like internalTableIdentifiers: Seq[TableIdentifier] to the flow interface, and then have DatasetManager do the same full refresh and schema evolution logic on the internal tables as it does for actual destination tables.

But in either case, these are pure refactorings and don't affect user observed behavior at all. Let's revisit this in the future as SCD2 lands, so that we don't end up prematurely choosing a refactor path that doesn't actually fit well.

}

// Alter the table if we need to
existingTableOpt.foreach { existingTable =>
val existingSchema = v2ColumnsToStructType(existingTable.columns())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.pipelines.autocdc.{
AutoCdcReservedNames,
CaseSensitivityLabels,
ChangeArgs,
ColumnSelection,
Expand Down Expand Up @@ -271,7 +272,7 @@ class AutoCdcMergeFlow(
}

/** The DataType of the sequencing expression, derived once from the source change feed. */
private val sequencingType: DataType =
private[graph] val sequencingType: DataType =
df.select(changeArgs.sequencing).schema.head.dataType

/**
Expand Down Expand Up @@ -335,7 +336,7 @@ class AutoCdcMergeFlow(
*/
private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
val resolver = spark.sessionState.conf.resolver
val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix
val reservedPrefix = AutoCdcReservedNames.prefix

def nameContainsReservedPrefix(name: String): Boolean = {
name.length >= reservedPrefix.length && resolver(
Expand Down
Loading