[FLINK-38844][pipeline-connector][postgres] Add metadata column support #4202
tchivs wants to merge 2 commits into apache:master
Conversation
This commit adds metadata column support for the PostgreSQL Pipeline Connector, enabling users to access metadata information in their data pipelines.

Changes:
- Add `OpTsMetadataColumn` for operation timestamp
- Add `DatabaseNameMetadataColumn` for database name
- Add `SchemaNameMetadataColumn` for schema name
- Add `TableNameMetadataColumn` for table name
- Update `PostgresDataSource` to support metadata columns
- Add comprehensive E2E test `testAllMetadataColumns()`
- Update documentation (English and Chinese)
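For illustration, wiring these columns into a pipeline definition would look roughly like the sketch below. This is a hedged example only: the option name `metadata.list` is taken from the docs files touched by this PR, while the connection parameters and table names are hypothetical placeholders.

```yaml
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: flinkuser        # hypothetical credentials
  password: flinkpw
  tables: mydb.public.orders # hypothetical table
  # Assumed option (documented in this PR's docs changes): exposes the new
  # metadata columns (op_ts, database_name, schema_name, table_name) to
  # transforms and downstream sinks.
  metadata.list: op_ts,database_name,schema_name,table_name

sink:
  type: values
```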
yuxiqian left a comment
Thanks for @tchivs' contribution.
I wonder if we need individual metadata columns for database, schema, and table, since they're always available in Transform expressions (at least once FLINK-38840 is resolved).
Thanks for the review @yuxiqian! You raise an important point about the overlap with Transform metadata fields. You're right that namespace_name, schema_name, and table_name are already available in Transform expressions. Let me clarify the design rationale:
I see two perspectives here: Argument for keeping them:
Argument for removing them:
My suggestion:
What's your preference? I'm happy to adjust the PR based on the team's direction.
I think it's OK to polish the documentation in this PR, leaving the metadata definitions as they are.
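For context, the Transform-side overlap discussed above refers to built-in metadata fields usable in transform projections. A rough sketch follows; the `__namespace_name__`, `__schema_name__`, and `__table_name__` field names follow the Flink CDC transform documentation, while the table name is a hypothetical placeholder:

```yaml
transform:
  - source-table: mydb.public.orders   # hypothetical table
    # \* keeps all physical columns; the metadata fields are appended
    # as ordinary projected columns.
    projection: \*, __namespace_name__ AS db, __schema_name__ AS schema_nm, __table_name__ AS tbl
```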
…relationship with Transform expressions
Thanks @yuxiqian for the feedback! I've polished the documentation to clarify the relationship between metadata columns and Transform expressions.

The metadata definitions remain unchanged as you suggested.
Pull request overview
Adds metadata column support to the PostgreSQL Pipeline Connector so pipelines can expose source metadata (op timestamp, database/schema/table name) to transforms and downstream sinks.
Changes:
- Introduces `SupportedMetadataColumn` implementations for `op_ts`, `database_name`, `schema_name`, and `table_name`.
- Extends `PostgresDataSource` with `supportedMetadataColumns()` to advertise these columns to the pipeline runtime.
- Adds/updates tests and documentation (EN + ZH) covering metadata configuration and behavior.
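To make the advertise-columns mechanism concrete, here is a minimal, self-contained sketch of the pattern. The `MetadataColumn` interface below is a simplified stand-in invented for illustration; the real `SupportedMetadataColumn` interface in flink-cdc-common has a richer signature (type information, conversion logic, etc.).

```java
import java.util.Map;

public class MetadataColumnSketch {

    // Simplified stand-in for SupportedMetadataColumn (hypothetical interface).
    interface MetadataColumn {
        String getName();

        // Reads this column's value out of a per-record metadata map.
        Object read(Map<String, String> meta);
    }

    // One implementation per metadata column, mirroring TableNameMetadataColumn.
    static final class TableNameColumn implements MetadataColumn {
        public String getName() {
            return "table_name";
        }

        public Object read(Map<String, String> meta) {
            return meta.get("table_name");
        }
    }

    // The source advertises its columns as an array, as PostgresDataSource does
    // via supportedMetadataColumns().
    static MetadataColumn[] supportedMetadataColumns() {
        return new MetadataColumn[] {new TableNameColumn()};
    }

    public static void main(String[] args) {
        for (MetadataColumn col : supportedMetadataColumns()) {
            System.out.println(col.getName() + "=" + col.read(Map.of("table_name", "orders")));
        }
    }
}
```

The runtime can then match each requested name from the user's configuration against this advertised set and call `read` per change record.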
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java | Adds E2E coverage verifying metadata presence/values in snapshot and incremental phases. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java | Adds a unit test asserting the set of supported metadata columns exposed by the data source. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java | Advertises supported metadata columns to the pipeline runtime. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java | Implements op_ts metadata column reader/type definition. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java | Implements database_name metadata column reader/type definition. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java | Implements schema_name metadata column reader/type definition. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java | Implements table_name metadata column reader/type definition. |
| docs/content/docs/connectors/pipeline-connectors/postgres.md | Documents metadata.list updates and adds a “Supported Metadata Columns” section. |
| docs/content.zh/docs/connectors/pipeline-connectors/postgres.md | Same as EN docs, localized. |
```java
assertThat(metadataColumns[0]).isInstanceOf(OpTsMetadataColumn.class);
assertThat(metadataColumns[0].getName()).isEqualTo("op_ts");
assertThat(metadataColumns[1]).isInstanceOf(TableNameMetadataColumn.class);
assertThat(metadataColumns[1].getName()).isEqualTo("table_name");
assertThat(metadataColumns[2]).isInstanceOf(DatabaseNameMetadataColumn.class);
assertThat(metadataColumns[2].getName()).isEqualTo("database_name");
assertThat(metadataColumns[3]).isInstanceOf(SchemaNameMetadataColumn.class);
assertThat(metadataColumns[3].getName()).isEqualTo("schema_name");
```
`supportedMetadataColumns()` returns an array, but its element ordering is not part of the `DataSource` contract; asserting on fixed indices makes this test brittle to harmless reordering. Consider asserting by a (name -> class) mapping or using order-insensitive assertions (e.g., extract `getName()` and verify it contains exactly the expected set, then separately verify each expected name maps to the expected implementation).
Suggested change (replaces the index-based assertions):

```java
// Verify that the set of metadata column names is exactly as expected, ignoring order.
List<String> metadataNames =
        Arrays.stream(metadataColumns)
                .map(SupportedMetadataColumn::getName)
                .collect(Collectors.toList());
assertThat(metadataNames)
        .containsExactlyInAnyOrder(
                "op_ts", "table_name", "database_name", "schema_name");

// Verify that each metadata column name maps to the expected implementation class.
Map<String, Class<?>> metadataTypesByName =
        Arrays.stream(metadataColumns)
                .collect(
                        Collectors.toMap(
                                SupportedMetadataColumn::getName,
                                SupportedMetadataColumn::getClass));
assertThat(metadataTypesByName.get("op_ts")).isEqualTo(OpTsMetadataColumn.class);
assertThat(metadataTypesByName.get("table_name")).isEqualTo(TableNameMetadataColumn.class);
assertThat(metadataTypesByName.get("database_name")).isEqualTo(DatabaseNameMetadataColumn.class);
assertThat(metadataTypesByName.get("schema_name")).isEqualTo(SchemaNameMetadataColumn.class);
```
What is the purpose of the pull request
This PR adds metadata column support for the PostgreSQL Pipeline Connector, enabling users to access metadata information such as operation timestamp, database name, schema name, and table name in their data pipelines.
Brief change log
- `OpTsMetadataColumn`: Operation timestamp metadata
- `DatabaseNameMetadataColumn`: Database name metadata
- `SchemaNameMetadataColumn`: Schema name metadata
- `TableNameMetadataColumn`: Table name metadata
- Updated `PostgresDataSource` to support metadata columns via the `supportedMetadataColumns()` method
- Added `testAllMetadataColumns()` in `PostgresFullTypesITCase`

Verifying this change
This change added tests and can be verified as follows:
- Added `testAllMetadataColumns()` E2E test in `PostgresFullTypesITCase`

Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no

Documentation