Skip to content

[FLINK-38918][PyFlink] Support per cluster offset for DynamicKafkaSource in pyflink#28458

Open
KenanAdel wants to merge 7 commits into
apache:masterfrom
KenanAdel:FLINK-38918
Open

[FLINK-38918][PyFlink] Support per cluster offset for DynamicKafkaSource in pyflink#28458
KenanAdel wants to merge 7 commits into
apache:masterfrom
KenanAdel:FLINK-38918

Conversation

@KenanAdel

Copy link
Copy Markdown

What is the purpose of the change

This PR adds support for per-cluster starting and stopping offset initializers in PyFlink's DynamicKafkaSource, aligning the Python API with the underlying Java implementation introduced in FLINK-38876.

Brief change log

  • Updated ClusterMetadata and SingleClusterTopicMetadataService constructors in dynamic_kafka.py to accept optional starting_offsets_initializer and stopping_offsets_initializer.
  • Forwarded the PyFlink offset initializers to the corresponding Java objects via the Py4J gateway using _j_initializer.
  • Ensured proper propagation of offset configuration from Python layer to the underlying Java metadata service.
  • Added comprehensive unit tests in test_dynamic_kafka.py to verify offset forwarding and ensure backward compatibility when offsets are not provided.

Verifying this change

This change is covered by the following newly added unit tests in test_dynamic_kafka.py:

  • test_single_cluster_metadata_service_with_offsets
  • test_single_cluster_metadata_service_default_offsets
  • test_cluster_metadata_with_offsets

@flinkbot

flinkbot commented Jun 16, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@KenanAdel KenanAdel marked this pull request as draft June 16, 2026 13:57
@KenanAdel KenanAdel marked this pull request as ready for review June 16, 2026 14:58
@KenanAdel KenanAdel marked this pull request as draft June 16, 2026 17:02
@KenanAdel KenanAdel marked this pull request as ready for review June 16, 2026 17:28
@KenanAdel KenanAdel force-pushed the FLINK-38918 branch 3 times, most recently from 3773a30 to f90bb1f Compare June 17, 2026 05:59
@KenanAdel

Copy link
Copy Markdown
Author

@flinkbot run azure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants