[SPARK-56957][SDP] AutoCDC Flow Execution; Introduce and Integrate SCD1 Scd1MergeStreamingWrite#56122
Conversation
Upstream layers (MERGE planner, schema-merging utils, V2 connector column resolution) already surface organic failures for the only data-corrupting key-schema changes -- adding, dropping, or swapping keys raise UNRESOLVED_COLUMN; incompatible key-type changes raise CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE. Upcastable type changes and key-list reorderings remain (intentionally) silent no-ops. Removing the explicit drift check lets us delete the auxiliary-table numKeyColumns table property, the keys-first invariant on the aux schema, and the AUTOCDC_INVALID_STATE error condition. The auxiliary-table create/load path is now a one-liner that creates the table on first run and otherwise leaves it untouched. A follow-up branch (`prevent-autocdc-key-drift`) revisits drift detection; this commit is the baseline for that exploration. - common/utils/.../error-conditions.json: drop AUTOCDC_INVALID_STATE. - pipelines/.../FlowExecution.scala: drop validateNoAutoCdcKeyDrift, parseRecordedNumKeyColumns, autoCdcKeyColumns, numKeyColumnsProperty, and the now-unused identifier abstract member; rename createOrValidateAuxiliaryTable -> createAuxiliaryTableIfNotExists. - pipelines/.../AutoCdcScd1SchemaEvolutionSuite: drop the five KEY_SCHEMA_DRIFT cases; suite docstring now scopes to schema evolution only. - pipelines/.../AutoCdcScd1AuxiliaryTableDurabilitySuite: drop the two numKeyColumns INTERNAL_ERROR cases and the getAuxTableNumKeyColumns helper; existing structural tests no longer assert on the property. - pipelines/.../AutoCdcScd1MultiPipelineSuite: drop the cross-pipeline KEY_SCHEMA_DRIFT case. - pipelines/.../AutoCdcGraphExecutionTestMixin: drop KEY_SCHEMA_DRIFT reference in the retry-disable comment.
| .map(constructTargetColumnAssignmentsFromMicrobatch) | ||
| .toMap | ||
|
|
||
| val columnsToInsertOnNewKey: Map[String, Column] = | ||
| microbatchDf.columns | ||
| .map(constructTargetColumnAssignmentsFromMicrobatch) | ||
| .toMap |
There was a problem hiding this comment.
These changes were needed to support schema evolution, which I could only test now that we've integrated flow execution with the rest of SDP.
Over multiple pipeline executions, the source microbatch's schema could ultimately become a subset of the target table's schema - we should take care to construct the column mappings appropriately.
|
|
||
| override def startStream(): StreamingQuery = { | ||
| val sourceChangeDataFeed = graph.reanalyzeFlow(flow).df | ||
| val auxiliaryTableIdentifier = createAuxiliaryTableIfNotExists(spark = updateContext.spark) |
There was a problem hiding this comment.
I'll leave a comment in the code in a bit, but we create the auxiliary table on flow execution and not dataset materialization because:
- Unlike other pipeline entities, the aux table is effectively an internal state store. We generally do not want the table to be exposed to other flows, and therefore the aux table is not added in the graph registration context's table set. That means it will also not be materialized by dataset manager.
- Whether we create an aux table is a function of flow type. But we cannot create the aux table when we resolve into the
AutoCdcMergeFlowflow, because the auxiliary table's format is dependent on the target table's format, which is only created at table materialization time. Flow resolution should generally also be side-affect free, ex. for validation runs.
|
@szehon-ho @gengliangwang Ready for review, this is a major piece in actually executing the SCD1 foreach batch logic within a pipeline. There's less than 300 LOC for non-test changes, the remaining 2k LOC is just tests 😛. Now that we can actually execute the AutoCDC flow within a pipeline, there's a number of E2E tests that we need to add, hence the blown up diff. |
Scd1MergeStreamingWriteScd1MergeStreamingWrite
szehon-ho
left a comment
There was a problem hiding this comment.
A couple of cross-cutting points beyond the inline notes:
-
PR description is now out of date relative to the latest commit. It still claims
AutoCdcScd1MultiPipelineSuiteincludes the cross-pipeline key-drift case,AutoCdcScd1SchemaEvolutionSuitecovers key-set drift, andAutoCdcScd1AuxiliaryTableDurabilitySuitetests thenumKeyColumns/ keys-first invariants. Those tests were removed inf17c7d0. Worth refreshing the description so reviewers don't go looking for them. -
Key-drift safety messaging. The commit message for
f17c7d0says upstream MERGE / V2 resolution raise organic errors for the only data-corrupting key changes, but that's only true for adding keys and type-incompatible changes. Shrinking a composite key ([region, id]->[id]) or swapping a key ([id, region]->[id, country]) does not raiseUNRESOLVED_COLUMN-- all referenced columns still exist; only the aux-table join semantics change. Until the follow-upprevent-autocdc-key-driftbranch lands, please either (a) add an eager subset/equality check on the existing aux table's schema increateAuxiliaryTableIfNotExists(see inline comment), or (b) document in the SDP programming guide that changing AutoCDC keys on an existing target requires full refresh.
| * Idempotently create the auxiliary table for [[destination]] if it does not already exist | ||
| * and return its [[TableIdentifier]]. | ||
| */ | ||
| protected def createAuxiliaryTableIfNotExists(spark: SparkSession): TableIdentifier = { |
There was a problem hiding this comment.
Concern: CREATE TABLE IF NOT EXISTS is a silent no-op against a stale aux schema.
After dropping validateNoAutoCdcKeyDrift in f17c7d0, this helper is the only thing that runs against a previously-created aux table. If a prior run created the aux table with a wider key set (e.g. (region, id, _cdc_metadata)) and the flow now declares keys = Seq("id"), the existing aux table persists, the MERGE joins only on id, and tombstones from (region=X, id=1) apply to every event with id=1 regardless of region -- silent semantic corruption.
Two options worth considering:
- Cheapest: branch on
catalog.tableExistsand, when the table exists, load it and assert that its column set equalsauxiliaryTableSchema's column set. Much lighter than the removednumKeyColumnsvalidation, but enough to catch shrink/swap. - Alternatively, keep current behavior but document in the SDP programming guide that changing AutoCDC keys on an existing target requires a full refresh, so users know how to recover.
Also a minor perf nit: today this issues CREATE TABLE IF NOT EXISTS on every pipeline start; gating on tableExists would skip the redundant SQL after the first run.
There was a problem hiding this comment.
(2) is definitely the intended behavior - any key changes must require a full refresh, it is semantically invalid to change the keys mid-way through and SCD decomposition.
I'm choosing not to do the tableExists check though because:
- This is a very small cost incurred once when the streaming query executes; anyway processing the microbatch will take many orders of magnitude more wall clock time
- Very small edge case, but we'd still need to do
CREATE TABLE IF NOT EXISTSin the DDL, because there could otherwise be a TOCTOU race condition between checking and creating table catalog.tableExistsis still going to incur some cost
Btw something else that I'm tangentially thinking about is for SCD2, the aux table will additionally contain data columns (it must, for to support track-history columns), which are subject to schema evolution.
Or even for SCD1, when we eventually support ignoreNull, it should be possible for users to upgrade their SCD1 flows to start using ignoreNull, which would require evolving the CDC metadata column.
What this means is eventually we will have to add schema evolution for the aux table, even if the table is not being created for the first time.
But we don't need to do that now, lets do it when either SCD2 or ignoreNull lands. And in all cases, its still not valid for key columns to change without full refresh.
gengliangwang
left a comment
There was a problem hiding this comment.
Summary
Prior state and problem. Before this PR, AutoCdcMergeFlow could be registered with the dataflow graph and reach planning, but FlowPlanner.plan had no case for it — execution fell through to UnsupportedOperationException. The per-microbatch SCD1 machinery (Scd1BatchProcessor, Scd1ForeachBatchHandler, ScdBatchValidator) was already in place; what was missing was the physical execution wrapper that drives those over a foreachBatch streaming sink, plus the lifecycle of the auxiliary state table that tracks per-key tombstones.
Design approach. Introduce Scd1MergeStreamingWrite (a StreamingFlowExecution) that, on startStream, idempotently creates the AutoCDC auxiliary table (schema = keys in declaration order + CDC metadata struct) and starts a streaming query whose foreachBatch instantiates Scd1ForeachBatchHandler. Auxiliary-table responsibilities are split between AutoCdcAuxiliaryTable (identifier derivation) and AutoCdcMergeWriteMixin (creation + row-level-ops eligibility check at construction). The reserved-name prefix __spark_autocdc_, previously a private constant on Scd1BatchProcessor, is hoisted to AutoCdcReservedNames.prefix so column- and table-name reservation share a single source of truth and DatasetManager can safely DROP the aux table on full refresh without colliding with user-managed tables.
Key design decisions made by this PR.
- Aux-table creation site: deferred to flow execution (
startStream), not dataset materialization — because the aux table is not a graph entity and its format must mirror the target's, which is only finalized after materialization. (Worth landing the rationale as a code comment — see inline.) - Aux-table cleanup on full refresh: explicit
DROP IF EXISTSissued inDatasetManager.materializeTablefor every fully-refreshed target (no-op for non-AutoCDC targets thanks to the reserved prefix). Justified by the aux table holding stateful table properties (e.g. SCD key count) and a schema that doesn't flow through the graph's schema-evolution path. - Eligibility check: target must implement
SupportsRowLevelOperations(V2 MERGE/UPDATE/DELETE-with-rewrite contract). Raised asAUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE(sqlState0A000, mirroringAUTOCDC_SCD2_NOT_SUPPORTED). - Insert-on-new-key uses explicit column map:
Scd1BatchProcessor.mergeMicrobatchOntoTargetswitches.insertAll()→.insert(columnsToInsertOnNewKey)so microbatches whose columns are a strict subset of the target (narrowedcolumn_list, target rows with extra schema-evolution columns) insert correctly; target-only columns become NULL. - Out of scope (per PR description): cross-pipeline-invocation key-set drift detection.
Implementation sketch. AutoCdcMergeFlow (analysis-time) → FlowPlanner.plan (new case for Type1) → Scd1MergeStreamingWrite (constructor validates row-level-ops; startStream creates aux table and foreachBatchs into Scd1ForeachBatchHandler.execute → Scd1BatchProcessor.reconcile + mergeOntoAuxiliaryTable + mergeOntoTarget). DatasetManager.materializeTable cooperates by dropping the aux table on full refresh. AutoCdcGraphExecutionTestMixin wires a SharedTablesInMemoryRowLevelOperationTableCatalog so cloned streaming-query sessions see the same tables.
Main things to discuss in the inline comments: (1) FlowPlanner falls back to output.asInstanceOf[Table] instead of mirroring the StreamingFlow case's output match arms, (2) DatasetManager (graph-layer) now embeds AutoCDC-specific drop logic while creation lives in Scd1MergeStreamingWrite — the asymmetry, (3) Scd1MergeStreamingWrite.sequencingType duplicates AutoCdcMergeFlow.sequencingType, and (4) resolveTableCatalog's bare asInstanceOf[TableCatalog] produces a cryptic ClassCastException. The remainder are localized text-quality nits.
Reviewed by Claude (Opus 4.7) on behalf of @gengliangwang.
| // drop the existing auxiliary table schema so it can be recomputed. | ||
|
|
||
| val auxiliaryTableId = AutoCdcAuxiliaryTable.identifier(table.identifier) | ||
| context.spark.sql(s"DROP TABLE IF EXISTS ${auxiliaryTableId.quotedString}") |
There was a problem hiding this comment.
Layer-mixing / asymmetry. DatasetManager is otherwise flow-type-agnostic, but this block reaches into AutoCDC's identifier scheme to drop a feature-specific internal table. Meanwhile, aux-table creation lives in Scd1MergeStreamingWrite.startStream. The asymmetry will compound when another internal-state table (SCD2, future formats) lands — each will need a matching drop sprinkled here.
Worth considering a per-flow onFullRefresh hook (or a registry of "managed internal tables per target") so the create/drop pair stays in the feature module. Acceptable in this PR if the team explicitly accepts this as the single exception for now, but please leave that decision visible in the comment or PR description.
There was a problem hiding this comment.
Yeah this is a pretty fair concern, it's something I've been thinking about too.
The AutoCDC flow is the first type of flow to introduce the concept of an internal state table.
Streaming queries/flows have internal state, but that's managed in the form of checkpoint directories, not catalog table entities. And while its an intentional decision to make the AutoCDC auxiliary tables real catalog tables (ex. to inherit catalog based permission model and other functionality), it also means the pipeline needs to manage those internal tables in a similar fashion to actual destination tables.
I'm not too worried about compounding behavior with SCD2, but I agree there's probably a better data model here that we should eventually refactor to. Either an onFullRefresh hook like you mentioned, or maybe just add something like internalTableIdentifiers: Seq[TableIdentifier] to the flow interface, and then have DatasetManager do the same full refresh and schema evolution logic on the internal tables as it does for actual destination tables.
But in either case, these are pure refactorings and don't affect user observed behavior at all. Let's revisit this in the future as SCD2 lands, so that we don't end up prematurely choosing a refactor path that doesn't actually fit well.
The V2 writer's TableOutputResolver produces INCOMPATIBLE_DATA_FOR_TABLE errors with an empty `tableName` during plan analysis (the merge plan does not carry the target's catalog identifier through to the resolver call site), and AMBIGUOUS_REFERENCE now carries a SQL query context. Update the SCD1 schema-evolution test assertions to match.
szehon-ho
left a comment
There was a problem hiding this comment.
lgtm, some non blocking nits
| * AutoCDC-managed and cannot collide with a user-managed table. | ||
| */ | ||
| def identifier(destination: TableIdentifier): TableIdentifier = TableIdentifier( | ||
| table = s"${AutoCdcReservedNames.prefix}aux_state_${destination.table}", |
There was a problem hiding this comment.
suggestion: should we put scd type in the name of the table for debuggability? name will be hard to change once released
There was a problem hiding this comment.
I decided to put it in a table property on the aux table instead. If we embed it in the name, then there's some weird edge cases such as when deleting the aux table on a full refresh we'd have to enumerate all possible names based on SCD type and try dropping the table.
Also leaves the door open for a future where we allow users to rename the aux table, although that's not supported today.
| * `changeArgs.keys` declaration order. Keys are guaranteed to be present in the schema | ||
| * because [[AutoCdcMergeFlow.schema]] validates that. | ||
| */ | ||
| private lazy val autoCdcKeyFields: Seq[StructField] = { |
There was a problem hiding this comment.
question: do we inherit nullabie property of the fields in the aux table? even for 'keys' where we expect non-null?
There was a problem hiding this comment.
We do indeed inherit, which is my preference - we should try to mirror the user's declared schema as-is.
That being said, per microbatch validator invalidates microbatches that have null key column values. Note that even if we forced the field to be non-null in the schema, the spark engine doesn't actually enforce null-ness at runtime (AFAIK) so either way the we'd still have to do our own validation.
The schema's nullability is only really meaningful for downstream schema resolution.
cloud-fan
left a comment
There was a problem hiding this comment.
Three observations on top of the existing reviewer threads (which between them already cover the core design and most cleanups).
Reviewed by Claude (Opus 4.7) on behalf of @cloud-fan.
| // table generation. We unconditionally issue the DROP for every fully-refreshed target; for | ||
| // non-AutoCDC tables this is a no-op because [[AutoCdcAuxiliaryTable.identifier]] derives | ||
| // its name from [[AutoCdcReservedNames.prefix]], which is reserved by AutoCDC and | ||
| // therefore cannot collide with a user-managed table. |
There was a problem hiding this comment.
Reserved prefix is asserted, not enforced. This comment ('reserved by AutoCDC and therefore cannot collide with a user-managed table') is what makes the unconditional DROP IF EXISTS below safe — and what makes createAuxiliaryTableIfNotExists safe against silently adopting a stale user table. The PR hoisted AutoCdcReservedNames.prefix and uses it to reject column names via AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns, but the symmetric reservation for table names is not enforced anywhere. A user who happens to have created ${catalog}.${db}.__spark_autocdc_aux_state_<target> would either be silently dropped on full refresh or silently adopted as an aux table.
Either (a) soften the wording ("… by convention …") and acknowledge that AutoCDC trusts user tables not to use this prefix, or (b) add a graph-registration-time check that rejects any user-registered table name starting with AutoCdcReservedNames.prefix. This concern compounds with the layer-mixing thread on this same block.
There was a problem hiding this comment.
I went with (a) and softened the wording both here and in AutoCdcAuxiliaryTable.identifier to make it clear that any table with the same name as what we derive is always assumed to an auxiliary table, and managed appropriately by the running pipeline.
I did not go with (b) because:
- adding validation to all tables in the dataflow graph incurs a non-trivial cost, there can be thousands of tables
- the validation doesn't properly protect user tables anyway. There can be a user table created outside of this dataflow graph (ex. by another pipeline or just manually created) that shares the same name. If we introduced the concept of pipeline<->table ownership, then we could throw if there already exists a table with the constructed name that does not belong to this pipeline, but that concept does not exist today.
- we don't necessarily want to even block users from creating a table with the same name as what we would deduce for AutoCDC flows; users may intentionally want to manually tweak or prefill the aux table. The analogy here to me is Spark streaming checkpoints; users are free to create their own streaming checkpoint directories and hook it up to a new streaming query.
- The constructed name uses a very specific and fairly long prefix, in addition to embedding the target table's name. If the user created a table with that name themselves, chances are they know what they are doing and it wasn't just an accident
|
thanks, merging to master/4.x/4.2 (auto CDC is in 4.2) |
…D1 `Scd1MergeStreamingWrite` ### What changes were proposed in this pull request? In order for a pipeline to actually execute an AutoCDC SCD1 flow, the SDP engine needs to have a "physical" flow definition that defines what streaming transformation must be done for SCD1, and how to construct this physical flow given a "logical" flow. The `FlowPlanner` is responsible for converting a resolved SCD1 streaming logical flow into the SCD1 streaming physical flow. The physical flow implements the SCD1 `foreachBatch` streaming query on the flow input. Integration of physical flow unblocks pipeline execution with AutoCDC flows, which means we need to also fill gaps for auxiliary/target table management, schema evolution, inter-pipeline validation, etc. One validation these changes intentionally do not include is validating against a changing key-set across pipeline invocations - that requires more design and will be handled in a separate PR. ### Why are the changes needed? To actually execute AutoCDC SCD1 flow transformation with an SDP pipeline. Before this point if an AutoCDC flow was registered with the graph, the graph analysis engine would throw an unrecognized flow exception. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? With these changes, we can now actually test how an AutoCDC flow interacts with the rest of SDP. This unlocks a number of features/integrations that we should test, such as: - Schema evolution - Writing to different pipeline dataset types - Full refresh semantics - Multiflow, multipipeline, for the same AutoCDC target - Executing AutoCDC flows over multiple independent pipeline runs - etc. As such I added a new `AutoCdcGraphExecutionTestMixin` providing a v2-row-level-ops-capable catalog (`SharedTablesInMemoryRowLevelOperationTableCatalog`) and the standard fixtures all AutoCDC E2E suites share, plus six new end-to-end suites (~30 tests) covering: - `AutoCdcScd1SinglePipelineSuite` — basic upsert/delete reconciliation in a single pipeline, plus the `AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE` failure path. - `AutoCdcScd1MultiPipelineSuite` — independent targets across pipelines, downstream readers of an AutoCDC target. - `AutoCdcScd1FullRefreshSuite` — full refresh wipes target rows + aux table; sequence comparisons reset; selective refresh isolates state. - `AutoCdcScd1SchemaEvolutionSuite` — broadening/narrowing column selection, nullable column addition, type widening/narrowing, nested struct/array evolution, case-only collisions, etc. - `AutoCdcScd1AuxiliaryTableDurabilitySuite` — keys-first invariant, declared key order preserved, multi-run sequence comparisons, transparent aux-table recreation if dropped. - `AutoCdcScd1TargetTableDurabilitySuite` — pre-loaded target rows, late-added CDC metadata column. ### Was this patch authored or co-authored using generative AI tooling? Co-authored. Generated-by: Claude-Opus-4.7-thinking-xhigh Closes #56122 from AnishMahto/SPARK-56957-implement-SCD1-execution-streaming-flow. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7632d37) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…D1 `Scd1MergeStreamingWrite` ### What changes were proposed in this pull request? In order for a pipeline to actually execute an AutoCDC SCD1 flow, the SDP engine needs to have a "physical" flow definition that defines what streaming transformation must be done for SCD1, and how to construct this physical flow given a "logical" flow. The `FlowPlanner` is responsible for converting a resolved SCD1 streaming logical flow into the SCD1 streaming physical flow. The physical flow implements the SCD1 `foreachBatch` streaming query on the flow input. Integration of physical flow unblocks pipeline execution with AutoCDC flows, which means we need to also fill gaps for auxiliary/target table management, schema evolution, inter-pipeline validation, etc. One validation these changes intentionally do not include is validating against a changing key-set across pipeline invocations - that requires more design and will be handled in a separate PR. ### Why are the changes needed? To actually execute AutoCDC SCD1 flow transformation with an SDP pipeline. Before this point if an AutoCDC flow was registered with the graph, the graph analysis engine would throw an unrecognized flow exception. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? With these changes, we can now actually test how an AutoCDC flow interacts with the rest of SDP. This unlocks a number of features/integrations that we should test, such as: - Schema evolution - Writing to different pipeline dataset types - Full refresh semantics - Multiflow, multipipeline, for the same AutoCDC target - Executing AutoCDC flows over multiple independent pipeline runs - etc. As such I added a new `AutoCdcGraphExecutionTestMixin` providing a v2-row-level-ops-capable catalog (`SharedTablesInMemoryRowLevelOperationTableCatalog`) and the standard fixtures all AutoCDC E2E suites share, plus six new end-to-end suites (~30 tests) covering: - `AutoCdcScd1SinglePipelineSuite` — basic upsert/delete reconciliation in a single pipeline, plus the `AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE` failure path. - `AutoCdcScd1MultiPipelineSuite` — independent targets across pipelines, downstream readers of an AutoCDC target. - `AutoCdcScd1FullRefreshSuite` — full refresh wipes target rows + aux table; sequence comparisons reset; selective refresh isolates state. - `AutoCdcScd1SchemaEvolutionSuite` — broadening/narrowing column selection, nullable column addition, type widening/narrowing, nested struct/array evolution, case-only collisions, etc. - `AutoCdcScd1AuxiliaryTableDurabilitySuite` — keys-first invariant, declared key order preserved, multi-run sequence comparisons, transparent aux-table recreation if dropped. - `AutoCdcScd1TargetTableDurabilitySuite` — pre-loaded target rows, late-added CDC metadata column. ### Was this patch authored or co-authored using generative AI tooling? Co-authored. Generated-by: Claude-Opus-4.7-thinking-xhigh Closes #56122 from AnishMahto/SPARK-56957-implement-SCD1-execution-streaming-flow. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7632d37) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
In order for a pipeline to actually execute an AutoCDC SCD1 flow, the SDP engine needs to have a "physical" flow definition that defines what streaming transformation must be done for SCD1, and how to construct this physical flow given a "logical" flow.
The
FlowPlanneris responsible for converting a resolved SCD1 streaming logical flow into the SCD1 streaming physical flow. The physical flow implements the SCD1foreachBatchstreaming query on the flow input.Integration of physical flow unblocks pipeline execution with AutoCDC flows, which means we need to also fill gaps for auxiliary/target table management, schema evolution, inter-pipeline validation, etc.
One validation these changes intentionally do not include is validating against a changing key-set across pipeline invocations - that requires more design and will be handled in a separate PR.
Why are the changes needed?
To actually execute AutoCDC SCD1 flow transformation with an SDP pipeline. Before this point if an AutoCDC flow was registered with the graph, the graph analysis engine would throw an unrecognized flow exception.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
With these changes, we can now actually test how an AutoCDC flow interacts with the rest of SDP. This unlocks a number of features/integrations that we should test, such as:
As such I added a new
AutoCdcGraphExecutionTestMixinproviding a v2-row-level-ops-capablecatalog (
SharedTablesInMemoryRowLevelOperationTableCatalog) and the standardfixtures all AutoCDC E2E suites share, plus six new end-to-end suites
(~30 tests) covering:
AutoCdcScd1SinglePipelineSuite— basic upsert/delete reconciliation in a single pipeline, plus theAUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGEfailure path.AutoCdcScd1MultiPipelineSuite— independent targets across pipelines, downstream readers of an AutoCDC target.AutoCdcScd1FullRefreshSuite— full refresh wipes target rows + aux table; sequence comparisons reset; selective refresh isolates state.AutoCdcScd1SchemaEvolutionSuite— broadening/narrowing column selection, nullable column addition, type widening/narrowing, nested struct/array evolution, case-only collisions, etc.AutoCdcScd1AuxiliaryTableDurabilitySuite— keys-first invariant, declared key order preserved, multi-run sequence comparisons, transparent aux-table recreation if dropped.AutoCdcScd1TargetTableDurabilitySuite— pre-loaded target rows, late-added CDC metadata column.Was this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh