
Add OpenLineage extension for Kafka Connect #22050

Draft
rahul-madaan wants to merge 1 commit into apache:trunk from rahul-madaan:kafka-connect-openlineage-extension

Conversation

@rahul-madaan

Add a new connect/openlineage-extension module that emits OpenLineage lineage events from Kafka Connect connectors. This enables automatic data lineage tracking for Connect pipelines, following the same pattern used by the OpenLineage integrations for Apache Spark and Apache Flink.

The extension implements ConnectRestExtension (KIP-285) and runs inside the Connect worker JVM. A background monitor polls ConnectClusterState to detect connector lifecycle changes (start, pause, fail, delete) and emits OpenLineage RunEvents with input/output dataset information.
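The state-diffing step the background monitor performs on each poll can be sketched independently of the Connect API. The class and method names below are hypothetical, not the PR's actual code; they only illustrate mapping a (previous, current) connector-state transition to an OpenLineage event type as described above.

```java
import java.util.Optional;

// Hypothetical sketch: on each poll, compare the previously observed state of
// a connector with its current state and decide which OpenLineage eventType
// (if any) to emit. States follow Kafka Connect's status strings.
public class LifecycleDiff {

    public static Optional<String> eventFor(String previous, String current) {
        if (previous == null) {
            // Connector observed for the first time.
            return "RUNNING".equals(current) ? Optional.of("START") : Optional.empty();
        }
        if (current == null) {
            return Optional.of("COMPLETE");              // connector deleted
        }
        if (current.equals(previous)) {
            return Optional.empty();                     // no transition this poll
        }
        switch (current) {
            case "PAUSED":  return Optional.of("COMPLETE"); // paused
            case "FAILED":  return Optional.of("FAIL");     // entered FAILED state
            case "RUNNING": return Optional.of("START");    // e.g. resumed
            default:        return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(eventFor(null, "RUNNING").orElse("-"));      // START
        System.out.println(eventFor("RUNNING", "FAILED").orElse("-"));  // FAIL
        System.out.println(eventFor("RUNNING", null).orElse("-"));      // COMPLETE
    }
}
```

The actual extension would feed this diff from `ConnectClusterState` snapshots and attach the cached input/output datasets before emitting each RunEvent.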

Supported connectors:

  • JDBC Source/Sink (PostgreSQL, MySQL, SQL Server, Oracle, DB2, Redshift)
  • Debezium CDC (all variants)
  • S3, GCS, Azure Blob, HDFS Sink
  • MongoDB Source/Sink
  • Elasticsearch Sink
  • BigQuery, Snowflake, Redshift Sink
  • MirrorMaker 2
  • HTTP Sink
  • Generic fallback for unknown connectors

Events follow the OpenLineage spec (https://openlineage.io/docs/spec/naming/) and include:

  • processing_engine run facet (name: kafka-connect)
  • jobType job facet (processingType: STREAMING, integration: KAFKA_CONNECT)
  • errorMessage run facet on FAIL events
  • Input/output datasets on all event types (START, COMPLETE, FAIL)
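For illustration, a FAIL event carrying these facets might look like the following. All field values are invented; only the overall shape follows the OpenLineage spec, and the dataset namespaces follow its naming conventions.

```json
{
  "eventType": "FAIL",
  "eventTime": "2026-04-14T10:50:00Z",
  "run": {
    "runId": "f4d1c0de-0000-4000-8000-000000000000",
    "facets": {
      "processing_engine": { "name": "kafka-connect" },
      "errorMessage": { "message": "Connector task threw an uncaught exception" }
    }
  },
  "job": {
    "namespace": "kafka-connect",
    "name": "my-jdbc-sink",
    "facets": {
      "jobType": { "processingType": "STREAMING", "integration": "KAFKA_CONNECT" }
    }
  },
  "inputs":  [ { "namespace": "kafka://broker:9092", "name": "orders" } ],
  "outputs": [ { "namespace": "postgres://db-host:5432", "name": "public.orders" } ]
}
```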

Transport options: HTTP (with Bearer auth), File (NDJSON), Console (SLF4J). Configuration via OPENLINEAGE_CONFIG env var or openlineage.* worker properties.
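As a hedged sketch, the worker-properties route might look like this. `rest.extension.classes` is the standard KIP-285 worker property; the extension class name and the exact `openlineage.*` keys are hypothetical, not confirmed by the PR.

```properties
# Illustrative worker configuration; class name and openlineage.* keys
# are hypothetical, not taken from the PR.
rest.extension.classes=org.apache.kafka.connect.openlineage.OpenLineageRestExtension

# HTTP transport with Bearer auth (one of the three transports listed above)
openlineage.transport.type=http
openlineage.transport.url=https://lineage.example.com/api/v1/lineage
```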

@github-actions github-actions bot added the triage (PRs from the community), connect, and build (Gradle build or GitHub Actions) labels Apr 14, 2026
@rahul-madaan rahul-madaan marked this pull request as draft April 14, 2026 10:08
@rahul-madaan rahul-madaan marked this pull request as ready for review April 14, 2026 10:50
@mimaison
Member

Thanks for the PR. This introduces new public APIs, so this change requires a Kafka Improvement Proposal (KIP). See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals for the process.

@mimaison mimaison added the kip (Requires or implements a KIP) label Apr 14, 2026
@rahul-madaan rahul-madaan force-pushed the kafka-connect-openlineage-extension branch from 23498f1 to 802f4a0 on April 14, 2026 17:55
@rahul-madaan rahul-madaan marked this pull request as draft April 14, 2026 20:20
Add a new connect/openlineage-extension module that emits OpenLineage
lineage events from Kafka Connect connectors. This enables automatic
data lineage tracking for Connect pipelines, following the same pattern
used by the OpenLineage integrations for Apache Spark and Apache Flink.

The extension implements ConnectRestExtension (KIP-285) and runs inside
the Connect worker JVM. A background monitor polls ConnectClusterState
to detect connector lifecycle changes (start, pause, fail, delete) and
emits OpenLineage RunEvents with input/output dataset information.

Event lifecycle (matches Flink streaming pattern):
  START   — when a connector is first observed RUNNING
  RUNNING — periodic heartbeat (default every 5 minutes, configurable)
            confirming the connector is still active with current lineage
  COMPLETE — when a connector is paused or deleted
  FAIL    — when a connector enters FAILED state

Supported connectors (17 visitors):
- JDBC Source/Sink (PostgreSQL, MySQL, SQL Server, Oracle, DB2, Redshift)
- Debezium CDC (all variants)
- S3, GCS, Azure Blob, HDFS Sink
- MongoDB Source/Sink
- Elasticsearch Sink
- BigQuery, Snowflake, Redshift Sink
- MirrorMaker 2, HTTP Sink
- Generic fallback for unknown connectors

Events follow the OpenLineage spec (https://openlineage.io/docs/spec/naming/)
and include:
- processing_engine run facet (name, version, adapter version)
- jobType job facet (processingType: STREAMING, integration: KAFKA_CONNECT)
- errorMessage run facet on FAIL events
- Input/output datasets on all event types (START, RUNNING, COMPLETE, FAIL)
- Lineage cached per-connector for reliable COMPLETE events on deletion
- Lineage refreshed on RUNNING events to detect config changes

Dataset namespaces follow OL naming conventions:
  postgres://, mysql://, kafka://, s3://, gs://, wasbs://,
  hdfs://, mongodb://, elasticsearch://, bigquery, snowflake://,
  redshift://, cassandra://

Transport options: HTTP (with Bearer auth), File (NDJSON), Console (SLF4J).
Configuration via OPENLINEAGE_CONFIG env var or openlineage.* worker properties.

Signed-off-by: Rahul Madan <madan.rahul9@gmail.com>
@rahul-madaan rahul-madaan force-pushed the kafka-connect-openlineage-extension branch from 802f4a0 to b02a7d5 on April 14, 2026 20:33
