Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 143 additions & 31 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ def _wrap_async_generator_result(
observe = _decorator.observe


def _get_generator_output(
items: List[Any],
transform_fn: Optional[Callable[[Iterable], str]],
) -> Any:
output: Any = items

if transform_fn is not None:
output = transform_fn(items)
elif all(isinstance(item, str) for item in items):
output = "".join(items)

return output


class _ContextPreservedSyncGeneratorWrapper:
"""Sync generator wrapper that ensures each iteration runs in preserved context."""

Expand All @@ -560,9 +574,17 @@ def __init__(
self.items: List[Any] = []
self.span = span
self.transform_fn = transform_fn
self._finalized = False

def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
return self
def __iter__(self) -> Generator[Any, None, None]:
try:
while True:
yield self.__next__()
except StopIteration:
return
finally:
if not self._finalized:
self.close()

def __next__(self) -> Any:
try:
Expand All @@ -573,25 +595,65 @@ def __next__(self) -> Any:
return item

except StopIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
self._finalize()
raise # Re-raise StopIteration

if self.transform_fn is not None:
output = self.transform_fn(self.items)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
def close(self) -> None:
if self._finalized:
return

self.span.update(output=output).end()
try:
close_method = getattr(self.generator, "close", None)
if callable(close_method):
self.context.run(close_method)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
Comment on lines 600 to +614
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 In _ContextPreservedSyncGeneratorWrapper.close(), _finalize() is only called if close_method() either succeeds or raises Exception/asyncio.CancelledError; if it raises a BaseException not matched by that handler (e.g. KeyboardInterrupt, SystemExit), the trailing self._finalize() is never reached and the span leaks open forever. The same PR introduces LangfuseResponseGeneratorSync.close() which correctly uses try/finally to guarantee finalization — _ContextPreservedSyncGeneratorWrapper.close() should be updated to match.

Extended reasoning...

What the bug is

In _ContextPreservedSyncGeneratorWrapper.close() (lines 600-614 of langfuse/_client/observe.py), the call to self.context.run(close_method) is inside a try/except (Exception, asyncio.CancelledError) block. The span-finalizing call self._finalize() sits after that block, outside any finally clause:

def close(self) -> None:
    if self._finalized:
        return
    try:
        close_method = getattr(self.generator, 'close', None)
        if callable(close_method):
            self.context.run(close_method)
    except (Exception, asyncio.CancelledError) as e:
        self._finalize(error=e)
        raise
    self._finalize()  # only reached if no uncaught BaseException propagates

The specific code path that triggers it

If close_method() raises a BaseException subclass not covered by the except clause — for example KeyboardInterrupt or SystemExit — the handler is bypassed, and execution unwinds past the try/except block without ever reaching the trailing self._finalize(). The span is left open with _finalized = False.

Why existing code does not prevent it

The __iter__ method does have a finally block that calls self.close() on early loop exit, which handles the for-loop break case. However, once control is inside close() itself, if close_method() raises a BaseException, the __iter__ finally has already fired (or is not on the call stack). There is no further safety net. _finalized remains False and the span is genuinely leaked.

Comparison with the correct pattern introduced in this same PR

LangfuseResponseGeneratorSync.close() (also new in this PR) uses try/finally correctly:

def close(self) -> None:
    if self._finalized:
        return
    close_method = getattr(self.response, 'close', None)
    if callable(close_method):
        try:
            close_method()
        finally:
            self._finalize()  # always runs regardless of exception type
        return
    self._finalize()

_ContextPreservedAsyncGeneratorWrapper.aclose() also uses try/finally. The asymmetry is limited to _ContextPreservedSyncGeneratorWrapper.close().

Impact

If KeyboardInterrupt or SystemExit fires during generator teardown, the Langfuse span is never ended. This is a rare edge case (signal or interpreter shutdown during active generator close), but it is a real span leak that contradicts the always-finalize invariant the PR is carefully establishing everywhere else.

How to fix

Use try/finally while preserving error annotation on caught exceptions:

def close(self) -> None:
    if self._finalized:
        return
    error = None
    try:
        close_method = getattr(self.generator, 'close', None)
        if callable(close_method):
            self.context.run(close_method)
    except (Exception, asyncio.CancelledError) as e:
        error = e
        raise
    finally:
        self._finalize(error=error)

Step-by-step proof

  1. wrapper = _ContextPreservedSyncGeneratorWrapper(g, ctx, span, None) — span is open, _finalized=False.
  2. Consumer iterates for item in wrapper then breaks — __iter__'s finally clause calls wrapper.close().
  3. Inside close(): _finalized is False, we enter the try block and call self.context.run(close_method).
  4. The underlying generator's close() executes and a KeyboardInterrupt is raised (e.g. via OS signal handler active at that moment).
  5. KeyboardInterrupt is not an Exception or asyncio.CancelledError; the except clause does not match it.
  6. The exception propagates out of the try/except entirely. The line self._finalize() after the block is never executed.
  7. _finalized stays False; span.end() is never called; the span is leaked indefinitely.

raise

raise # Re-raise StopIteration
self._finalize()

def throw(self, typ: Any, val: Any = None, tb: Any = None) -> Any:
throw_method = getattr(self.generator, "throw", None)
if not callable(throw_method):
raise AttributeError("Wrapped generator does not support throw()")

try:
if tb is not None:
item = self.context.run(throw_method, typ, val, tb)
elif val is not None:
item = self.context.run(throw_method, typ, val)
else:
item = self.context.run(throw_method, typ)

self.items.append(item)

return item
except StopIteration:
self._finalize()
raise
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

def _finalize(self, error: Optional[BaseException] = None) -> None:
if self._finalized:
return

self._finalized = True

if error is not None:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
level="ERROR", status_message=str(error) or type(error).__name__
).end()
return

raise
self.span.update(
output=_get_generator_output(self.items, self.transform_fn)
).end()


class _ContextPreservedAsyncGeneratorWrapper:
Expand Down Expand Up @@ -619,43 +681,93 @@ def __init__(
self.items: List[Any] = []
self.span = span
self.transform_fn = transform_fn
self._finalized = False
Comment on lines 683 to +684
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The _ContextPreservedAsyncGeneratorWrapper.__aiter__ method returns self, making it a plain async iterator rather than an async generator, so Python will not auto-call aclose() when a caller breaks out of async for item in wrapper: early, leaving the span permanently unfinalized. The sync wrapper was correctly hardened in this PR by converting __iter__ into a generator function with a finally: self.close() block, but the async counterpart was not given the same treatment; callers must explicitly use aclosing(wrapper) to avoid the leak, as the new test itself demonstrates.

Extended reasoning...

What the bug is and how it manifests

_ContextPreservedAsyncGeneratorWrapper implements __aiter__ by returning self, which makes instances plain async iterators, not async generators. Python's async for loop only automatically calls aclose() on objects whose runtime type is types.AsyncGeneratorType (produced by calling an async def function that contains yield). For plain async iterators that merely implement __aiter__/__anext__, Python places cleanup responsibility entirely on the caller.

The specific code path that triggers it

When user code does async for item in async_gen_wrapper: process(item); break, Python calls __aiter__() once (returns self), then calls __anext__() in a loop. On break, the loop simply stops -- it does NOT call aclose() because the wrapper is not a real async generator. None of _finalize()'s three call-sites are reached: exhaustion (StopAsyncIteration), exception, or explicit aclose().

Why existing code doesn't prevent it

The _finalize() method and _finalized guard are both correct and idempotent. The problem is structural: _finalize() is never invoked in the early-break scenario because the wrapper is a plain async iterator, not an async generator, so the Python runtime provides no automatic cleanup hook.

What the impact would be

Every early break from async for item in wrapper: that does not explicitly call await wrapper.aclose() leaves the associated Langfuse span permanently open. span.end() is never called, no output is recorded. This is a silent observability and resource leak affecting any caller of an @observe-decorated async generator function.

How to fix it

Apply the same pattern used for the sync wrapper: convert __aiter__ into an async generator method with a finally block:

async def __aiter__(self):
    try:
        while True:
            yield await self.__anext__()
    except StopAsyncIteration:
        return
    finally:
        if not self._finalized:
            await self.aclose()

This returns an actual async generator object, so Python automatically calls aclose() on early break.

Step-by-step proof

  1. @observe wraps an async generator; _wrap_async_generator_result returns a _ContextPreservedAsyncGeneratorWrapper instance w.
  2. Caller does async for item in w: break.
  3. Python calls w.__aiter__() -- returns w itself (type is _ContextPreservedAsyncGeneratorWrapper, not AsyncGeneratorType).
  4. Python calls await w.__anext__() -- yields the first item.
  5. break exits the loop. Python does NOT call await w.aclose() -- w is a plain async iterator.
  6. w._finalize() is never called. w.span.end() is never called. Span remains open indefinitely.
  7. The new test test_async_generator_wrapper_aclose_finalizes_partial_output explicitly uses async with aclosing(wrapper) as stream: as a workaround -- direct evidence that without aclosing, the span is not finalized.


def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
return self

async def __anext__(self) -> Any:
try:
# Run the generator's __anext__ in the preserved context
try:
# Python 3.10+ approach with context parameter
item = await asyncio.create_task(
self.generator.__anext__(), # type: ignore
context=self.context,
) # type: ignore
except TypeError:
# Python < 3.10 fallback - context parameter not supported
item = await self.generator.__anext__()
item = await self._run_in_preserved_context(self.generator.__anext__)

self.items.append(item)

return item

except StopAsyncIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
self._finalize()
raise # Re-raise StopAsyncIteration
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

if self.transform_fn is not None:
output = self.transform_fn(self.items)
async def close(self) -> None:
await self.aclose()

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
async def aclose(self) -> None:
if self._finalized:
return

self.span.update(output=output).end()
try:
close_method = getattr(self.generator, "aclose", None)
if callable(close_method):
await self._run_in_preserved_context(close_method)
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

raise # Re-raise StopAsyncIteration
self._finalize()

async def athrow(self, typ: Any, val: Any = None, tb: Any = None) -> Any:
throw_method = getattr(self.generator, "athrow", None)
if not callable(throw_method):
raise AttributeError("Wrapped async generator does not support athrow()")

try:
if tb is not None:
item = await self._run_in_preserved_context(
lambda: throw_method(typ, val, tb)
)
elif val is not None:
item = await self._run_in_preserved_context(
lambda: throw_method(typ, val)
)
else:
item = await self._run_in_preserved_context(lambda: throw_method(typ))

self.items.append(item)

return item
except StopAsyncIteration:
self._finalize()
raise
except (Exception, asyncio.CancelledError) as e:
self._finalize(error=e)
raise

async def _run_in_preserved_context(self, factory: Callable[[], Any]) -> Any:
awaitable = self.context.run(factory)

try:
task = asyncio.create_task(awaitable, context=self.context) # type: ignore[call-arg]
except TypeError:
task = self.context.run(asyncio.create_task, awaitable)

return await task

def _finalize(self, error: Optional[BaseException] = None) -> None:
if self._finalized:
return

self._finalized = True

if error is not None:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
level="ERROR", status_message=str(error) or type(error).__name__
).end()
return

raise
self.span.update(
output=_get_generator_output(self.items, self.transform_fn)
).end()
6 changes: 3 additions & 3 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,10 +1057,10 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
and len(message.tool_calls) > 0
):
message_dict["tool_calls"] = message.tool_calls

if (
hasattr(message, "invalid_tool_calls")
and message.invalid_tool_calls is not None
hasattr(message, "invalid_tool_calls")
and message.invalid_tool_calls is not None
and len(message.invalid_tool_calls) > 0
):
message_dict["invalid_tool_calls"] = message.invalid_tool_calls
Expand Down
67 changes: 61 additions & 6 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ def __init__(
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._finalized = False

def __iter__(self) -> Any:
try:
Expand Down Expand Up @@ -1039,12 +1040,31 @@ def __next__(self) -> Any:
raise

def __enter__(self) -> Any:
return self.__iter__()
return self

def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
self.close()

def close(self) -> None:
if self._finalized:
return

close_method = getattr(self.response, "close", None)
if callable(close_method):
try:
close_method()
finally:
self._finalize()
return

self._finalize()

def _finalize(self) -> None:
if self._finalized:
return

self._finalized = True

try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
Expand Down Expand Up @@ -1081,6 +1101,7 @@ def __init__(
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._finalized = False

async def __aiter__(self) -> Any:
try:
Expand Down Expand Up @@ -1110,12 +1131,17 @@ async def __anext__(self) -> Any:
raise

async def __aenter__(self) -> Any:
return self.__aiter__()
return self

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
await self.aclose()

async def _finalize(self) -> None:
if self._finalized:
return

self._finalized = True

try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
Expand All @@ -1142,11 +1168,40 @@ async def close(self) -> None:

Automatically called if the response body is read to completion.
"""
await self.response.close()
if self._finalized:
return

close_method = getattr(self.response, "close", None)
if callable(close_method):
try:
await close_method()
finally:
await self._finalize()
return

await self._finalize()

async def aclose(self) -> None:
"""Close the response and release the connection.

Automatically called if the response body is read to completion.
"""
await self.response.aclose()
if self._finalized:
return
Comment on lines +1189 to +1190
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close async response even when stream already finalized

LangfuseResponseGeneratorAsync.__aiter__ always runs _finalize() in its finally, so after any async for exit (including an early break) _finalized is already True; with this new guard, a later explicit await stream.aclose() becomes a no-op and never reaches self.response.aclose()/close(). That regresses resource cleanup for partial-consumption flows and can leave the underlying HTTP stream open, especially in patterns that break early and then call aclose() to release the connection.

Useful? React with 👍 / 👎.


close_method = getattr(self.response, "aclose", None)
if callable(close_method):
try:
await close_method()
finally:
await self._finalize()
else:
close_method = getattr(self.response, "close", None)
if callable(close_method):
try:
await close_method()
finally:
await self._finalize()
return

await self._finalize()
Loading
Loading