Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@
],
"sqlState" : "22023"
},
"AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the AutoCDC key column `<keyColumnName>` is not present in the flow's selected source schema. AutoCDC requires every key column to be present in the source change-data feed and retained by any configured column selection."
],
"sqlState" : "22023"
},
"AUTOCDC_MICROBATCH_VALIDATION" : {
"message" : [
"AutoCDC flow on table <tableName> in batch <batchId> failed microbatch validation."
Expand Down Expand Up @@ -232,12 +238,24 @@
],
"sqlState" : "42703"
},
"AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
"AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
"message" : [
"Invalid AutoCDC destination <tableName> with multiple flows: <flows>. An AutoCDC target table must have exactly one flow writing to it."
],
"sqlState" : "42000"
},
"AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
"Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column."
"The column `<columnName>` in the <schemaName> schema collides with the reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using <caseSensitivity> column name comparison). Rename or remove the column."
],
"sqlState" : "42710"
},
"AUTOCDC_SCD2_NOT_SUPPORTED" : {
"message" : [
"AutoCDC flows do not currently support SCD Type 2 transformations."
],
"sqlState" : "0A000"
},
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
Expand Down Expand Up @@ -3710,6 +3728,11 @@
"Flow <flowIdentifier> returns an invalid relation type."
],
"subClass" : {
"AUTOCDC_RELATION_FOR_TEMPORARY_VIEW" : {
"message" : [
"AutoCDC flows must target a streaming table because their reconciliation semantics require a streaming-table sink, but the flow <flowIdentifier> attempts to write an AutoCDC relation to the temporary view <viewIdentifier>."
]
},
"BATCH_RELATION_FOR_STREAMING_TABLE" : {
"message" : [
"Streaming tables may only be defined by streaming relations, but the flow <flowIdentifier> attempts to write a batch relation to the streaming table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to convert the batch relation into a streaming relation, or populating the streaming table with an append once-flow instead."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UnresolvedFlow}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -371,7 +371,7 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS =>
val relationFlowDetails = flow.getRelationFlowDetails
graphElementRegistry.registerFlow(
UnresolvedFlow(
UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = destinationIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object ColumnSelection {
}

/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
private[autocdc] object CaseSensitivityLabels {
private[pipelines] object CaseSensitivityLabels {
val CaseSensitive: String = "case-sensitive"
val CaseInsensitive: String = "case-insensitive"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,15 @@ case class Scd1BatchProcessor(
}

object Scd1BatchProcessor {
// Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing.
private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row"
private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
/**
* Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed
* dataframes must not contain any columns starting with this prefix; the invariant is
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"

private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata"

private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
Expand All @@ -434,7 +440,7 @@ object Scd1BatchProcessor {
/**
* Schema of the CDC metadata struct column for SCD1.
*/
private def cdcMetadataColSchema(sequencingType: DataType): StructType =
private[pipelines] def cdcMetadataColSchema(sequencingType: DataType): StructType =
StructType(
Seq(
// The sequencing of the event if it represents a delete, null otherwise.
Expand All @@ -448,7 +454,7 @@ object Scd1BatchProcessor {
* Construct the CDC metadata struct column for SCD1, following the exact schema and field
* ordering defined by [[cdcMetadataColSchema]].
*/
private[autocdc] def constructCdcMetadataCol(
private[pipelines] def constructCdcMetadataCol(
deleteSequence: Column,
upsertSequence: Column,
sequencingType: DataType): Column = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
} else {
f
}
convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
resolveFlow(flowToResolve, maybeNewFuncResult)

// If the flow failed due to an UnresolvedDatasetException, it means that one of the
// flow's inputs wasn't available. After other flows are resolved, these inputs
Expand All @@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
}
}

private def convertResolvedToTypedFlow(
private def resolveFlow(
flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.

That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.

Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:

case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a good question, but we don't actually need to do anything here because:

  1. It's not possible to create an MV with an AutoCDC flow input because (a) MV's must be defined with an inline flow function, (b) AutoCDC flows must be defined as standalone flows with a target, and (c) MVs are not allowed to have multiple input flows. That means if an AutoCDC flow targets an MV, the MV necessarily has at least 2 flows, and would be invalidated
  2. AutoCDC flows are unique in that users don't actually get to define their flow functions. SDP will define the AutoCDC flow function at flow registration time, and we will define it in such a way that forces it to be streaming by construction (i.e spark.read_stream(source))

Eventually we will support AutoCDC once flows which would indeed be batch flows, but that's not supported today - once = false always by construction.

As a middle ground though I'll also introduce a test demonstrating that AutoCDC flows cannot write to MVs in either the flow execution or flow registration PR.

case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult)
}
}

private def transformUntypedFlowToResolvedFlow(
flow: UntypedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case _ if flow.once => new AppendOnceFlow(flow, funcResult)
case _ if funcResult.dataFrame.get.isStreaming =>
Expand All @@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
// then get their results overwritten.
val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1
new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
case _ => new CompleteFlow(flow, funcResult)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph
import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{functions => F, AnalysisException, Column}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.pipelines.autocdc.{
CaseSensitivityLabels,
ChangeArgs,
ColumnSelection,
Scd1BatchProcessor,
ScdType
}
import org.apache.spark.sql.pipelines.util.InputReadOptions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructField, StructType}

/**
* Contains the catalog and database context information for query execution.
Expand Down Expand Up @@ -121,15 +129,56 @@ case class FlowFunctionResult(
}

/** A [[Flow]] whose output schema and dependencies aren't known. */
case class UnresolvedFlow(
sealed trait UnresolvedFlow extends Flow {
/** Returns a copy of this flow with the given SQL confs overriding the existing ones. */
def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
}

/**
* An [[UnresolvedFlow]] whose execution-type has not yet been determined.
*
* In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis
* and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a
* flow will be an AutoCDC flow immediately on construction, because it has its own special
* registration API. Such flows are considered "typed flows", but there isn't any semantic reason
* yet to explicitly introduce a `TypedFlow` trait/class.
*/
case class UntypedFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
sqlConf: Map[String, String],
override val once: Boolean,
override val origin: QueryOrigin
) extends Flow
) extends UnresolvedFlow {
override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow =
copy(sqlConf = newSqlConf)
}

/**
* An unresolved but typed flow that applies a CDC event stream to a target table via MERGE.
*
* [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, and not as a once
* flow. Therefore by definition it is a streaming-type flow.
*
* In the future once-support for [[AutoCdcFlow]] may be added.
*/
case class AutoCdcFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
sqlConf: Map[String, String] = Map.empty,
comment: Option[String] = None,
override val origin: QueryOrigin,
changeArgs: ChangeArgs
) extends UnresolvedFlow {
override val once: Boolean = false

override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow =
copy(sqlConf = newSqlConf)
}

/**
* A [[Flow]] whose flow function has been invoked, meaning either:
Expand Down Expand Up @@ -194,3 +243,140 @@ class AppendOnceFlow(

override val once = true
}

/**
* A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance to
* the configured [[flow.changeArgs]].
*/
class AutoCdcMergeFlow(
val flow: AutoCdcFlow,
val funcResult: FlowFunctionResult
) extends ResolvedFlow {
requireReservedPrefixAbsentInSourceColumns()

def changeArgs: ChangeArgs = flow.changeArgs

/** The user-selected projection of [[df.schema]] (i.e. before the SCD metadata column). */
private val userSelectedSchema: StructType = {
val selectedSchema = ColumnSelection.applyToSchema(
schemaName = "changeDataFeed",
schema = df.schema,
columnSelection = changeArgs.columnSelection,
caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
)
// AutoCDC flows require all key columns to be present in the target table, to adhere to SCD
// semantics.
requireKeysPresentInSelectedSchema(selectedSchema)
selectedSchema
}

/** The DataType of the sequencing expression, derived once from the source change feed. */
private val sequencingType: DataType =
df.select(changeArgs.sequencing).schema.head.dataType

/**
* Returns the augmented output schema of this flow, which can differ from the schema of the
* source change-data-feed dataframe.
*
* The source dataframe's schema describes the incoming CDC events; the augmented schema here
* applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata
* columns that the AutoCDC MERGE engine projects onto the target table. Downstream
* dependencies in the pipeline see this augmented schema.
*/
override val schema: StructType = changeArgs.storedAsScdType match {
case ScdType.Type1 =>
// SCD1 produces a target table with all the user-selected output columns and a projected
// CDC operational metadata column at the end.
StructType(
userSelectedSchema.fields :+ StructField(
Scd1BatchProcessor.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(sequencingType),
nullable = false
)
)
case ScdType.Type2 =>
throw new AnalysisException(
errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
messageParameters = Map.empty
)
}

/**
* Returns an empty dataframe whose schema matches [[AutoCdcMergeFlow.schema]].
*
* Today, [[AutoCdcMergeFlow.load]] is not actually ever called during graph analysis or
* execution. An AutoCdcMergeFlow can only be an input to a streaming table (not an MV or
* persisted/temp view), and streaming tables take a [[VirtualTableInput]] as input, not
* the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own [[load]] to do
* schema inference on its input flows, rather than a transitive [[Flow.load]].
*
* The [[AutoCdcMergeFlow.load]] implementation exists solely for API consistency.
*/
override def load(readOptions: InputReadOptions): DataFrame = changeArgs.storedAsScdType match {
case ScdType.Type1 =>
val userSelectedCols: Seq[Column] = userSelectedSchema.fieldNames.toSeq.map(F.col)
val emptyCdcMetadataCol: Column = Scd1BatchProcessor.constructCdcMetadataCol(
deleteSequence = F.lit(null),
upsertSequence = F.lit(null),
sequencingType = sequencingType
).as(Scd1BatchProcessor.cdcMetadataColName)

df.select(userSelectedCols :+ emptyCdcMetadataCol: _*)
case ScdType.Type2 =>
throw new AnalysisException(
errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
messageParameters = Map.empty
)
}

/**
* Validate that the resolved source dataframe for the AutoCDC flow does not contain any column
* names that use the reserved Spark AutoCDC prefix.
*/
private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
val resolver = spark.sessionState.conf.resolver
val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix

def nameContainsReservedPrefix(name: String): Boolean = {
name.length >= reservedPrefix.length && resolver(
name.substring(0, reservedPrefix.length),
reservedPrefix
)
}

df.schema.fieldNames.find(nameContainsReservedPrefix).foreach { conflictingColumnName =>
throw new AnalysisException(
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
messageParameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.of(
spark.sessionState.conf.caseSensitiveAnalysis
),
"columnName" -> conflictingColumnName,
"schemaName" -> "changeDataFeed",
"reservedColumnNamePrefix" -> reservedPrefix
)
)
}
}

/**
* Validate all keys specified in changeArgs are actually present in the user-selected schema.
*/
private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): Unit = {
val resolver = spark.sessionState.conf.resolver

changeArgs.keys
.find(key => !selectedSchema.fieldNames.exists(name => resolver(name, key.name)))
.foreach { missingKey =>
throw new AnalysisException(
errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
messageParameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.of(
spark.sessionState.conf.caseSensitiveAnalysis
),
"keyColumnName" -> missingKey.name
)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class GraphRegistrationContext(
}

def registerFlow(flowDef: UnresolvedFlow): Unit = {
flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
flows += flowDef.withSqlConf(defaultSqlConf ++ flowDef.sqlConf)
}

private def isEmpty: Boolean = {
Expand Down
Loading