[SPARK-56956][SPARK-56651][CONNECT][SDP][FOLLOWUP] Address review comments for AutoCDC flow dataclasses and Python APIs#56113
Conversation
… dataclasses ### What changes were proposed in this pull request? Follow-up cleanup of review feedback on apache#56042: - Remove the now-dead `Scd1BatchProcessor.validateCdcMetadataColumnNotPresent` validator and its call site. It referenced the error class `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT` which the parent PR removed from `error-conditions.json`; the new construction-time check in `AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns` is the authoritative validator and supersedes it. - Lift `comment: Option[String]` to the `UnresolvedFlow` trait so both `UntypedFlow` and `AutoCdcFlow` carry it symmetrically; previously only `AutoCdcFlow` had it. - Reorder `AutoCdcFlow`'s constructor so defaulted params (`sqlConf`, `comment`) trail the non-defaulted ones (`origin`, `changeArgs`), allowing positional construction. - Fix Scaladoc/comment text: factual wording for the keys-presence check, the `[[ResolvedFlow.load]]` link, the `Scd1ForeachBatchHandler` reference (was `Scd1ForeachBatchExec`, which does not exist), and several minor grammar/typography nits. ### Why are the changes needed? Cleanup of follow-up items identified during review of the parent PR. The dead validator is the most material: if its code path were reached, it would throw an internal `SparkException("Cannot find main error class ...")` instead of a user-facing `AnalysisException`. The path is unreachable through current call patterns (the construction-time check rejects bad schemas before this code runs), but the references should not be left behind. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests in `AutoCdcFlowSuite`, `Scd1BatchProcessorSuite`, `ConnectInvalidPipelineSuite`, and `ConnectValidPipelineSuite` continue to cover the affected paths. No new tests added; the change is structural cleanup and doc edits. ### Was this patch authored or co-authored using generative AI tooling? Co-authored by Claude.
…oCDC Python APIs ### What changes were proposed in this pull request? Follow-up cleanup of review feedback on apache#56069: - Enforce client-side that `column_list` and `except_column_list` are not both set (raises `PySparkValueError` with `INVALID_MULTIPLE_ARGUMENT_CONDITIONS`). The docstring already promised this, but the API previously accepted both. - Enforce client-side that `keys` is non-empty (raises `PySparkValueError` with `CANNOT_BE_EMPTY`). The docstring promised "at least one key must be provided"; the implementation accepted `keys=[]`. - Drop `Optional` from `AutoCdcFlow.name` since the API always defaults the field to the target name before construction. - Add a comment on the lazy `pyspark.sql.connect.functions.builtin` imports explaining the docs-build constraint (transitive grpc dependency missing from the docs environment), so a future refactor doesn't hoist the imports and silently break docs CI. - Minor Scaladoc/docstring fixes: "merge" operation (was "merged"), consistency between `api.py` and `flow.py` on the `1`/`"1"` accepted set, missing commas, "excluded from" (was "excluded in"), `DataFrame` casing, and a backtick on `create_streaming_table`. Add two tests covering the new client-side validations. ### Why are the changes needed? The two enforcement gaps are real contract/code divergences in the public Python API — users following the docstring would expect the validation to fire client-side, but it would silently slip through to the server (with a less actionable error message). The `Optional[str]` field type was also misleading since the API guarantees the field is always populated. ### Does this PR introduce _any_ user-facing change? No new behavior beyond surfacing client-side validation errors earlier. The errors raised here would otherwise have been raised by the server (or, in the case of empty keys, possibly accepted with an empty repeated proto field). ### How was this patch tested? Added `test_create_auto_cdc_flow_rejects_empty_keys` and `test_create_auto_cdc_flow_rejects_both_column_lists`. Existing tests continue to pass. ### Was this patch authored or co-authored using generative AI tooling? Co-authored by Claude.
200660a to
80b4b5e
Compare
…ONS template placeholder
### What changes were proposed in this pull request?
The error-conditions.json template for `INVALID_MULTIPLE_ARGUMENT_CONDITIONS`
used `[{arg_names}]` (curly braces) instead of `[<arg_names>]` (angle
brackets). `ErrorClassesReader.get_error_message` extracts required
placeholders via `re.findall("<([a-zA-Z0-9_-]+)>", template)` — which only
matches angle-bracket placeholders — and asserts the extracted set equals
`messageParameters.keys()`. Callers supplying both `arg_names` and
`condition` would trip an `AssertionError` instead of getting their
intended `PySparkValueError`.
This typo affected existing code in `sql/session.py:2266` and
`sql/connect/session.py:1056`, but those paths are not exercised by tests.
The new test added in the previous commit
(`test_create_auto_cdc_flow_rejects_both_column_lists`) does exercise it,
so this fix is required for CI to pass.
### Why are the changes needed?
Without this fix, any code raising `PySparkValueError` with errorClass
`INVALID_MULTIPLE_ARGUMENT_CONDITIONS` and the documented two-parameter
shape will raise `AssertionError` at the formatter rather than the intended
exception.
### Does this PR introduce _any_ user-facing change?
No. The rendered error message is the same:
`"['pyfile', 'archive' and/or 'file'] cannot be True together."` — Python's
`str.format()` substitutes `{arg_names}` from the message-parameters dict
either way. Only the validator's pre-format assertion was failing.
### How was this patch tested?
Verified locally by calling `ErrorClassesReader.get_error_message` directly
with both my new call site's parameters and the existing `addArtifacts`
call site's parameters — both now succeed.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored by Claude.
AnishMahto
left a comment
There was a problem hiding this comment.
Thanks for the cleanup! Just a couple small comments.
|
|
||
| private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { | ||
| val microbatchSqlConf = microbatch.sparkSession.sessionState.conf | ||
| val resolver = microbatchSqlConf.resolver | ||
|
|
||
| microbatch.schema.fieldNames | ||
| .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) | ||
| .foreach { conflictingColumnName => | ||
| throw new AnalysisException( | ||
| errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", | ||
| messageParameters = Map( | ||
| "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), | ||
| "columnName" -> conflictingColumnName, | ||
| "schemaName" -> "microbatch", | ||
| "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName | ||
| ) | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
Thanks, this was supposed to be removed in the other PR but I think it got accidentally added back as I rebased the stack!
| :param source_code_location: The location of the source code that created this flow. | ||
| """ | ||
|
|
||
| name: Optional[str] | ||
| name: str |
There was a problem hiding this comment.
This should still be optional.
| if column_list is not None and except_column_list is not None: | ||
| raise PySparkValueError( | ||
| errorClass="INVALID_MULTIPLE_ARGUMENT_CONDITIONS", | ||
| messageParameters={ | ||
| "arg_names": "column_list, except_column_list", | ||
| "condition": "specified together", | ||
| }, | ||
| ) | ||
|
|
There was a problem hiding this comment.
I don't feel super strongly but I'm still of the opinion this validation should be done server side on flow registration, not client side, for consistency across all language interfaces (ex. SQL).
Empty keys are already validated against on the server side, and I will be adding invalidation against specifying both column_list and except_column_list in the next PR that registers flows from the RPCs that are received.
There was a problem hiding this comment.
i think i also asked in the review, im ok either way
| /** Optional user-supplied comment attached to this flow at definition time. */ | ||
| def comment: Option[String] | ||
|
|
There was a problem hiding this comment.
Actually, can we go the opposite direction and drop comment from AutoCdcFlow.
I remember that we intentionally dropped comment from all flows a while ago, and I accidentally added it to AutoCdcFlow when I first introduced the flow object. It shouldn't have it, and neither should any other UnresolvedFlow.
- Drop `comment` field from `UnresolvedFlow`/`UntypedFlow`/`AutoCdcFlow`; it was previously removed from all flows and accidentally re-added here. - Restore `AutoCdcFlow.name` to `Optional[str]` in the Python dataclass. - Drop client-side validations of empty `keys` and mutually-exclusive `column_list`/`except_column_list` from `create_auto_cdc_flow`; these are/will be enforced server-side for consistency with the SQL interface.
af49e07 to
5721173
Compare
This follow-up PR addresses review comments left after #56042 (SPARK-56956, AutoCDC flow dataclasses) and #56069 (SPARK-56651, AutoCDC Python APIs) merged.
What changes were proposed in this pull request?
Scala —
Scd1BatchProcessor/FlowScd1BatchProcessor.validateCdcMetadataColumnNotPresentvalidator and its call site. It referenced the error classAUTOCDC_RESERVED_COLUMN_NAME_CONFLICTwhich the parent PR removed fromerror-conditions.json; the new construction-time check inAutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumnsis the authoritative validator and supersedes it.AutoCdcFlow's constructor so defaulted params trail the non-defaulted ones (origin,changeArgs), allowing positional construction.[[ResolvedFlow.load]]link, theScd1ForeachBatchHandlerreference (wasScd1ForeachBatchExec, which does not exist), and several minor grammar/typography nits.Python —
create_auto_cdc_flow/AutoCdcFlowpyspark.sql.connect.functions.builtinimports explaining the docs-build constraint (transitive grpc dependency missing from the docs environment), so a future refactor doesn't hoist the imports and silently break docs CI.INVALID_MULTIPLE_ARGUMENT_CONDITIONSerror template placeholder:[{arg_names}]→[<arg_names>].ErrorClassesReader.get_error_messageextracts required placeholders viare.findall("<([a-zA-Z0-9_-]+)>", template)and asserts the extracted set equalsmessageParameters.keys(), so the curly-brace form would trip anAssertionErrorinstead of producing the intendedPySparkValueError. The typo also affected existing callers insql/session.py:2267andsql/connect/session.py:1056, but those paths are not exercised by tests.api.pyandflow.pyon the1/"1"accepted set, missing commas, "excluded from" (was "excluded in"),DataFramecasing, and a backtick oncreate_streaming_table.Why are the changes needed?
Cleanup of follow-up items identified during review of the parent PRs. The dead Scala validator is the most material: if its code path were reached, it would throw an internal
SparkException("Cannot find main error class ...")instead of a user-facingAnalysisException. The error-template typo would surface as anAssertionErrorrather than a user-actionable error for the two existing callers.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests in
AutoCdcFlowSuite,Scd1BatchProcessorSuite,ConnectInvalidPipelineSuite, andConnectValidPipelineSuitecontinue to cover the affected paths.Was this patch authored or co-authored using generative AI tooling?
Co-authored by Claude.