Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"livekit-agents": patch
"livekit-plugins-openai": patch
---

Add `add_to_chat_ctx: bool = True` parameter to `RealtimeSession.generate_reply` and `AgentSession.generate_reply`, plus a `RealtimeCapabilities.ephemeral_response` capability flag that plugins set to declare that they honor the parameter. When `False` is passed against a plugin that declares the capability (currently the OpenAI plugin on the public endpoint), the rendered assistant turn does not enter the substrate's persistent conversation state and is not written to the agent's local chat context. Plugins that do not declare the capability emit a `UserWarning` and fall back to the legacy add-to-context path. The default `add_to_chat_ctx=True` preserves all existing behavior.

The OpenAI plugin enforces a single-isolated-call serialization contract: a second `generate_reply(add_to_chat_ctx=False)` issued while the first is in flight raises `RuntimeError` with diagnostic context (`client_event_id`, `response_id`, and elapsed time since issue; see the Concurrency section of the `generate_reply` docstring). Default `add_to_chat_ctx=True` calls retain their existing concurrency semantics.

Includes concurrency hardening for the substrate's parallel out-of-band code path (orphan filter at `response.created` and nine bare-assert handler conversions to early-return on `_current_generation is None`) and wires `interrupt()` with the active server-assigned `response_id` so cancel actually stops in-flight isolated responses.
31 changes: 30 additions & 1 deletion livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class RealtimeCapabilities:
"""Whether the tool and tool choice can be specified per response"""
supports_say: bool = False
"""Whether the model supports session.say()"""
ephemeral_response: bool = False
"""Whether the model honors ``add_to_chat_ctx=False`` on ``generate_reply``
by producing the response without polluting the substrate's persistent
conversation state. Plugins that set this to ``True`` must also suppress
local-context writes for the assistant turn (handled in
``AgentActivity._realtime_generation_task_impl``).
"""


class RealtimeError(Exception):
Expand Down Expand Up @@ -218,9 +225,31 @@ def generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
add_to_chat_ctx: bool = True,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
tools: NotGivenOr[list[Tool]] = NOT_GIVEN,
) -> asyncio.Future[GenerationCreatedEvent]: ... # can raise RealtimeError on Timeout
) -> asyncio.Future[GenerationCreatedEvent]:
"""Trigger a model response.

When ``add_to_chat_ctx`` is False, the resulting assistant turn is not
written to the agent's local chat context AND, for plugins that
declare ``RealtimeCapabilities.ephemeral_response=True``, is generated
in a way that does not pollute the substrate's persistent conversation
state. Plugins that do not declare the capability emit a
``UserWarning`` from the dispatcher and fall back to
``add_to_chat_ctx=True`` behavior.

Concurrency
-----------
A ``RealtimeSession`` allows at most one in-flight isolated call
(``add_to_chat_ctx=False``) at a time. Issuing a second isolated call
while the first is in flight raises ``RuntimeError`` with diagnostic
context (in-flight ``client_event_id``, ``response_id``, and
elapsed-since-issue); callers MUST await the prior call's future
before issuing the next. Default ``add_to_chat_ctx=True`` calls
retain their existing concurrency semantics.
""" # can raise RealtimeError on Timeout
...

# commit the input audio buffer to the server
@abstractmethod
Expand Down
86 changes: 77 additions & 9 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import heapq
import json
import time
import warnings
from collections.abc import AsyncIterable, Coroutine, Sequence
from dataclasses import dataclass
from functools import partial
Expand Down Expand Up @@ -1113,6 +1114,7 @@ def _generate_reply(
user_message: NotGivenOr[llm.ChatMessage | None] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext | None] = NOT_GIVEN,
instructions: NotGivenOr[str | Instructions] = NOT_GIVEN,
add_to_chat_ctx: bool = True,
tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
tools: NotGivenOr[list[str]] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
Expand Down Expand Up @@ -1192,6 +1194,7 @@ def _generate_reply(
# TODO(theomonnom): support llm.ChatMessage for the realtime model
user_input=user_message.text_content if user_message else None,
instructions=instructions or None,
add_to_chat_ctx=add_to_chat_ctx,
tools=resolved_tools if is_given(resolved_tools) else None,
model_settings=ModelSettings(tool_choice=tool_choice),
),
Expand Down Expand Up @@ -1445,6 +1448,16 @@ def _on_metrics_collected(
)

def _on_remote_item_added(self, ev: llm.RemoteItemAddedEvent) -> None:
# Skip items correlated with an in-flight ephemeral response. The
# OpenAI plugin already drops these at the source in
`_handle_conversation_item_added`; this is a defensive second layer
# that also covers any future plugin that exposes `ephemeral_response`
# without intercepting at the wrapper layer. Duck-typed lookup so
# plugins without the attribute behave normally.
ephemeral_ids: set[str] = getattr(self._rt_session, "_ephemeral_remote_item_ids", set())
if ev.item.id in ephemeral_ids:
return

# add the remote item to the local chat context as a placeholder
local_chat_ctx = self._agent._chat_ctx
if local_chat_ctx.get_by_id(ev.item.id) is not None:
Expand Down Expand Up @@ -2834,10 +2847,39 @@ async def _realtime_reply_task(
tools: list[llm.Tool | llm.Toolset] | None = None,
user_input: str | None = None,
instructions: str | None = None,
add_to_chat_ctx: bool = True,
tool_reply: bool = False,
text: str | AsyncIterable[str] | None = None,
) -> None:
assert self._rt_session is not None, "rt_session is not available"
# Capability gate: only forward `add_to_chat_ctx` to plugins that declare
# `RealtimeCapabilities.ephemeral_response=True`. Plugins without the capability
# keep their existing 3-kwarg signature; emit a `UserWarning` and fall
# back to add_to_chat_ctx=True.
effective_add_to_chat_ctx = add_to_chat_ctx
rt_caps = self._rt_session.realtime_model.capabilities
if not add_to_chat_ctx and not rt_caps.ephemeral_response:
plugin_module = type(self._rt_session.realtime_model).__module__
warnings.warn(
f"plugin {plugin_module} does not declare "
"RealtimeCapabilities.ephemeral_response=True; "
"add_to_chat_ctx=False falls back to True (no isolation). The text "
"passed to generate_reply will be added to the chat context and "
"visible to the model on subsequent turns. Use a plugin that "
"declares ephemeral_response=True (currently: OpenAI plugin, "
"non-Azure endpoint).",
UserWarning,
stacklevel=2,
)
effective_add_to_chat_ctx = True
if not add_to_chat_ctx and (is_given(model_settings.tool_choice) or tools):
warnings.warn(
"add_to_chat_ctx=False forbids tool invocation; tools and "
"tool_choice provided to generate_reply will be discarded for "
"this turn.",
UserWarning,
stacklevel=2,
)
# realtime_reply_task is called only when there's text input, native audio input is handled by _realtime_generation_task
authorization_tasks: list[asyncio.Future[Any]] = [
asyncio.ensure_future(speech_handle._wait_for_authorization()),
Expand Down Expand Up @@ -2891,17 +2933,22 @@ async def _realtime_reply_task(
await self._rt_session.update_tools(llm.ToolContext(tools).flatten())

try:
generation_ev = await self._rt_session.generate_reply(
instructions=instructions or NOT_GIVEN,
tool_choice=(
rt_kwargs: dict[str, Any] = {
"instructions": instructions or NOT_GIVEN,
"tool_choice": (
model_settings.tool_choice if per_response_tool_choice else NOT_GIVEN
),
tools=(
"tools": (
llm.ToolContext(tools).flatten()
if per_response_tool_choice and tools is not None
else NOT_GIVEN
),
)
}
# Only forward `add_to_chat_ctx` to plugins that declare the capability;
# plugins without it keep the existing 3-kwarg signature.
if rt_caps.ephemeral_response:
rt_kwargs["add_to_chat_ctx"] = effective_add_to_chat_ctx
generation_ev = await self._rt_session.generate_reply(**rt_kwargs)
except llm.RealtimeError as e:
logger.error(
"failed to generate a reply%s: %s",
Expand All @@ -2917,6 +2964,7 @@ async def _realtime_reply_task(
generation_ev=generation_ev,
model_settings=model_settings,
instructions=instructions,
add_to_chat_ctx=effective_add_to_chat_ctx,
)
finally:
# reset tool_choice and tools
Expand All @@ -2940,6 +2988,7 @@ async def _realtime_generation_task(
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
add_to_chat_ctx: bool = True,
) -> None:
with tracer.start_as_current_span(
"agent_turn", context=self._session._root_span_context
Expand All @@ -2954,6 +3003,7 @@ async def _realtime_generation_task(
generation_ev=generation_ev,
model_settings=model_settings,
instructions=instructions,
add_to_chat_ctx=add_to_chat_ctx,
)

async def _realtime_generation_task_impl(
Expand All @@ -2963,6 +3013,7 @@ async def _realtime_generation_task_impl(
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
add_to_chat_ctx: bool = True,
) -> None:
current_span = trace.get_current_span(context=speech_handle._agent_turn_context)
current_span.set_attribute(trace_types.ATTR_SPEECH_ID, speech_handle.id)
Expand Down Expand Up @@ -3180,6 +3231,12 @@ async def _read_fnc_stream() -> None:
)

def _tool_execution_started_cb(fnc_call: llm.FunctionCall) -> None:
# When add_to_chat_ctx=False the OpenAI plugin forces tools=[] /
# tool_choice="none", so this callback should not fire for an
# isolated turn. Gate as defense-in-depth in case a future plugin
# implementation does not honor the tool override.
if not add_to_chat_ctx:
return
speech_handle._item_added([fnc_call])
self._agent._chat_ctx._upsert_item(fnc_call)
self._session._tool_items_added([fnc_call])
Expand Down Expand Up @@ -3288,7 +3345,13 @@ def _create_assistant_message(
"`use_tts_aligned_transcript` is enabled but no agent transcript was returned from tts"
)

if msg_gen and forwarded_text:
if msg_gen and forwarded_text and add_to_chat_ctx:
# Suppressed for add_to_chat_ctx=False: the rendered text must not
# enter agent._chat_ctx (next-turn model context), the session's
# public conversation_item_added stream, the speech_handle's
# observable item list, or the OTel response-text span attribute
# that tracing backends would log. The audio output above is
# intentionally unaffected — rendering to the user is the point.
msg = _create_assistant_message(
message_id=msg_gen.message_id,
forwarded_text=forwarded_text,
Expand Down Expand Up @@ -3345,9 +3408,14 @@ def _create_assistant_message(
generate_tool_reply = True
fnc_executed_ev._reply_required = True

# add tool output to the chat context
self._agent._chat_ctx._upsert_item(sanitized_out.fnc_call_out)
self._session._tool_items_added([sanitized_out.fnc_call_out])
# add tool output to the chat context (suppressed for
# add_to_chat_ctx=False; tools are forced off at the plugin
# layer for isolated turns, so this branch is defense-in-
# depth in case a future plugin does not honor the
# tool-override contract).
if add_to_chat_ctx:
self._agent._chat_ctx._upsert_item(sanitized_out.fnc_call_out)
self._session._tool_items_added([sanitized_out.fnc_call_out])

if new_agent_task is not None and sanitized_out.agent_task is not None:
logger.error(
Expand Down
20 changes: 20 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,7 @@ def generate_reply(
*,
user_input: NotGivenOr[str | llm.ChatMessage] = NOT_GIVEN,
instructions: NotGivenOr[str | Instructions] = NOT_GIVEN,
add_to_chat_ctx: bool = True,
tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
tools: NotGivenOr[list[str]] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
Expand All @@ -1145,6 +1146,17 @@ def generate_reply(
user_input (NotGivenOr[str | llm.ChatMessage], optional): The user's input that may influence the reply,
such as answering a question.
instructions (NotGivenOr[str], optional): Additional instructions for generating the reply.
add_to_chat_ctx (bool, optional): When ``False`` and the underlying LLM is a
``RealtimeModel`` whose plugin declares ``RealtimeCapabilities.ephemeral_response=True``,
the rendered assistant turn is not written to the agent's local chat context AND is
generated in a way that does not pollute the substrate's persistent conversation state.
Plugins that do not declare the capability emit a ``UserWarning`` and fall back to
``add_to_chat_ctx=True`` behavior. Combining ``add_to_chat_ctx=False`` with a non-realtime
LLM raises ``NotImplementedError``. Default ``True`` preserves existing behavior.

Concurrency: a session allows at most one in-flight isolated call at a time. Issuing a
second isolated call while the first is in flight raises ``RuntimeError``; await the
prior call's future first.
tool_choice (NotGivenOr[llm.ToolChoice], optional): Specifies the external tool to use when
generating the reply. If generate_reply is invoked within a function_tool, defaults to "none".
tools (NotGivenOr[list[str]], optional): List of tool IDs to make available for this response.
Expand All @@ -1159,6 +1171,13 @@ def generate_reply(
Returns:
SpeechHandle: A handle to the generated reply.
""" # noqa: E501
if not add_to_chat_ctx and not isinstance(self._llm, llm.RealtimeModel):
raise NotImplementedError(
"add_to_chat_ctx=False is only supported for RealtimeModel "
"(out-of-band response generation). Pipeline LLM dispatch with "
"add_to_chat_ctx=False is not yet implemented; file an issue if "
"you need this."
)
if self._activity is None:
raise RuntimeError("AgentSession isn't running")

Expand All @@ -1183,6 +1202,7 @@ def generate_reply(
handle = activity._generate_reply(
user_message=user_message if user_message else None,
instructions=instructions,
add_to_chat_ctx=add_to_chat_ctx,
tool_choice=tool_choice,
tools=tools,
allow_interruptions=allow_interruptions,
Expand Down
Loading
Loading