diff --git a/backend/app/services/llm/caller.py b/backend/app/services/llm/caller.py index d3a3b66d1..f8235a546 100644 --- a/backend/app/services/llm/caller.py +++ b/backend/app/services/llm/caller.py @@ -15,7 +15,7 @@ import json import uuid from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from loguru import logger from sqlalchemy import select @@ -27,6 +27,8 @@ from .client import LLMError from .failover import classify_error, FailoverErrorType +from .json_recovery import canonicalize_tool_arguments +from .tool_result_shaping import shape_tool_result from .utils import LLMMessage, create_llm_client, get_max_tokens, get_model_api_key if TYPE_CHECKING: @@ -39,6 +41,10 @@ "send_message_to_agent", "send_feishu_message", "send_email" }) +# Cap for any single tool-result entry sent into LLM history. +# Phase 1 uses a constant; Phase 2 will make this per-agent configurable. +TOOL_RESULT_MAX_CHARS = 20_000 + # ═══════════════════════════════════════════════════════════════════════════════ # Failover Guard @@ -193,6 +199,47 @@ def _check_tool_requires_args(tool_name: str, args: dict) -> tuple[bool, str]: return True, "" +def _canonicalize_tc_arguments(tc: dict, session_id: str) -> dict[str, Any]: + """Canonicalize ``tc['function']['arguments']`` in place and return the parsed dict. + + The canonical JSON is written back to ``tc['function']['arguments']`` so that + any subsequent LLM round receiving this ``tc`` in conversation history will + pass DashScope's ``function.arguments must be in JSON format`` validation. + Used by both in-flight tool loops (_process_tool_call and _try_model). 
+ """ + fn = tc["function"] + tool_name = fn["name"] + raw_args = fn.get("arguments", "{}") + args, canonical_args, repair_method = canonicalize_tool_arguments(raw_args) + fn["arguments"] = canonical_args + if repair_method != "clean": + logger.warning( + f"[LLM] tool_call args repaired: tool={tool_name} method={repair_method} " + f"orig_len={len(raw_args)} new_len={len(canonical_args)} session={session_id}" + ) + return args + + +def _shape_tool_content_for_context(tool_content, tool_name: str, session_id: str): + """Return tool_content capped at TOOL_RESULT_MAX_CHARS (string content only). + + Vision content is always a list[dict] per vision_inject.try_inject_screenshot_vision — + we pass those through unchanged to preserve base64 image data. + """ + # Invariant: vision content is always a list[dict]; see vision_inject.try_inject_screenshot_vision. + if not isinstance(tool_content, str): + return tool_content + shaped, was_truncated = shape_tool_result(tool_content, TOOL_RESULT_MAX_CHARS) + if was_truncated: + dropped = len(tool_content) - len(shaped) + logger.warning( + f"[LLM] tool_result truncated: tool={tool_name} " + f"orig_len={len(tool_content)} new_len={len(shaped)} " + f"dropped={dropped} session={session_id}" + ) + return shaped + + async def _process_tool_call( tc: dict, api_messages: list, @@ -204,15 +251,11 @@ async def _process_tool_call( full_reasoning_content: str, ) -> str: """Process a single tool call and return result.""" - fn = tc["function"] - tool_name = fn["name"] - raw_args = fn.get("arguments", "{}") - logger.info(f"[LLM] Calling tool: {tool_name}({json.dumps(raw_args, ensure_ascii=False)[:100]})") + raw_args = tc["function"].get("arguments", "{}") + logger.info(f"[LLM] Calling tool: {tc['function']['name']}({json.dumps(raw_args, ensure_ascii=False)[:100]})") - try: - args = json.loads(raw_args) if raw_args else {} - except json.JSONDecodeError: - args = {} + args = _canonicalize_tc_arguments(tc, session_id) + tool_name = 
tc["function"]["name"] # Guard: check if tool requires arguments should_execute, error_msg = _check_tool_requires_args(tool_name, args) @@ -268,6 +311,8 @@ async def _process_tool_call( except Exception: pass + tool_content = _shape_tool_content_for_context(tool_content, tool_name, session_id) + api_messages.append(LLMMessage( role="tool", tool_call_id=tc["id"], @@ -404,6 +449,9 @@ async def call_llm( logger.info(f"[LLM] Round {round_i+1}: {len(response.tool_calls)} tool call(s)") # Add assistant message with tool calls + # NB: tc["function"] is shared by reference with _canonicalize_tc_arguments's + # in-place canonicalization — must stay as a reference (no deepcopy), or + # history entries will carry the pre-repair malformed arguments. api_messages.append(LLMMessage( role="assistant", content=response.content or None, @@ -734,9 +782,12 @@ async def _try_model(model: LLMModel) -> tuple[str, bool, bool]: if agent_id and _accumulated_tokens > 0: await record_token_usage(agent_id, _accumulated_tokens) await client.close() - return response.content or "[Empty response]", True + return response.content or "[Empty response]", True, tool_executed # Execute tool calls + # NB: tc["function"] is shared by reference with _canonicalize_tc_arguments's + # in-place canonicalization — must stay as a reference (no deepcopy), or + # history entries will carry the pre-repair malformed arguments. 
api_messages.append(LLMMessage( role="assistant", content=response.content or None, @@ -749,13 +800,8 @@ async def _try_model(model: LLMModel) -> tuple[str, bool, bool]: )) for tc in response.tool_calls: - fn = tc["function"] - tool_name = fn["name"] - raw_args = fn.get("arguments", "{}") - try: - args = json.loads(raw_args) if raw_args else {} - except json.JSONDecodeError: - args = {} + args = _canonicalize_tc_arguments(tc, session_id) + tool_name = tc["function"]["name"] tool_executed = True result = await execute_tool( @@ -764,10 +810,11 @@ async def _try_model(model: LLMModel) -> tuple[str, bool, bool]: user_id=agent.creator_id, session_id=session_id, ) + shaped_content = _shape_tool_content_for_context(str(result), tool_name, session_id) api_messages.append(LLMMessage( role="tool", tool_call_id=tc["id"], - content=str(result), + content=shaped_content, )) if agent_id and _accumulated_tokens > 0: diff --git a/backend/app/services/llm/json_recovery.py b/backend/app/services/llm/json_recovery.py new file mode 100644 index 000000000..f98d9741a --- /dev/null +++ b/backend/app/services/llm/json_recovery.py @@ -0,0 +1,161 @@ +"""Tool-call JSON argument recovery and canonicalization. + +LLM streaming sometimes produces slightly malformed JSON for tool_call.arguments: +trailing commas, unescaped control characters in string values, truncated tokens. +DashScope validates this field strictly server-side and rejects the request with +HTTP 400 `function.arguments parameter must be in JSON format` on the NEXT round. + +`canonicalize_tool_arguments` accepts any raw string and returns a parsed dict +plus a canonical JSON string that is guaranteed to round-trip through +`json.loads`. It never raises. + +Repair methods reported back to callers: + +- ``"clean"`` — ``json.loads`` succeeded on the raw input and it was a dict. +- ``"trailing_comma"`` — succeeded after stripping trailing commas before + ``}`` or ``]`` (string-aware so commas inside string literals are kept). 
+- ``"control_char_escape"`` — succeeded after escaping unescaped control + characters inside JSON string values. +- ``"non_dict_coerced"`` — a parse attempt succeeded but produced a non-dict + top-level value (list, scalar, ``null``). Coerced to ``{}``. Callers + should log/alert on this because real user data was dropped. +- ``"failed"`` — every repair attempt raised ``json.JSONDecodeError``. + Returns ``{}`` / ``"{}"``. +""" +from __future__ import annotations + +import json +from typing import Any + + +def _strip_trailing_commas(s: str) -> str: + """Remove trailing commas before } or ] — but only when OUTSIDE a JSON string. + + Walks the input char by char so that a comma inside a string literal + (e.g. `"hello,}"`) is not confused with a trailing comma in the outer + structure. + """ + out: list[str] = [] + in_string = False + escape_next = False + i = 0 + n = len(s) + while i < n: + ch = s[i] + if escape_next: + out.append(ch) + escape_next = False + i += 1 + continue + if ch == '\\' and in_string: + out.append(ch) + escape_next = True + i += 1 + continue + if ch == '"': + in_string = not in_string + out.append(ch) + i += 1 + continue + if not in_string and ch == ',': + # Peek ahead past whitespace to see if next non-ws is } or ] + j = i + 1 + while j < n and s[j] in ' \t\n\r': + j += 1 + if j < n and s[j] in '}]': + # Drop the comma, keep the whitespace + i += 1 + continue + out.append(ch) + i += 1 + return ''.join(out) + + +def _escape_control_chars_in_strings(s: str) -> str: + """Scan through string and escape unescaped control chars inside JSON string values. + + We can't do this by simple regex because we only want to escape control + chars *inside string values*, not outside. Walk char by char tracking + whether we're inside a string. 
+ """ + out: list[str] = [] + in_string = False + escape_next = False + for ch in s: + if escape_next: + out.append(ch) + escape_next = False + continue + if ch == '\\' and in_string: + out.append(ch) + escape_next = True + continue + if ch == '"': + in_string = not in_string + out.append(ch) + continue + if in_string and ord(ch) < 0x20: + # Escape control chars per JSON spec + if ch == '\n': + out.append('\\n') + elif ch == '\r': + out.append('\\r') + elif ch == '\t': + out.append('\\t') + elif ch == '\b': + out.append('\\b') + elif ch == '\f': + out.append('\\f') + else: + out.append(f'\\u{ord(ch):04x}') + continue + out.append(ch) + return ''.join(out) + + +def canonicalize_tool_arguments(raw: str) -> tuple[dict[str, Any], str, str]: + """Parse and canonicalize a raw tool_call.arguments string. + + Returns: + (parsed_dict, canonical_json_string, repair_method) + + repair_method is one of: "clean", "trailing_comma", "control_char_escape", + "non_dict_coerced", "failed". Never raises. + """ + if not raw: + return {}, "{}", "clean" + + # Attempt 1: clean parse + try: + parsed = json.loads(raw) + if not isinstance(parsed, dict): + return {}, "{}", "non_dict_coerced" + canonical = json.dumps(parsed, ensure_ascii=False) + return parsed, canonical, "clean" + except json.JSONDecodeError: + pass + + # Attempt 2: strip trailing commas + cleaned = _strip_trailing_commas(raw) + try: + parsed = json.loads(cleaned) + if not isinstance(parsed, dict): + return {}, "{}", "non_dict_coerced" + canonical = json.dumps(parsed, ensure_ascii=False) + return parsed, canonical, "trailing_comma" + except json.JSONDecodeError: + pass + + # Attempt 3: escape unescaped control chars in strings, then retry + escaped = _escape_control_chars_in_strings(cleaned) + try: + parsed = json.loads(escaped) + if not isinstance(parsed, dict): + return {}, "{}", "non_dict_coerced" + canonical = json.dumps(parsed, ensure_ascii=False) + return parsed, canonical, "control_char_escape" + except 
json.JSONDecodeError: +        pass + +    # Gave up — return safe empty +    return {}, "{}", "failed" diff --git a/backend/app/services/llm/tool_result_shaping.py b/backend/app/services/llm/tool_result_shaping.py new file mode 100644 index 000000000..de7dbd74c --- /dev/null +++ b/backend/app/services/llm/tool_result_shaping.py @@ -0,0 +1,43 @@ +"""Shape oversized tool results to stay within per-call size budget. + +A single tool result (e.g. a long tool output JSON, or an +`execute_code` stdout dump) can exceed 50KB. Accumulating many such results +across 10+ tool rounds blows past Qwen3.5-plus's ~983k-char input limit and +causes `HTTP 400: Range of input length should be [1, 983616]`. + +This module applies a head+tail truncation with an explicit marker so the +LLM can see that truncation happened and ask for more if needed. + +A degenerate budget (``max_chars <= 0``) returns an empty string, with +``was_truncated=True`` iff the input was non-empty. +""" +from __future__ import annotations + + +def shape_tool_result(result, max_chars: int) -> tuple[str, bool]: +    """Return (possibly-truncated string, was_truncated). + +    Strategy for oversized results: keep ~60% head and ~30% tail, with a +    marker in between describing how much was dropped. Total output stays +    within max_chars plus a small marker overhead (~120 chars). + +    Edge case: if ``max_chars <= 0`` the budget is degenerate — there is no +    room for any content (nor for the marker itself), so an empty string is +    returned, with ``was_truncated=True`` iff the input was non-empty. +    """ +    s = str(result) if not isinstance(result, str) else result +    if max_chars <= 0: +        # Degenerate budget — treat as "drop everything", no marker (it would +        # exceed max_chars itself). was_truncated reflects whether any content +        # was actually dropped. 
+ return "", len(s) > 0 + if len(s) <= max_chars: + return s, False + + # Budget split: 60% head, 30% tail, 10% safety + head_budget = int(max_chars * 0.60) + tail_budget = int(max_chars * 0.30) + dropped = len(s) - head_budget - tail_budget + marker = f"\n\n[... truncated: {dropped:,} chars omitted (head {head_budget:,} + tail {tail_budget:,} kept) ...]\n\n" + + return s[:head_budget] + marker + s[-tail_budget:], True diff --git a/backend/tests/test_llm_caller_integration.py b/backend/tests/test_llm_caller_integration.py new file mode 100644 index 000000000..c7abcc411 --- /dev/null +++ b/backend/tests/test_llm_caller_integration.py @@ -0,0 +1,140 @@ +"""Integration tests for caller._process_tool_call normalization.""" +import json +import pytest +from unittest.mock import patch + +from app.services.llm.caller import _process_tool_call + + +@pytest.mark.asyncio +async def test_process_tool_call_canonicalizes_malformed_arguments(): + """Malformed arguments (trailing comma) must be rewritten to valid JSON + on tc['function']['arguments'] so later LLM rounds get clean history.""" + tc = { + "id": "call_1", + "function": { + "name": "read_file", + # Trailing comma — Qwen streaming produces this sometimes + "arguments": '{"path": "foo.md",}', + }, + } + api_messages: list = [] + + async def fake_execute_tool(name, args, **kwargs): + assert name == "read_file" + assert args == {"path": "foo.md"} + return "file contents here" + + with patch("app.services.llm.caller.execute_tool", side_effect=fake_execute_tool): + await _process_tool_call( + tc=tc, + api_messages=api_messages, + agent_id="agent-1", + user_id="user-1", + session_id="sess-1", + supports_vision=False, + on_tool_call=None, + full_reasoning_content="", + ) + + # CRITICAL: arguments on the tc object must now be valid JSON + repaired = tc["function"]["arguments"] + parsed = json.loads(repaired) + assert parsed == {"path": "foo.md"} + # And it must not have a trailing comma + assert ",}" not in repaired.replace(" 
", "") + + +@pytest.mark.asyncio +async def test_process_tool_call_truncates_oversized_result(): + """Tool results over the cap must be head+tail truncated in the + api_messages tool-result entry.""" + tc = { + "id": "call_1", + "function": {"name": "example_tool", "arguments": '{"command": ["report", "list"]}'}, + } + api_messages: list = [] + huge_result = "A" * 200_000 # 200KB result + + async def fake_execute_tool(name, args, **kwargs): + return huge_result + + with patch("app.services.llm.caller.execute_tool", side_effect=fake_execute_tool): + await _process_tool_call( + tc=tc, + api_messages=api_messages, + agent_id="agent-1", + user_id="user-1", + session_id="sess-1", + supports_vision=False, + on_tool_call=None, + full_reasoning_content="", + ) + + tool_msg = api_messages[-1] + # Stored content should be capped and contain the truncation marker + content = tool_msg.content if isinstance(tool_msg.content, str) else str(tool_msg.content) + from app.services.llm.caller import TOOL_RESULT_MAX_CHARS + # Cap is TOOL_RESULT_MAX_CHARS + small marker overhead (~120 chars). 
+ assert len(content) <= TOOL_RESULT_MAX_CHARS + 500 + assert "truncated" in content.lower() + + +@pytest.mark.asyncio +async def test_process_tool_call_clean_arguments_pass_through_unchanged_semantic(): + """Clean JSON must still work exactly as before (backwards compat).""" + tc = { + "id": "call_1", + "function": { + "name": "read_file", + "arguments": '{"path": "foo.md"}', + }, + } + api_messages: list = [] + + async def fake_execute_tool(name, args, **kwargs): + assert args == {"path": "foo.md"} + return "ok" + + with patch("app.services.llm.caller.execute_tool", side_effect=fake_execute_tool): + await _process_tool_call( + tc=tc, api_messages=api_messages, + agent_id="agent-1", user_id="user-1", session_id="sess-1", + supports_vision=False, on_tool_call=None, full_reasoning_content="", + ) + + # Semantic equivalence (key order / spacing may differ) + assert json.loads(tc["function"]["arguments"]) == {"path": "foo.md"} + + +def test_canonicalize_tc_arguments_helper_rewrites_tc_inplace(): + """Unit test the helper directly — exercised by both _process_tool_call + and call_agent_llm_with_tools._try_model.""" + from app.services.llm.caller import _canonicalize_tc_arguments + tc = { + "id": "call_1", + "function": {"name": "read_file", "arguments": '{"path": "foo.md",}'}, + } + args = _canonicalize_tc_arguments(tc, session_id="sess-x") + assert args == {"path": "foo.md"} + # In-place mutation: tc now carries canonical JSON + import json + parsed = json.loads(tc["function"]["arguments"]) + assert parsed == {"path": "foo.md"} + + +def test_shape_tool_content_for_context_bypasses_list_content(): + """Vision content (list of parts) must pass through untouched.""" + from app.services.llm.caller import _shape_tool_content_for_context + vision_content = [{"type": "text", "text": "see image"}, {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}] + out = _shape_tool_content_for_context(vision_content, "screenshot", "sess-x") + assert out is 
vision_content # identity — no copy, no mutation + + +def test_shape_tool_content_for_context_truncates_long_string(): + """Oversized string content is capped.""" + from app.services.llm.caller import _shape_tool_content_for_context, TOOL_RESULT_MAX_CHARS + out = _shape_tool_content_for_context("A" * 200_000, "example_tool", "sess-x") + assert isinstance(out, str) + assert len(out) <= TOOL_RESULT_MAX_CHARS + 500 + assert "truncated" in out.lower() diff --git a/backend/tests/test_llm_json_recovery.py b/backend/tests/test_llm_json_recovery.py new file mode 100644 index 000000000..a82f89429 --- /dev/null +++ b/backend/tests/test_llm_json_recovery.py @@ -0,0 +1,118 @@ +"""Unit tests for tool_call.arguments JSON recovery helpers.""" +from app.services.llm.json_recovery import canonicalize_tool_arguments + + +def test_clean_json_passes_through(): + raw = '{"path": "foo.md", "content": "hello"}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"path": "foo.md", "content": "hello"} + assert method == "clean" + # canonical is still valid JSON and round-trips + import json + assert json.loads(canonical) == parsed + + +def test_trailing_comma_in_object_is_repaired(): + raw = '{"path": "foo.md", "content": "hi",}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"path": "foo.md", "content": "hi"} + assert method == "trailing_comma" + import json + assert json.loads(canonical) == parsed + + +def test_trailing_comma_in_array_is_repaired(): + raw = '{"items": [1, 2, 3,]}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"items": [1, 2, 3]} + assert method == "trailing_comma" + + +def test_unescaped_newline_inside_string_is_repaired(): + # Qwen streaming sometimes produces raw \n inside a string value + raw = '{"content": "line1\nline2"}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"content": "line1\nline2"} + assert method == 
"control_char_escape" + import json + # canonical round-trip preserves semantic content + assert json.loads(canonical)["content"] == "line1\nline2" + + +def test_unescaped_tab_inside_string_is_repaired(): + raw = '{"content": "a\tb"}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"content": "a\tb"} + assert method == "control_char_escape" + + +def test_unicode_is_preserved_without_escaping(): + raw = '{"content": "你好世界测试"}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"content": "你好世界测试"} + assert method == "clean" + # canonical must keep Chinese chars unescaped (ensure_ascii=False) + assert "你好" in canonical + + +def test_empty_string_yields_empty_dict(): + parsed, canonical, method = canonicalize_tool_arguments("") + assert parsed == {} + assert canonical == "{}" + assert method == "clean" + + +def test_hopelessly_broken_returns_failed(): + raw = '{"path": "foo" "content": }' # totally broken + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {} + assert canonical == "{}" + assert method == "failed" + + +def test_canonical_is_always_valid_json_even_on_failure(): + """Invariant: canonical output must always be parseable JSON.""" + import json + for raw in [ + '', + '{"a": 1}', + '{"a": 1,}', + '{"a": "b\nc"}', + 'not json at all', + '{"broken', + None, + ]: + _, canonical, _ = canonicalize_tool_arguments(raw or "") + # Must not raise + json.loads(canonical) + + +def test_trailing_comma_inside_string_value_is_not_stripped(): + """Regression: regex-based stripping would silently corrupt + `{"a": "hello,}", "b": 1,}` by eating the comma inside the string value. 
+ The string-aware walker must preserve string content exactly.""" + raw = '{"a": "hello,}", "b": 1,}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"a": "hello,}", "b": 1} + assert method == "trailing_comma" + import json + assert json.loads(canonical) == parsed + + +def test_trailing_comma_inside_escaped_string_not_stripped(): + """Escaped quote inside a string must not end the string prematurely.""" + raw = '{"a": "he said \\"hi,}\\"", "b": 1,}' + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {"a": 'he said "hi,}"', "b": 1} + assert method == "trailing_comma" + + +def test_non_dict_top_level_is_coerced_with_explicit_method(): + """A top-level array or scalar is not a valid tool_call args object. + Must be coerced to {} AND reported as non_dict_coerced (not "clean") + so observability can flag the data-loss event.""" + for raw in ['[1, 2, 3]', '"just a string"', '42', 'null']: + parsed, canonical, method = canonicalize_tool_arguments(raw) + assert parsed == {} + assert canonical == "{}" + assert method == "non_dict_coerced", f"raw={raw!r} got method={method}" diff --git a/backend/tests/test_llm_tool_result_shaping.py b/backend/tests/test_llm_tool_result_shaping.py new file mode 100644 index 000000000..4dcada4b2 --- /dev/null +++ b/backend/tests/test_llm_tool_result_shaping.py @@ -0,0 +1,80 @@ +"""Unit tests for tool result size-shaping.""" +from app.services.llm.tool_result_shaping import shape_tool_result + + +def test_short_result_passes_through_unchanged(): + short = "hello world" + out, truncated = shape_tool_result(short, max_chars=1000) + assert out == short + assert truncated is False + + +def test_exactly_at_limit_passes_through(): + s = "x" * 1000 + out, truncated = shape_tool_result(s, max_chars=1000) + assert out == s + assert truncated is False + + +def test_oversized_result_is_truncated_with_marker(): + s = "A" * 500 + "B" * 2000 + "C" * 500 + out, truncated = shape_tool_result(s, 
max_chars=1000) + assert truncated is True + assert len(out) < len(s) + # Marker is present and mentions how much was dropped + assert "truncated" in out.lower() + # Head (starts with A) and tail (ends with C) both preserved + assert out.startswith("A") + assert out.endswith("C") + + +def test_marker_reports_dropped_char_count(): + s = "A" * 10_000 + out, truncated = shape_tool_result(s, max_chars=1000) + assert truncated is True + # The marker should contain the number of dropped characters + assert "9" in out # ~9000 dropped + + +def test_zero_budget_returns_empty(): + """max_chars=0 is a degenerate budget; return empty string and + report truncation when any content was dropped.""" + out, truncated = shape_tool_result("hello", 0) + assert out == "" + assert truncated is True + + +def test_zero_budget_with_empty_input(): + """Empty input under a zero budget is still not truncation.""" + out, truncated = shape_tool_result("", 0) + assert out == "" + assert truncated is False + + +def test_negative_budget_degenerates_gracefully(): + """Negative max_chars should not silently produce garbage. + Regression: prior regex-free slicing produced overlapping slices + with a lying marker (output longer than input).""" + out, truncated = shape_tool_result("hello world", -5) + assert out == "" + assert truncated is True + + +def test_output_length_respects_budget(): + s = "x" * 100_000 + out, truncated = shape_tool_result(s, max_chars=1000) + # Output should be <= max_chars + reasonable marker overhead (~200 chars) + assert len(out) <= 1000 + 200 + assert truncated is True + + +def test_empty_result(): + out, truncated = shape_tool_result("", max_chars=1000) + assert out == "" + assert truncated is False + + +def test_non_string_coerced_to_string(): + out, truncated = shape_tool_result(12345, max_chars=1000) + assert out == "12345" + assert truncated is False