diff --git a/CHANGES.md b/CHANGES.md index 5499cb066476..2f4d926158f1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,10 @@ ## New Features / Improvements +* (Python) Added foundation for UnboundedSource API: CheckpointMark, UnboundedReader, and UnboundedSource abstract base classes enabling support for continuous data streams in Apache Beam ([#19137](https://github.com/apache/beam/issues/19137)). +* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). +* (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). +* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)). ## Breaking Changes @@ -80,6 +84,7 @@ ## Bugfixes +* Fixed IndexError in `apache_beam.utils.processes` when pip subprocess fails with short command (e.g. `pip install pkg`) (Python) ([#37515](https://github.com/apache/beam/issues/37515)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Known Issues diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 67d6cd358a07..a30d7adb11e9 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -66,10 +66,13 @@ __all__ = [ 'BoundedSource', + 'CheckpointMark', 'RangeTracker', 'Read', 'RestrictionProgress', 'RestrictionTracker', + 'UnboundedReader', + 'UnboundedSource', 'WatermarkEstimator', 'Sink', 'Write', @@ -241,6 +244,213 @@ def is_bounded(self): return True +class CheckpointMark(object): + """Represents a checkpoint for an UnboundedReader. + + A CheckpointMark is a position in an unbounded source that allows the reader + to resume reading from where it left off. When a checkpoint is finalized, + the reader acknowledges that all records up to this point have been + successfully processed. + + Implementations must be serializable so they can be persisted and restored + across pipeline restarts. + + Example: + class MyCheckpointMark(CheckpointMark): + def __init__(self, offset): + self.offset = offset + + def finalize(self): + # Acknowledge records up to offset + pass + """ + def finalize(self) -> None: + """Called when all records up to this checkpoint have been processed. + + This method should acknowledge or commit the position, ensuring that + records up to this point won't be re-read. This may involve: + - Acknowledging messages in a message queue + - Committing offsets in a log-based system + - Updating a cursor in a database + + Raises: + Exception: if finalization fails + """ + pass + + +class UnboundedReader(object): + """A reader that reads an unbounded amount of input. + + An UnboundedReader is similar to an iterator but designed for continuous + data streams. The reader repeatedly calls start() and advance() to fetch + new records, and returns False when no data is currently available + (but may become available later). + + Lifecycle: + 1. start() - begins reading, returns True if a record is available + 2. advance() - moves to next record, returns True if available + 3. get_current_timestamp() - gets timestamp of current record + 4. get_checkpoint_mark() - creates a checkpoint for resuming + + Example: + reader = source.reader() + if reader.start(): + process(reader.get_current()) + while reader.advance(): + process(reader.get_current()) + """ + def start(self) -> bool: + """Initializes the reader and reads the first record. + + Returns: + True if a record was read, False if no data is currently available. + + Raises: + Exception: if reading fails + """ + raise NotImplementedError + + def advance(self) -> bool: + """Advances to the next record. + + Returns: + True if a record was read, False if no data is currently available. + + Raises: + Exception: if reading fails + """ + raise NotImplementedError + + def get_current(self) -> Any: + """Returns the current record. + + Should only be called after start() or advance() returns True. + + Returns: + The current record. + """ + raise NotImplementedError + + def get_current_timestamp(self) -> timestamp.Timestamp: + """Returns the timestamp of the current record. + + Should only be called after start() or advance() returns True. + The timestamp represents the event time of the record. + + Returns: + A Timestamp object representing when the record was created. + """ + raise NotImplementedError + + def get_watermark(self) -> timestamp.Timestamp: + """Returns the current watermark. + + The watermark is a lower bound on timestamps of future records. + It signals that no records with earlier timestamps will be produced. + + Returns: + A Timestamp representing the current watermark. + """ + raise NotImplementedError + + def get_checkpoint_mark(self) -> CheckpointMark: + """Returns a checkpoint mark for the current position. + + This checkpoint can be used to resume reading from this position in case + of failures or restarts. The checkpoint represents all records that have + been read up to this point. + + Returns: + A CheckpointMark for the current read position. + """ + raise NotImplementedError + + def close(self) -> None: + """Closes the reader and releases resources. + + Should be called when reading is complete to clean up connections, + file handles, or other resources. + """ + pass + + +class UnboundedSource(SourceBase): + """A source that reads an unbounded amount of input. + + An UnboundedSource represents a continuous stream of data, such as: + - Message queues (Kafka, Pub/Sub, Kinesis) + - Database change streams + - Event logs + - Real-time sensors or feeds + + Unlike BoundedSource which reads a finite dataset, UnboundedSource continues + producing data indefinitely. The source is responsible for: + - Creating readers that fetch data continuously + - Tracking watermarks for event time progress + - Supporting checkpointing for fault tolerance + - Splitting into multiple parallel readers if possible + + Example: + class MyUnboundedSource(UnboundedSource): + def reader(self, checkpoint=None): + return MyUnboundedReader(checkpoint) + + def split(self, desired_num_splits): + # Return sub-sources for parallel reading + return [MyUnboundedSource(shard) for shard in shards] + + Note: This is a foundational API. Integration with Read transform and + Splittable DoFn wrappers will be implemented in future work. + """ + def reader( + self, + checkpoint: Optional[CheckpointMark] = None, + ) -> UnboundedReader: + """Returns a reader for this source. + + Args: + checkpoint: Optional checkpoint to resume reading from. If None, + reading starts from the beginning or the default position. + + Returns: + An UnboundedReader that reads from this source. + + Raises: + Exception: if reader creation fails + """ + raise NotImplementedError + + def split( + self, + desired_num_splits: int, + ) -> list['UnboundedSource']: + """Splits this source into approximately desired_num_splits sub-sources. + + Splitting allows parallel reading of the source by creating multiple + independent sub-sources that each read a portion of the data. + For example, a Kafka source might split by partition, or a Pub/Sub + source might split by subscription. + + Args: + desired_num_splits: The desired number of sub-sources. The actual number + returned may be more or less based on the source's characteristics. + + Returns: + A list of UnboundedSource objects that together cover the same data + as this source. Returning [self] is valid for sources that cannot split. + + Raises: + Exception: if splitting fails + """ + # Default implementation: source cannot be split + return [self] + + def is_bounded(self) -> bool: + """Returns False to indicate this is an unbounded source.""" + return False + + class RangeTracker(object): """A thread safe object used by Dataflow source framework. diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index eb9617cfae34..2357ee4aba0b 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -24,14 +24,15 @@ import mock import apache_beam as beam -from apache_beam.io.concat_source import ConcatSource -from apache_beam.io.concat_source_test import RangeSource from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.io.concat_source import ConcatSource +from apache_beam.io.concat_source_test import RangeSource from apache_beam.io.iobase import SourceBundle from apache_beam.options.pipeline_options import DebugOptions from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils import timestamp class SDFBoundedSourceRestrictionProviderTest(unittest.TestCase): @@ -220,5 +221,91 @@ def test_sdf_wrap_range_source(self): self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3]) +class UnboundedSourceTest(unittest.TestCase): + """Basic tests for UnboundedSource, UnboundedReader, and CheckpointMark. + + These tests verify the foundational API structure. Full integration with + Read transform and SDF wrappers will be tested in future implementation. + """ + + def test_checkpoint_mark_finalize(self): + """Test that CheckpointMark can be subclassed and finalized.""" + class TestCheckpointMark(iobase.CheckpointMark): + def __init__(self): + self.finalized = False + + def finalize(self): + self.finalized = True + + checkpoint = TestCheckpointMark() + self.assertFalse(checkpoint.finalized) + checkpoint.finalize() + self.assertTrue(checkpoint.finalized) + + def test_unbounded_source_basic_interface(self): + """Test that UnboundedSource can be subclassed with basic methods.""" + class TestUnboundedSource(iobase.UnboundedSource): + def reader(self, checkpoint=None): + return TestUnboundedReader() + + def default_output_coder(self): + return beam.coders.VarIntCoder() + + class TestUnboundedReader(iobase.UnboundedReader): + def __init__(self): + self.index = -1 + self.data = [1, 2, 3] + + def start(self): + self.index = 0 + return True + + def advance(self): + self.index += 1 + return self.index < len(self.data) + + def get_current(self): + return self.data[self.index] + + def get_current_timestamp(self): + return timestamp.Timestamp.of(self.index) + + def get_watermark(self): + return timestamp.Timestamp.of(self.index) + + def get_checkpoint_mark(self): + return iobase.CheckpointMark() + + source = TestUnboundedSource() + self.assertFalse(source.is_bounded()) + + # Test reader basic operations + reader = source.reader() + self.assertTrue(reader.start()) + self.assertEqual(1, reader.get_current()) + self.assertEqual(timestamp.Timestamp.of(0), reader.get_current_timestamp()) + + self.assertTrue(reader.advance()) + self.assertEqual(2, reader.get_current()) + + self.assertTrue(reader.advance()) + self.assertEqual(3, reader.get_current()) + + self.assertFalse(reader.advance()) + + def test_unbounded_source_split_default(self): + """Test that UnboundedSource.split() returns [self] by default.""" + class SimpleUnboundedSource(iobase.UnboundedSource): + def reader(self, checkpoint=None): + pass + + def default_output_coder(self): + return beam.coders.VarIntCoder() + + source = SimpleUnboundedSource() + splits = source.split(desired_num_splits=10) + self.assertEqual([source], splits) + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index f6daecea2125..f34b4a48085f 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -52,15 +52,12 @@ def call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): - raise RuntimeError( \ - "Full traceback: {}\n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error. output)) from error - else: - raise RuntimeError("Full trace: {}\ - \n Output of the failed child process: {} " \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, + traceback.format_exc())) from error return out def check_call(*args, **kwargs): @@ -71,15 +68,12 @@ def check_call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error - else: - raise RuntimeError("Full trace: {} \ - \n Output of the failed child process: {}" \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, + traceback.format_exc())) from error return out def check_output(*args, **kwargs): @@ -90,15 +84,12 @@ def check_output(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error - else: - raise RuntimeError("Full trace: {}, \ - output of the failed child process {} "\ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, + traceback.format_exc())) from error return out def Popen(*args, **kwargs): diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 13425550dbbe..6330631afdeb 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -121,15 +121,25 @@ def test_check_call_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_call(cmd) + processes.check_call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) + + def test_check_call_pip_short_command_no_index_error(self): + """Short pip command (e.g. pip install pkg) must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version that satisfies" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) class TestErrorHandlingCheckOutput(unittest.TestCase): @@ -162,15 +172,25 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_output(cmd) + processes.check_output(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_output") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) + + def test_check_output_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_output(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) class TestErrorHandlingCall(unittest.TestCase): @@ -193,7 +213,7 @@ def test_oserror_check_output_message(self): self.assertIn('Executable {} not found'.format(str(cmd)),\ error.args[0]) - def test_check_output_pip_install_non_existing_package(self): + def test_call_pip_install_non_existing_package(self): returncode = 1 package = "non-exsisting-package" cmd = ['python', '-m', 'pip', 'download', '--dest', '/var',\ @@ -203,15 +223,25 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.call(cmd) + processes.call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) + + def test_call_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) if __name__ == '__main__':