Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@

## New Features / Improvements

* (Python) Added foundation for UnboundedSource API: CheckpointMark, UnboundedReader, and UnboundedSource abstract base classes enabling support for continuous data streams in Apache Beam ([#19137](https://github.com/apache/beam/issues/19137)).
* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#37429](https://github.com/apache/beam/issues/37429)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)).

## Breaking Changes
Expand All @@ -80,6 +84,7 @@

## Bugfixes

* Fixed IndexError in `apache_beam.utils.processes` when pip subprocess fails with short command (e.g. `pip install pkg`) (Python) ([#37515](https://github.com/apache/beam/issues/37515)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues
Expand Down
210 changes: 210 additions & 0 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@

__all__ = [
'BoundedSource',
'CheckpointMark',
'RangeTracker',
'Read',
'RestrictionProgress',
'RestrictionTracker',
'UnboundedReader',
'UnboundedSource',
'WatermarkEstimator',
'Sink',
'Write',
Expand Down Expand Up @@ -241,6 +244,213 @@ def is_bounded(self):
return True


class CheckpointMark(object):
  """A restorable position within an unbounded source.

  Readers emit CheckpointMarks so that, after a failure or restart, the
  pipeline can resume consuming records from the last durable position
  instead of replaying the whole stream. Once every record up to a mark
  has been successfully processed, ``finalize`` is invoked on it.

  Subclasses must be serializable so that marks can be persisted and
  restored across pipeline restarts.

  Example:
    class MyCheckpointMark(CheckpointMark):
      def __init__(self, offset):
        self.offset = offset

      def finalize(self):
        # Acknowledge records up to offset
        pass
  """
  def finalize(self) -> None:
    """Acknowledges that every record up to this mark has been processed.

    Implementations should durably commit the position so those records
    are never re-read. Typical actions include:
    - acknowledging messages in a message queue
    - committing offsets in a log-based system
    - updating a cursor in a database

    Raises:
      Exception: if finalization fails
    """
    pass


class UnboundedReader(object):
  """A reader over a continuous, never-ending stream of records.

  Unlike a plain iterator, an UnboundedReader distinguishes "no record
  right now" from "end of data": ``start`` and ``advance`` return False
  whenever nothing is currently available, yet data may still arrive
  later and the methods may be called again.

  Lifecycle:
    1. start() - begins reading, returns True if a record is available
    2. advance() - moves to next record, returns True if available
    3. get_current_timestamp() - gets timestamp of current record
    4. get_checkpoint_mark() - creates a checkpoint for resuming

  Example:
    reader = source.reader()
    if reader.start():
      process(reader.get_current())
      while reader.advance():
        process(reader.get_current())
  """
  def start(self) -> bool:
    """Initializes the reader and attempts to read the first record.

    Returns:
      True if a record was read, False if no data is currently available.

    Raises:
      Exception: if reading fails
    """
    raise NotImplementedError

  def advance(self) -> bool:
    """Moves the reader to the next record, if one is available.

    Returns:
      True if a record was read, False if no data is currently available.

    Raises:
      Exception: if reading fails
    """
    raise NotImplementedError

  def get_current(self) -> Any:
    """Returns the record the reader is currently positioned at.

    Only valid after ``start`` or ``advance`` has returned True.

    Returns:
      The current record.
    """
    raise NotImplementedError

  def get_current_timestamp(self) -> timestamp.Timestamp:
    """Returns the event-time timestamp of the current record.

    Only valid after ``start`` or ``advance`` has returned True.

    Returns:
      A Timestamp object representing when the record was created.
    """
    raise NotImplementedError

  def get_watermark(self) -> timestamp.Timestamp:
    """Returns the reader's current watermark.

    The watermark is a lower bound on the timestamps of all records this
    reader will produce in the future; records with earlier timestamps
    will not appear.

    Returns:
      A Timestamp representing the current watermark.
    """
    raise NotImplementedError

  def get_checkpoint_mark(self) -> CheckpointMark:
    """Returns a checkpoint covering everything read so far.

    The returned mark can be handed back to the source to resume reading
    from this position after a failure or restart.

    Returns:
      A CheckpointMark for the current read position.
    """
    raise NotImplementedError

  def close(self) -> None:
    """Releases any resources held by the reader.

    Call once reading is complete to clean up connections, file handles,
    or other resources.
    """
    pass


class UnboundedSource(SourceBase):
  """A source producing an unbounded, continuously growing stream of data.

  Typical backing systems include:
  - Message queues (Kafka, Pub/Sub, Kinesis)
  - Database change streams
  - Event logs
  - Real-time sensors or feeds

  Where a BoundedSource reads a finite dataset, an UnboundedSource keeps
  producing records indefinitely. Its responsibilities are:
  - creating readers that fetch data continuously
  - tracking watermarks for event-time progress
  - supporting checkpointing for fault tolerance
  - splitting into multiple parallel readers when possible

  Example:
    class MyUnboundedSource(UnboundedSource):
      def reader(self, checkpoint=None):
        return MyUnboundedReader(checkpoint)

      def split(self, desired_num_splits):
        # Return sub-sources for parallel reading
        return [MyUnboundedSource(shard) for shard in shards]

  Note: This is a foundational API. Integration with Read transform and
  Splittable DoFn wrappers will be implemented in future work.
  """
  def reader(
      self,
      checkpoint: Optional[CheckpointMark] = None,
  ) -> UnboundedReader:
    """Creates a reader over this source.

    Args:
      checkpoint: Optional checkpoint to resume reading from. If None,
        reading starts from the beginning or the default position.

    Returns:
      An UnboundedReader that reads from this source.

    Raises:
      Exception: if reader creation fails
    """
    raise NotImplementedError

  def split(
      self,
      desired_num_splits: int,
  ) -> list['UnboundedSource']:
    """Partitions this source into independent sub-sources.

    Splitting enables parallel consumption: each returned sub-source
    reads its own portion of the data. A Kafka source might split by
    partition, for instance, while a Pub/Sub source might split by
    subscription.

    Args:
      desired_num_splits: The desired number of sub-sources. The actual
        number returned may be more or less based on the source's
        characteristics.

    Returns:
      A list of UnboundedSource objects that together cover the same data
      as this source. Returning [self] is valid for sources that cannot
      split.

    Raises:
      Exception: if splitting fails
    """
    # By default a source is treated as unsplittable.
    return [self]

  def is_bounded(self) -> bool:
    """Returns False to indicate this is an unbounded source."""
    return False


class RangeTracker(object):
"""A thread safe object used by Dataflow source framework.

Expand Down
91 changes: 89 additions & 2 deletions sdks/python/apache_beam/io/iobase_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import mock

import apache_beam as beam
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io.iobase import SourceBundle
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.utils import timestamp


class SDFBoundedSourceRestrictionProviderTest(unittest.TestCase):
Expand Down Expand Up @@ -220,5 +221,91 @@ def test_sdf_wrap_range_source(self):
self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3])


class UnboundedSourceTest(unittest.TestCase):
  """Smoke tests for UnboundedSource, UnboundedReader, and CheckpointMark.

  These tests exercise the foundational API surface only. Full integration
  with the Read transform and SDF wrappers will be tested once implemented.
  """

  def test_checkpoint_mark_finalize(self):
    """A CheckpointMark subclass can observe its own finalization."""
    class RecordingMark(iobase.CheckpointMark):
      def __init__(self):
        self.finalized = False

      def finalize(self):
        self.finalized = True

    mark = RecordingMark()
    self.assertFalse(mark.finalized)
    mark.finalize()
    self.assertTrue(mark.finalized)

  def test_unbounded_source_basic_interface(self):
    """Subclassing UnboundedSource with a minimal in-memory reader works."""
    class FixedDataReader(iobase.UnboundedReader):
      def __init__(self):
        self.position = -1
        self.records = [1, 2, 3]

      def start(self):
        self.position = 0
        return True

      def advance(self):
        self.position += 1
        return self.position < len(self.records)

      def get_current(self):
        return self.records[self.position]

      def get_current_timestamp(self):
        return timestamp.Timestamp.of(self.position)

      def get_watermark(self):
        return timestamp.Timestamp.of(self.position)

      def get_checkpoint_mark(self):
        return iobase.CheckpointMark()

    class FixedDataSource(iobase.UnboundedSource):
      def reader(self, checkpoint=None):
        return FixedDataReader()

      def default_output_coder(self):
        return beam.coders.VarIntCoder()

    source = FixedDataSource()
    self.assertFalse(source.is_bounded())

    # Walk the reader through the fixed record sequence.
    reader = source.reader()
    self.assertTrue(reader.start())
    self.assertEqual(1, reader.get_current())
    self.assertEqual(timestamp.Timestamp.of(0), reader.get_current_timestamp())

    for expected in (2, 3):
      self.assertTrue(reader.advance())
      self.assertEqual(expected, reader.get_current())

    self.assertFalse(reader.advance())

  def test_unbounded_source_split_default(self):
    """The default split() implementation declines to split."""
    class UnsplittableSource(iobase.UnboundedSource):
      def reader(self, checkpoint=None):
        pass

      def default_output_coder(self):
        return beam.coders.VarIntCoder()

    source = UnsplittableSource()
    self.assertEqual([source], source.split(desired_num_splits=10))


# Allow running this test module directly via the standard unittest runner.
if __name__ == '__main__':
  unittest.main()
Loading
Loading