Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from livekit.plugins.google.realtime.api_proto import ClientEvents, LiveAPIModels, Voice

from ..log import logger
from ..utils import create_tools_config, get_tool_results_for_realtime
from ..utils import create_function_response, create_tools_config, get_tool_results_for_realtime
from ..version import __version__

INPUT_AUDIO_SAMPLE_RATE = 16000
Expand Down Expand Up @@ -148,6 +148,7 @@ class _RealtimeOptions:
api_version: NotGivenOr[str] = NOT_GIVEN
tool_behavior: NotGivenOr[types.Behavior] = NOT_GIVEN
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN
thinking_config: NotGivenOr[types.ThinkingConfig] = NOT_GIVEN
session_resumption: NotGivenOr[types.SessionResumptionConfig] = NOT_GIVEN
credentials: google.auth.credentials.Credentials | None = None
Expand Down Expand Up @@ -488,6 +489,9 @@ def __init__(self, realtime_model: RealtimeModel) -> None:
self._session_should_close = asyncio.Event()
self._response_created_futures: dict[str, asyncio.Future[llm.GenerationCreatedEvent]] = {}
self._pending_generation_fut: asyncio.Future[llm.GenerationCreatedEvent] | None = None
# set while a rejected tool_choice="none" turn is draining its trailing events
# (which have no generation to attach to), cleared when the next generation starts.
self._rejected_tool_turn = False

self._session_resumption_handle: str | None = (
self._opts.session_resumption.handle
Expand Down Expand Up @@ -554,7 +558,19 @@ def update_options(
# no need to restart

if is_given(tool_choice):
logger.warning("tool_choice is not supported by the Google Realtime API.")
# no per-response tool_choice on Gemini; "none" is emulated by rejecting any tool
# call emitted during the turn (see _reject_tool_calls).
self._opts.tool_choice = tool_choice
if tool_choice == "none":
logger.warning(
"the Google Realtime API has no tool_choice='none'; tool calls emitted "
"this turn will be rejected so the model replies directly."
)
elif tool_choice not in (None, "auto"):
logger.warning(
f"tool_choice='{tool_choice}' is not supported by the Google Realtime API, "
"falling back to 'auto'."
)

if should_restart:
self._mark_restart_needed()
Expand Down Expand Up @@ -1016,6 +1032,13 @@ async def _recv_task(self, session: AsyncSession) -> None:
part["inline_data"] = "<audio>"
logger.debug("<<< received response", extra={"response": resp_copy})

if response.tool_call and self._opts.tool_choice == "none":
# reject without opening a generation, so the pending generate_reply
# stays bound to the model's eventual reply and tools stay suppressed
# for the whole turn.
self._reject_tool_calls(response.tool_call.function_calls or [])
continue

if not self._current_generation or self._current_generation._done:
if (sc := response.server_content) and sc.interrupted:
# two cases an interrupted event is sent without an active generation
Expand Down Expand Up @@ -1133,6 +1156,7 @@ def _build_connect_config(self) -> types.LiveConnectConfig:
return conf

def _start_new_generation(self) -> None:
self._rejected_tool_turn = False
if self._current_generation and not self._current_generation._done:
logger.warning("starting new generation while another is active. Finalizing previous.")
self._mark_current_generation_done()
Expand Down Expand Up @@ -1184,7 +1208,13 @@ def _start_new_generation(self) -> None:
def _handle_server_content(self, server_content: types.LiveServerContent) -> None:
current_gen = self._current_generation
if not current_gen:
logger.warning("received server content but no active generation.")
if self._rejected_tool_turn:
logger.debug(
"ignoring server content from a rejected tool call turn",
extra={"server_content": server_content.model_dump_json(exclude_none=True)},
)
else:
logger.warning("received server content but no active generation.")
return

if model_turn := server_content.model_turn:
Expand Down Expand Up @@ -1302,6 +1332,30 @@ def _handle_input_speech_stopped(self) -> None:
llm.InputSpeechStoppedEvent(user_transcription_enabled=False),
)

def _reject_tool_calls(self, function_calls: list[types.FunctionCall]) -> None:
responses = [
create_function_response(
llm.FunctionCallOutput(
name=fnc_call.name or "",
call_id=fnc_call.id or "",
output="Tool calls are disabled for this turn, respond to the user directly.",
is_error=True,
),
vertexai=self._opts.vertexai,
tool_response_scheduling=self._opts.tool_response_scheduling,
)
for fnc_call in function_calls
]
if not responses:
return

logger.warning(
"rejecting tool call requested while tool_choice='none'",
extra={"functions": [fnc_call.name for fnc_call in function_calls]},
)
self._send_client_event(types.LiveClientToolResponse(function_responses=responses))
self._rejected_tool_turn = True

def _handle_tool_calls(self, tool_call: types.LiveServerToolCall) -> None:
if not self._current_generation:
logger.warning("received tool call but no active generation.")
Expand Down Expand Up @@ -1331,7 +1385,10 @@ def _handle_tool_call_cancellation(
def _handle_usage_metadata(self, usage_metadata: types.UsageMetadata) -> None:
current_gen = self._current_generation
if not current_gen:
logger.warning("no active generation to report metrics for")
if self._rejected_tool_turn:
logger.debug("ignoring usage metadata from a rejected tool call turn")
else:
logger.warning("no active generation to report metrics for")
return

ttft = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,40 @@ def create_tools_config(
return gemini_tools


def create_function_response(
output: llm.FunctionCallOutput,
*,
vertexai: bool = False,
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN,
) -> types.FunctionResponse:
res = types.FunctionResponse(
name=output.name,
response={"error": output.output} if output.is_error else {"output": output.output},
Comment thread
longcw marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Behavioral change in error response format for get_tool_results_for_realtime

The refactoring of create_function_response introduces a behavioral change: when is_error=True, the function response dict key changes from {"output": msg} (old behavior in get_tool_results_for_realtime) to {"error": msg} (new behavior). This affects all tool execution failures sent via update_chat_ctxget_tool_results_for_realtime. While likely intentional (and arguably more correct since it signals errors differently to the model), this is a semantic change to an existing code path that could subtly affect model behavior for error-case tool responses. The Gemini API's FunctionResponse.response field is a generic dict, so the key name is what the model "sees" — changing it from "output" to "error" may change how the model interprets failed tool calls.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

)
if is_given(tool_response_scheduling):
# vertexai currently doesn't support the scheduling parameter, gemini api defaults to idle
# it's the user's responsibility to avoid this parameter when using vertexai
res.scheduling = tool_response_scheduling
if not vertexai:
# vertexai does not support id in FunctionResponse
# see: https://github.com/googleapis/python-genai/blob/85e00bc/google/genai/_live_converters.py#L1435
res.id = output.call_id
return res


def get_tool_results_for_realtime(
chat_ctx: llm.ChatContext,
*,
vertexai: bool = False,
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN,
) -> types.LiveClientToolResponse | None:
function_responses: list[types.FunctionResponse] = []
for msg in chat_ctx.items:
if msg.type == "function_call_output":
res = types.FunctionResponse(
name=msg.name,
response={"output": msg.output},
)
if is_given(tool_response_scheduling):
# vertexai currently doesn't support the scheduling parameter, gemini api defaults to idle
# it's the user's responsibility to avoid this parameter when using vertexai
res.scheduling = tool_response_scheduling
if not vertexai:
# vertexai does not support id in FunctionResponse
# see: https://github.com/googleapis/python-genai/blob/85e00bc/google/genai/_live_converters.py#L1435
res.id = msg.call_id
function_responses.append(res)
function_responses = [
create_function_response(
msg, vertexai=vertexai, tool_response_scheduling=tool_response_scheduling
)
for msg in chat_ctx.items
if msg.type == "function_call_output"
]
return (
types.LiveClientToolResponse(function_responses=function_responses)
if function_responses
Expand Down