fix(async-streaming): harden context preservation#1576
fix(async-streaming): harden context preservation#1576
Conversation
|
@claude review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: edf45e3a7e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if self._finalized: | ||
| return |
There was a problem hiding this comment.
Close async response even when stream already finalized
LangfuseResponseGeneratorAsync.__aiter__ always runs _finalize() in its finally, so after any async for exit (including an early break) _finalized is already True; with this new guard, a later explicit await stream.aclose() becomes a no-op and never reaches self.response.aclose()/close(). That regresses resource cleanup for partial-consumption flows and can leave the underlying HTTP stream open, especially in patterns that break early and then call aclose() to release the connection.
Useful? React with 👍 / 👎.
| self.transform_fn = transform_fn | ||
| self._finalized = False |
There was a problem hiding this comment.
🔴 The _ContextPreservedAsyncGeneratorWrapper.__aiter__ method returns self, making it a plain async iterator rather than an async generator, so Python will not auto-call aclose() when a caller breaks out of async for item in wrapper: early, leaving the span permanently unfinalized. The sync wrapper was correctly hardened in this PR by converting __iter__ into a generator function with a finally: self.close() block, but the async counterpart was not given the same treatment; callers must explicitly use aclosing(wrapper) to avoid the leak, as the new test itself demonstrates.
Extended reasoning...
What the bug is and how it manifests
_ContextPreservedAsyncGeneratorWrapper implements __aiter__ by returning self, which makes instances plain async iterators, not async generators. Python's async for loop only automatically calls aclose() on objects whose runtime type is types.AsyncGeneratorType (produced by calling an async def function that contains yield). For plain async iterators that merely implement __aiter__/__anext__, Python places cleanup responsibility entirely on the caller.
The specific code path that triggers it
When user code does async for item in async_gen_wrapper: process(item); break, Python calls __aiter__() once (returns self), then calls __anext__() in a loop. On break, the loop simply stops -- it does NOT call aclose() because the wrapper is not a real async generator. None of _finalize()'s three call-sites are reached: exhaustion (StopAsyncIteration), exception, or explicit aclose().
Why existing code doesn't prevent it
The _finalize() method and _finalized guard are both correct and idempotent. The problem is structural: _finalize() is never invoked in the early-break scenario because the wrapper is a plain async iterator, not an async generator, so the Python runtime provides no automatic cleanup hook.
What the impact would be
Every early break from async for item in wrapper: that does not explicitly call await wrapper.aclose() leaves the associated Langfuse span permanently open. span.end() is never called, no output is recorded. This is a silent observability and resource leak affecting any caller of an @observe-decorated async generator function.
How to fix it
Apply the same pattern used for the sync wrapper: convert __aiter__ into an async generator method with a finally block:
async def __aiter__(self):
try:
while True:
yield await self.__anext__()
except StopAsyncIteration:
return
finally:
if not self._finalized:
await self.aclose()This returns an actual async generator object, so Python automatically calls aclose() on early break.
Step-by-step proof
@observewraps an async generator;_wrap_async_generator_resultreturns a_ContextPreservedAsyncGeneratorWrapperinstancew.- Caller does
async for item in w: break. - Python calls
w.__aiter__()-- returnswitself (type is_ContextPreservedAsyncGeneratorWrapper, notAsyncGeneratorType). - Python calls
await w.__anext__()-- yields the first item. breakexits the loop. Python does NOT callawait w.aclose()--wis a plain async iterator.w._finalize()is never called.w.span.end()is never called. Span remains open indefinitely.- The new test
test_async_generator_wrapper_aclose_finalizes_partial_outputexplicitly usesasync with aclosing(wrapper) as stream:as a workaround -- direct evidence that withoutaclosing, the span is not finalized.
|
|
||
| if self.transform_fn is not None: | ||
| output = self.transform_fn(self.items) | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| self._finalize(error=e) | ||
| raise | ||
|
|
||
| elif all(isinstance(item, str) for item in self.items): | ||
| output = "".join(self.items) | ||
| def close(self) -> None: | ||
| if self._finalized: | ||
| return | ||
|
|
||
| self.span.update(output=output).end() | ||
| try: | ||
| close_method = getattr(self.generator, "close", None) | ||
| if callable(close_method): | ||
| self.context.run(close_method) | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| self._finalize(error=e) |
There was a problem hiding this comment.
🟡 In _ContextPreservedSyncGeneratorWrapper.close(), _finalize() is only called if close_method() either succeeds or raises Exception/asyncio.CancelledError; if it raises a BaseException not matched by that handler (e.g. KeyboardInterrupt, SystemExit), the trailing self._finalize() is never reached and the span leaks open forever. The same PR introduces LangfuseResponseGeneratorSync.close() which correctly uses try/finally to guarantee finalization — _ContextPreservedSyncGeneratorWrapper.close() should be updated to match.
Extended reasoning...
What the bug is
In _ContextPreservedSyncGeneratorWrapper.close() (lines 600-614 of langfuse/_client/observe.py), the call to self.context.run(close_method) is inside a try/except (Exception, asyncio.CancelledError) block. The span-finalizing call self._finalize() sits after that block, outside any finally clause:
def close(self) -> None:
if self._finalized:
return
try:
close_method = getattr(self.generator, 'close', None)
if callable(close_method):
self.context.run(close_method)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise
self._finalize() # only reached if no uncaught BaseException propagatesThe specific code path that triggers it
If close_method() raises a BaseException subclass not covered by the except clause — for example KeyboardInterrupt or SystemExit — the handler is bypassed, and execution unwinds past the try/except block without ever reaching the trailing self._finalize(). The span is left open with _finalized = False.
Why existing code does not prevent it
The __iter__ method does have a finally block that calls self.close() on early loop exit, which handles the for-loop break case. However, once control is inside close() itself, if close_method() raises a BaseException, the __iter__ finally has already fired (or is not on the call stack). There is no further safety net. _finalized remains False and the span is genuinely leaked.
Comparison with the correct pattern introduced in this same PR
LangfuseResponseGeneratorSync.close() (also new in this PR) uses try/finally correctly:
def close(self) -> None:
if self._finalized:
return
close_method = getattr(self.response, 'close', None)
if callable(close_method):
try:
close_method()
finally:
self._finalize() # always runs regardless of exception type
return
self._finalize()_ContextPreservedAsyncGeneratorWrapper.aclose() also uses try/finally. The asymmetry is limited to _ContextPreservedSyncGeneratorWrapper.close().
Impact
If KeyboardInterrupt or SystemExit fires during generator teardown, the Langfuse span is never ended. This is a rare edge case (signal or interpreter shutdown during active generator close), but it is a real span leak that contradicts the always-finalize invariant the PR is carefully establishing everywhere else.
How to fix
Use try/finally while preserving error annotation on caught exceptions:
def close(self) -> None:
if self._finalized:
return
error = None
try:
close_method = getattr(self.generator, 'close', None)
if callable(close_method):
self.context.run(close_method)
except (Exception, asyncio.CancelledError) as e:
error = e
raise
finally:
self._finalize(error=error)Step-by-step proof
wrapper = _ContextPreservedSyncGeneratorWrapper(g, ctx, span, None)— span is open,_finalized=False.- Consumer iterates
for item in wrapperthen breaks —__iter__'sfinallyclause callswrapper.close(). - Inside
close():_finalizedisFalse, we enter thetryblock and callself.context.run(close_method). - The underlying generator's
close()executes and aKeyboardInterruptis raised (e.g. via OS signal handler active at that moment). KeyboardInterruptis not anExceptionorasyncio.CancelledError; theexceptclause does not match it.- The exception propagates out of the
try/exceptentirely. The lineself._finalize()after the block is never executed. _finalizedstaysFalse;span.end()is never called; the span is leaked indefinitely.
Disclaimer: Experimental PR review
Greptile Summary
This PR hardens context preservation for async and sync streaming by ensuring generator wrappers properly finalize their Langfuse spans on early consumer break, exception, explicit
close()/aclose(), and throw operations. It also fixes the previously broken context-manager protocol onLangfuseResponseGeneratorSyncandLangfuseResponseGeneratorAsync(__enter__was returning an iterator instead ofself;__exit__was a no-op).Key changes:
_ContextPreservedSyncGeneratorWrapper.__iter__is converted to a generator with afinally: close()guard, correctly draining and closing the inner generator on early break._finalizedidempotency flags, extracted_finalize()methods, and explicitclose()/throw()(sync) andaclose()/athrow()(async) methods._run_in_preserved_context()is refactored to avoid the repeatedtry/except TypeErrorfor Python < 3.10.LangfuseResponseGeneratorSync/Asynccontext-manager entry now correctly returnsselfand exit now callsclose()/aclose().create_taskfallback.One gap remains: in
LangfuseResponseGeneratorSync.__iter__(line 1024–1025) andLangfuseResponseGeneratorAsync.__aiter__(line 1115–1116), thefinallyblock callsself._finalize()directly rather thanself.close(). This sets_finalized = Truebefore the underlyingresponse.close()/aclose()is called. If the stream is abandoned mid-iteration through theforloop (as opposed to through__next__) and thenclose()is invoked (e.g. via__exit__),close()returns early and the underlying HTTP connection is never explicitly closed.Confidence Score: 4/5
__iter__/__aiter__finallycalling_finalize()instead ofclose(), which can leave HTTP connections unclosed on early break._ContextPreservedSyncGeneratorWrapperearly-break path is handled correctly. The one concrete issue —LangfuseResponseGeneratorSync.__iter__andLangfuseResponseGeneratorAsync.__aiter__bypassingresponse.close()— is a resource-management gap introduced in this PR, but it requires a specific usage pattern (context manager + for-loop + break) and Python's GC provides a safety net. The fix is a one-line change in each class.langfuse/openai.py—__iter__and__aiter__finallyblocks (lines 1024–1025 and 1115–1116)Important Files Changed
_finalizedidempotency, properclose()/throw()delegation, and early-breakfinallyguard in__iter__for the sync wrapper. The async wrapper gainsaclose(),athrow(), and a cleaner_run_in_preserved_context(). Logic is sound and the_ContextPreservedSyncGeneratorWrapperearly-break path is correctly handled._finalizedguard, fixes context-manager entry/exit, and introducesclose()/aclose()for both generator classes. The__iter__/__aiter__finallyblocks still call_finalize()directly instead ofclose(), leaving the underlying response unclosed when the stream is abandoned mid-iteration through the for-loop path.aclosing()pattern, and Python < 3.10create_taskfallback. Good coverage of the new wrapper methods using a lightweight_FakeObservationstub.close()/aclose()on partial streams. Tests only exercise thenext()/__anext__()path, not thefor chunk in stream: breakpath through__iter__/__aiter__, so the resource-leak edge case goes untested.Sequence Diagram
sequenceDiagram participant C as Consumer participant W as Generator Wrapper participant G as Inner Generator / Response participant S as Langfuse Span Note over C,S: Happy path – full consumption C->>W: for item in wrapper / next() W->>G: context.run(next, generator) G-->>W: item W-->>C: item G-->>W: StopIteration / StopAsyncIteration W->>W: _finalize() W->>S: span.update(output=...).end() Note over C,S: Early break – sync (_ContextPreservedSyncGeneratorWrapper) C->>W: for item in wrapper → break W->>W: __iter__ finally: if not _finalized → close() W->>G: context.run(generator.close) W->>W: _finalize() W->>S: span.update(output=partial).end() Note over C,S: Early break – LangfuseResponseGeneratorSync (gap) C->>W: for chunk in stream → break W->>W: __iter__ finally: _finalize() [sets _finalized=True] W->>S: generation.end() C->>W: close() via __exit__ W->>W: _finalized==True → return early W--xG: response.close() NEVER calledComments Outside Diff (1)
langfuse/openai.py, line 1024-1025 (link)close()skipped when iterated via__iter__with early breakWhen consumers use the
for chunk in stream:path (which goes through__iter__), thefinallyblock callsself._finalize(), setting_finalized = True. If iteration is abandoned early (e.g.break), the__exit__method then callsclose(), butclose()sees_finalized == Trueand returns immediately — soself.response.close()is never called, leaving the underlying HTTP connection open.The new tests only exercise the
next(stream)path (__next__), which does not trigger this issue. A representative scenario:The same problem exists in
LangfuseResponseGeneratorAsync.__aiter__(line 1115–1116), wherefinally: await self._finalize()sets the flag beforeaclose()can close the underlying response.Changing
__iter__'sfinallyto callself.close()instead ofself._finalize()would fix both paths, sinceclose()is already idempotent via the_finalizedguard, and_finalize()is called insideclose().Reviews (1): Last reviewed commit: "push;" | Re-trigger Greptile