From 01f50fc69a2c7a2641b833bdc05bc376fbdae1a4 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 1 Apr 2026 19:52:04 +0400 Subject: [PATCH 1/6] fix: stop video drain when real subscriber arrives Enable drain by default and clean up blackhole when add_track_subscriber is called, preventing stale drains from competing with actual consumers. --- getstream/video/rtc/pc.py | 8 ++++- tests/rtc/test_subscriber_drain.py | 50 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 tests/rtc/test_subscriber_drain.py diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index a07d5840..cccdf62e 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -131,7 +131,7 @@ def __init__( self, connection, configuration: aiortc.RTCConfiguration, - drain_video_frames: bool = False, + drain_video_frames: bool = True, ) -> None: logger.info( f"creating subscriber peer connection with configuration: {configuration}" @@ -208,6 +208,12 @@ def add_track_subscriber( """Add a new subscriber to an existing track's MediaRelay.""" track_data = self.track_map.get(track_id) + self._video_drain_tasks.pop(track_id, None) + blackhole = self._video_blackholes.pop(track_id, None) + + if blackhole: + asyncio.ensure_future(blackhole.stop()) + if track_data: relay, original_track = track_data return relay.subscribe(original_track, buffered=False) diff --git a/tests/rtc/test_subscriber_drain.py b/tests/rtc/test_subscriber_drain.py new file mode 100644 index 00000000..ab98798c --- /dev/null +++ b/tests/rtc/test_subscriber_drain.py @@ -0,0 +1,50 @@ +"""Tests for SubscriberPeerConnection video drain behavior.""" + +from unittest.mock import AsyncMock, Mock + +import pytest +from aiortc.contrib.media import MediaRelay + +from getstream.video.rtc.pc import SubscriberPeerConnection + + +@pytest.fixture +def subscriber_pc(): + """Create a SubscriberPeerConnection bypassing heavy parent inits.""" + pc = SubscriberPeerConnection.__new__(SubscriberPeerConnection) + pc.connection = Mock() + pc._drain_video_frames = True + pc.track_map = {} + pc.video_frame_trackers = {} + pc._video_blackholes = {} + pc._video_drain_tasks = {} + pc._listeners = {} + return pc + + +class TestAddTrackSubscriberStopsDrain: + def test_blackhole_stopped_when_subscriber_added(self, subscriber_pc): + track_id = "user123:video:0" + relay = MediaRelay() + original_track = Mock() + subscriber_pc.track_map[track_id] = (relay, original_track) + + blackhole = Mock() + blackhole.stop = AsyncMock() + subscriber_pc._video_blackholes[track_id] = blackhole + subscriber_pc._video_drain_tasks[track_id] = Mock() + + subscriber_pc.add_track_subscriber(track_id) + + blackhole.stop.assert_called_once() + assert track_id not in subscriber_pc._video_blackholes + assert track_id not in subscriber_pc._video_drain_tasks + + def test_no_error_when_no_drain_exists(self, subscriber_pc): + track_id = "user123:video:0" + relay = MediaRelay() + original_track = Mock() + subscriber_pc.track_map[track_id] = (relay, original_track) + + result = subscriber_pc.add_track_subscriber(track_id) + assert result is not None From 9ea7275d22400184c1c70f20f73404d7cc74366a Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 1 Apr 2026 20:13:51 +0400 Subject: [PATCH 2/6] fix: prevent GC from collecting blackhole stop task Replace ensure_future with create_task and hold a strong reference in _background_tasks set to avoid silent task cancellation by GC. --- getstream/video/rtc/pc.py | 5 ++++- tests/rtc/test_subscriber_drain.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index cccdf62e..753af184 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -144,6 +144,7 @@ def __init__( self.video_frame_trackers = {} # track_id -> VideoFrameTracker self._video_blackholes: dict[str, MediaBlackhole] = {} self._video_drain_tasks: dict[str, asyncio.Task] = {} + self._background_tasks: set[asyncio.Task] = set() @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -212,7 +213,9 @@ def add_track_subscriber( blackhole = self._video_blackholes.pop(track_id, None) if blackhole: - asyncio.ensure_future(blackhole.stop()) + task = asyncio.create_task(blackhole.stop()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) if track_data: relay, original_track = track_data diff --git a/tests/rtc/test_subscriber_drain.py b/tests/rtc/test_subscriber_drain.py index ab98798c..a43add56 100644 --- a/tests/rtc/test_subscriber_drain.py +++ b/tests/rtc/test_subscriber_drain.py @@ -18,12 +18,14 @@ def subscriber_pc(): pc.video_frame_trackers = {} pc._video_blackholes = {} pc._video_drain_tasks = {} + pc._background_tasks = set() pc._listeners = {} return pc class TestAddTrackSubscriberStopsDrain: - def test_blackhole_stopped_when_subscriber_added(self, subscriber_pc): + @pytest.mark.asyncio + async def test_blackhole_stopped_when_subscriber_added(self, subscriber_pc): track_id = "user123:video:0" relay = MediaRelay() original_track = Mock() From 8efddcdc3fe5682d3426b370a305767f6d421377 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 1 Apr 2026 20:34:17 +0400 Subject: [PATCH 3/6] docs: document drain_video_frames and align defaults across layers ConnectionManager and PeerConnectionManager had drain_video_frames defaulting to False, overriding the True default in SubscriberPeerConnection. Aligned all three layers and added a docstring explaining the drain lifecycle. --- getstream/video/rtc/connection_manager.py | 2 +- getstream/video/rtc/pc.py | 9 +++++++++ getstream/video/rtc/peer_connection.py | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 434a7a4e..d06a069b 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -58,7 +58,7 @@ def __init__( create: bool = True, subscription_config: Optional[SubscriptionConfig] = None, max_join_retries: int = 3, - drain_video_frames: bool = False, + drain_video_frames: bool = True, **kwargs: Any, ): super().__init__() diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 753af184..757ea7ac 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -133,6 +133,15 @@ def __init__( configuration: aiortc.RTCConfiguration, drain_video_frames: bool = True, ) -> None: + """ + Args: + drain_video_frames: When True, attaches a MediaBlackhole to each + incoming video track so unconsumed frames are drained + automatically. This prevents unbounded queue growth in + RTCRtpReceiver when no subscriber is consuming the track. + The drain is stopped once a real subscriber is added via + add_track_subscriber. + """ logger.info( f"creating subscriber peer connection with configuration: {configuration}" ) diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index 26e89df0..685de269 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -28,7 +28,7 @@ class PeerConnectionManager: """Manages WebRTC peer connections for publishing and subscribing.""" - def __init__(self, connection_manager, drain_video_frames: bool = False): + def __init__(self, connection_manager, drain_video_frames: bool = True): self.connection_manager = connection_manager self._drain_video_frames = drain_video_frames self.publisher_pc: Optional[PublisherPeerConnection] = None From 032e3de26384d034f8de04c77782acd68b0f30bd Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 1 Apr 2026 21:05:34 +0400 Subject: [PATCH 4/6] docs: move drain_video_frames docstring to ConnectionManager SubscriberPeerConnection is internal; ConnectionManager is the public entry point users actually interact with. --- getstream/video/rtc/connection_manager.py | 9 +++++++++ getstream/video/rtc/pc.py | 9 --------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index d06a069b..b8d992f1 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -61,6 +61,15 @@ def __init__( drain_video_frames: bool = True, **kwargs: Any, ): + """ + Args: + drain_video_frames: When True, attaches a MediaBlackhole to each + incoming video track so unconsumed frames are drained + automatically. This prevents unbounded queue growth in + RTCRtpReceiver when no subscriber is consuming the track. + The drain is stopped once a real subscriber is added via + add_track_subscriber. + """ super().__init__() # Public attributes diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 757ea7ac..753af184 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -133,15 +133,6 @@ def __init__( configuration: aiortc.RTCConfiguration, drain_video_frames: bool = True, ) -> None: - """ - Args: - drain_video_frames: When True, attaches a MediaBlackhole to each - incoming video track so unconsumed frames are drained - automatically. This prevents unbounded queue growth in - RTCRtpReceiver when no subscriber is consuming the track. - The drain is stopped once a real subscriber is added via - add_track_subscriber. - """ logger.info( f"creating subscriber peer connection with configuration: {configuration}" ) From d1ca6fba4a4bfb0e8d12d053a2cd295e5ec1c229 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 2 Apr 2026 16:56:08 +0400 Subject: [PATCH 5/6] refactor: merge blackhole and drain task into single dict Two separate dicts (_video_blackholes, _video_drain_tasks) tracked the same lifecycle, risking desync. Store them as a tuple in one dict. --- getstream/video/rtc/pc.py | 13 ++++--------- tests/rtc/test_subscriber_drain.py | 5 +---- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 753af184..72bd723f 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -142,8 +142,7 @@ def __init__( self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker - self._video_blackholes: dict[str, MediaBlackhole] = {} - self._video_drain_tasks: dict[str, asyncio.Task] = {} + self._video_blackholes: dict[str, tuple[MediaBlackhole, asyncio.Task]] = {} self._background_tasks: set[asyncio.Task] = set() @self.on("track") @@ -190,11 +189,8 @@ def _emit_pcm(pcm: PcmData): drain_proxy = relay.subscribe(tracked_track) blackhole = MediaBlackhole() blackhole.addTrack(drain_proxy) - self._video_blackholes[track.id] = blackhole - self._video_drain_tasks[track.id] = asyncio.create_task( - blackhole.start() - ) - + drain_task = asyncio.create_task(blackhole.start()) + self._video_blackholes[track.id] = (blackhole, drain_task) self.emit("track_added", proxy, user) @self.on("icegatheringstatechange") @@ -209,8 +205,7 @@ def add_track_subscriber( """Add a new subscriber to an existing track's MediaRelay.""" track_data = self.track_map.get(track_id) - self._video_drain_tasks.pop(track_id, None) - blackhole = self._video_blackholes.pop(track_id, None) + blackhole, _ = self._video_blackholes.pop(track_id, (None, None)) if blackhole: task = asyncio.create_task(blackhole.stop()) diff --git a/tests/rtc/test_subscriber_drain.py b/tests/rtc/test_subscriber_drain.py index a43add56..08b86f51 100644 --- a/tests/rtc/test_subscriber_drain.py +++ b/tests/rtc/test_subscriber_drain.py @@ -17,7 +17,6 @@ def subscriber_pc(): pc.track_map = {} pc.video_frame_trackers = {} pc._video_blackholes = {} - pc._video_drain_tasks = {} pc._background_tasks = set() pc._listeners = {} return pc @@ -33,14 +32,12 @@ async def test_blackhole_stopped_when_subscriber_added(self, subscriber_pc): blackhole = Mock() blackhole.stop = AsyncMock() - subscriber_pc._video_blackholes[track_id] = blackhole - subscriber_pc._video_drain_tasks[track_id] = Mock() + subscriber_pc._video_blackholes[track_id] = (blackhole, Mock()) subscriber_pc.add_track_subscriber(track_id) blackhole.stop.assert_called_once() assert track_id not in subscriber_pc._video_blackholes - assert track_id not in subscriber_pc._video_drain_tasks def test_no_error_when_no_drain_exists(self, subscriber_pc): track_id = "user123:video:0" From ba064949103e73eb85c19c20edb978521d43bddb Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 2 Apr 2026 17:21:49 +0400 Subject: [PATCH 6/6] fix: explicitly cancel drain_task on subscriber arrival stop() handles current cleanup, but if aiortc changes start() to be long-lived, the drain_task could leak. Cancel it defensively. --- getstream/video/rtc/pc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 72bd723f..ab7302eb 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -205,10 +205,11 @@ def add_track_subscriber( """Add a new subscriber to an existing track's MediaRelay.""" track_data = self.track_map.get(track_id) - blackhole, _ = self._video_blackholes.pop(track_id, (None, None)) + blackhole, drain_task = self._video_blackholes.pop(track_id, (None, None)) - if blackhole: + if blackhole and drain_task: task = asyncio.create_task(blackhole.stop()) + drain_task.cancel() # safety net if start() becomes long-lived in future aiortc self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard)