Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions backend/alembic/versions/add_context_window_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""add agents.context_window_tokens for token-aware history truncation

Revision ID: add_context_window_tokens
Revises: rm_agent_credential_secrets
Create Date: 2026-04-27
"""

from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
revision: str = "add_context_window_tokens"
# Chains directly after the migration that removed agent credential secrets.
down_revision: Union[str, Sequence[str], None] = "rm_agent_credential_secrets"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add ``agents.context_window_tokens`` with a DDL default of 50000.

    A four-statement sequence is used instead of a single
    ``ADD COLUMN ... NOT NULL DEFAULT 50000`` because
    ``alembic/versions/0000_initial_schema.py`` runs
    ``Base.metadata.create_all(checkfirst=True)``, which builds ``agents``
    from the *current* model state — new columns included. SQLAlchemy's
    Python-side ``default=`` never becomes a DDL ``DEFAULT``, so on such a
    database the column already exists as ``NOT NULL`` with no default, and
    a naive ``ADD COLUMN IF NOT EXISTS ... DEFAULT 50000`` short-circuits
    without ever installing the default.

    The sequence converges from any starting state and is safely
    re-runnable:
    - missing column       → created (initially nullable, no default)
    - default absent       → default installed (SET DEFAULT is a no-op
                             when re-applied with the same value)
    - NULL values present  → backfilled to 50000 (0-row UPDATE is a no-op)
    - finally              → NOT NULL enforced (no-op if already set)
    """
    ddl_steps = (
        # 1. Create the column if absent — deliberately nullable and
        #    default-free here so rows created earlier by create_all
        #    cannot block the ALTER.
        "ALTER TABLE agents ADD COLUMN IF NOT EXISTS context_window_tokens INTEGER",
        # 2. Install the DDL default so inserts that omit the column
        #    (raw SQL, restored backups, manual migrations) get 50000.
        "ALTER TABLE agents ALTER COLUMN context_window_tokens SET DEFAULT 50000",
        # 3. Backfill rows that predate the default.
        "UPDATE agents SET context_window_tokens = 50000 "
        "WHERE context_window_tokens IS NULL",
        # 4. No NULLs remain, so the constraint is now safe to enforce.
        "ALTER TABLE agents ALTER COLUMN context_window_tokens SET NOT NULL",
    )
    for statement in ddl_steps:
        op.execute(statement)


def downgrade() -> None:
    """Intentionally a no-op.

    Dropping ``context_window_tokens`` would irreversibly discard any
    per-tenant tuning stored in the column, so downgrade leaves it in
    place; the extra column is harmless to older application versions.
    """
    # Downgrade omitted — dropping the column would lose per-tenant tuning.
    pass
20 changes: 16 additions & 4 deletions backend/app/api/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from app.models.identity import IdentityProvider
from app.schemas.schemas import ChannelConfigCreate, ChannelConfigOut, TokenResponse, UserOut
from app.services.feishu_service import feishu_service
from app.services.history_window import truncate_by_token_budget

router = APIRouter(tags=["feishu"])

Expand Down Expand Up @@ -656,11 +657,12 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession
)
_pre_sess = _pre_sess_r.scalar_one_or_none()
_history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id
# Load extra raw material so app-level token-aware helper has room to choose
history_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
.limit(max(ctx_size, 500))
)
history_msgs = history_result.scalars().all()
history = _build_llm_history_from_chat_messages(list(reversed(history_msgs)))
Expand Down Expand Up @@ -1373,11 +1375,12 @@ async def _handle_feishu_file(
# Load conversation history for LLM context
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE
# Load extra raw material so app-level token-aware helper has room to choose
_hist_r = await db.execute(
_select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
.limit(max(ctx_size, 500))
)
_history = _build_llm_history_from_chat_messages(list(reversed(_hist_r.scalars().all())))

Expand Down Expand Up @@ -1631,10 +1634,19 @@ async def _call_agent_llm(

# Build conversation messages (without system prompt — call_llm adds it)
messages: list[dict] = []
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE, DEFAULT_CONTEXT_WINDOW_TOKENS
ctx_size = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE
tok_budget = getattr(agent, "context_window_tokens", None) or DEFAULT_CONTEXT_WINDOW_TOKENS
if history:
messages.extend(_normalize_history_messages(history)[-ctx_size:])
# Pair-aware truncation: token budget primary, message count as safety cap.
# Today _normalize_history_messages drops DB role="tool_call" rows, so this
# path has no tool messages and the pair guard is a no-op; the safety kicks
# in once a feishu reorganization helper exists.
messages.extend(
truncate_by_token_budget(
_normalize_history_messages(history), tok_budget, message_cap=ctx_size,
)
)
messages.append({"role": "user", "content": user_text})

# Use actual user_id so the system prompt knows who it's chatting with
Expand Down
38 changes: 32 additions & 6 deletions backend/app/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from app.models.llm import LLMModel
from app.models.user import User
from app.services.chat_session_service import ensure_primary_platform_session
from app.services.history_window import truncate_by_token_budget
from app.services.llm import call_llm, call_llm_with_failover

router = APIRouter(tags=["websocket"])
Expand Down Expand Up @@ -213,7 +214,9 @@ async def websocket_chat(
role_description = agent.role_description or ""
welcome_message = agent.welcome_message or ""
ctx_size = agent.context_window_size or 100
logger.info(f"[WS] Agent: {agent_name}, type: {agent_type}, model_id: {agent.primary_model_id}, ctx: {ctx_size}")
from app.models.agent import DEFAULT_CONTEXT_WINDOW_TOKENS
tok_budget = getattr(agent, "context_window_tokens", None) or DEFAULT_CONTEXT_WINDOW_TOKENS
logger.info(f"[WS] Agent: {agent_name}, type: {agent_type}, model_id: {agent.primary_model_id}, ctx: {ctx_size}msg/{tok_budget}tok")

# Load the agent's primary model
if agent.primary_model_id:
Expand Down Expand Up @@ -299,11 +302,14 @@ async def websocket_chat(
logger.info(f"[WS] Selected primary session {conv_id}")

try:
# Load extra raw material so the app-level token-aware helper
# (truncate_by_token_budget below) has room to choose from.
_db_load_cap = max(ctx_size, 500)
history_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
.limit(_db_load_cap)
)
history_messages = list(reversed(history_result.scalars().all()))
logger.info(f"[WS] Loaded {len(history_messages)} history messages for session {conv_id}")
Expand Down Expand Up @@ -662,10 +668,30 @@ async def _call_with_failover():
async def _on_failover(reason: str):
await websocket.send_json({"type": "info", "content": f"Primary model error, {reason}"})

# To prevent tool call message pairs(assistant + tool) from being broken down.
_truncated = conversation[-ctx_size:]
while _truncated and _truncated[0].get("role") == "tool":
_truncated.pop(0)
# Pair-aware truncation with a token budget plus a message-count
# safety cap. Either bound stops the walk; pairs (assistant.tool_calls
# ↔ role=tool) are kept atomic. Token budget protects against
# one-tool-result-eats-the-window scenarios; message cap protects
# against pathological tiny-message floods. The pair guard fixes
# the orphan-tool failure mode reported in #446.
#
# The current user message (just appended at line ~416) is excluded
# from truncation and re-appended after — otherwise a single huge
# input (large paste, base64 image_data) could push past the budget
# and cause the helper to drop the very message we're answering.
# If the input itself exceeds the model's context, the provider will
# surface a clear error rather than silently dropping it here.
_current = (
conversation[-1]
if conversation and conversation[-1].get("role") == "user"
else None
)
_history = conversation[:-1] if _current is not None else conversation
_truncated = truncate_by_token_budget(
_history, tok_budget, message_cap=ctx_size,
)
if _current is not None:
_truncated.append(_current)

return await call_llm_with_failover(
primary_model=llm_model,
Expand Down
23 changes: 23 additions & 0 deletions backend/app/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
# (see: https://github.com/dataelement/Clawith/issues/238).
DEFAULT_CONTEXT_WINDOW_SIZE = 100

# Default token budget for in-context history. Conservative for 128K-context
# models after system prompt + soul/memory injection (~5-15K tokens). Per-agent
# override via Agent.context_window_tokens.
DEFAULT_CONTEXT_WINDOW_TOKENS = 50000


class Agent(Base):
"""Digital employee (Agent) instance.
Expand Down Expand Up @@ -81,6 +86,24 @@ class Agent(Base):
last_monthly_reset: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
tokens_used_total: Mapped[int] = mapped_column(Integer, default=0)
context_window_size: Mapped[int] = mapped_column(Integer, default=100)
# Token-aware secondary bound on history sent to the LLM. Truncation uses
# the smaller of context_window_size (message count) and this token budget,
# preserving assistant.tool_calls ↔ role=tool pairs intact.
#
# ``server_default`` matters: alembic/versions/0000_initial_schema.py uses
# ``Base.metadata.create_all`` which reads model state at runtime. Without
# a server_default, fresh-DB bootstrap would create this column NOT NULL
# without a DDL DEFAULT — and the ``ADD COLUMN IF NOT EXISTS`` migration
# later in the chain would short-circuit, leaving direct-SQL inserts
# broken. ``server_default="50000"`` ensures the DDL has the default
# whether the column was created by create_all or by the explicit
# migration.
context_window_tokens: Mapped[int] = mapped_column(
Integer,
default=DEFAULT_CONTEXT_WINDOW_TOKENS,
server_default=str(DEFAULT_CONTEXT_WINDOW_TOKENS),
nullable=False,
)
max_tool_rounds: Mapped[int] = mapped_column(Integer, default=50)

# Trigger limits (per-agent, configurable from Settings UI)
Expand Down
2 changes: 2 additions & 0 deletions backend/app/schemas/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ class AgentOut(BaseModel):
max_tokens_per_day: int | None = None
max_tokens_per_month: int | None = None
context_window_size: int = 100
context_window_tokens: int = 50000
max_tool_rounds: int = 50
max_triggers: int = 20
min_poll_interval_min: int = 5
Expand Down Expand Up @@ -286,6 +287,7 @@ class AgentUpdate(BaseModel):
primary_model_id: uuid.UUID | None = None
fallback_model_id: uuid.UUID | None = None
context_window_size: int | None = Field(default=None, ge=1, le=500)
context_window_tokens: int | None = Field(default=None, ge=1000, le=500000)
max_tokens_per_day: int | None = None
max_tokens_per_month: int | None = None
max_tool_rounds: int | None = None
Expand Down
12 changes: 12 additions & 0 deletions backend/app/services/agent_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,18 @@ async def build_agent_context(agent_id: uuid.UUID, agent_name: str, role_descrip
3. **NEVER fabricate file contents or tool results from memory.**
Even if you saw a file before, you MUST call the tool again to get current data.

**Handling truncated tool results:** When a tool returns a large payload, the system
may truncate the in-context view and save the full output to your workspace. You will
see a marker like:

`[truncated. Full output (12453 tokens) saved to _tool_results/<call_id>.txt under your
workspace — use the read_file tool to retrieve specific sections]`

When you see this marker, the head excerpt above it is enough for an overview. If you
need the full content (specific search hit, page from a PDF, item N of a list), call
`read_file` with the path shown in the marker. Do NOT fabricate the missing content
from your prior knowledge — the saved file is the ground truth.

4. **Use `write_file` to update memory/memory.md with important information.**

5. **Use `write_file` to update focus.md with your current focus items.**
Expand Down
70 changes: 22 additions & 48 deletions backend/app/services/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def _execute_heartbeat(agent_id: uuid.UUID):

# Call LLM with tools using unified client
from app.services.llm import create_llm_client, get_max_tokens, LLMMessage, LLMError, get_model_api_key
from app.services.agent_tools import execute_tool, get_agent_tools_for_llm
from app.services.agent_tools import get_agent_tools_for_llm

try:
client = create_llm_client(
Expand Down Expand Up @@ -324,57 +324,31 @@ async def _execute_heartbeat(agent_id: uuid.UUID):
reasoning_content=response.reasoning_content,
))

# Tools that require arguments — if LLM sends empty args, skip and ask to retry
# (aligned with call_llm in websocket.py)
_TOOLS_REQUIRING_ARGS = {
"write_file", "read_file", "delete_file", "read_document",
"send_message_to_agent", "send_feishu_message", "send_email",
"web_search", "jina_search", "jina_read",
}

for tc in response.tool_calls:
fn = tc["function"]
tool_name = fn["name"]
raw_args = fn.get("arguments", "{}")
logger.info(f"[Heartbeat] Raw arguments for {tool_name} (len={len(raw_args) if raw_args else 0}): {repr(raw_args[:300]) if raw_args else 'None'}")
try:
args = json.loads(raw_args) if raw_args else {}
except json.JSONDecodeError as je:
logger.warning(f"[Heartbeat] JSON parse failed for {tool_name}: {je}. Raw: {repr(raw_args[:200])}")
args = {}

# Guard: if a tool that requires arguments received empty args,
# return an error to LLM instead of executing
if not args and tool_name in _TOOLS_REQUIRING_ARGS:
logger.warning(f"[Heartbeat] Empty arguments for {tool_name}, asking LLM to retry")
llm_messages.append(LLMMessage(
role="tool",
tool_call_id=tc["id"],
content=f"Error: {tool_name} was called with empty arguments. You must provide the required parameters. Please retry with the correct arguments.",
))
continue

# ── Hard rate limits for plaza actions ──
# Plaza rate-limiting: per-tick budget enforced via pre-execute hook.
# Plaza tools are NOT in TOOLS_PARALLELIZABLE so they run serially —
# closure-captured counters are race-free.
def _hb_plaza_hook(tool_name: str, _args: dict) -> str | None:
nonlocal plaza_posts_made, plaza_comments_made
if tool_name == "plaza_create_post":
if plaza_posts_made >= 1:
tool_result = "[BLOCKED] You have already made 1 plaza post this heartbeat. Do not post again."
else:
tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
plaza_posts_made += 1
return "[BLOCKED] You have already made 1 plaza post this heartbeat. Do not post again."
plaza_posts_made += 1
elif tool_name == "plaza_add_comment":
if plaza_comments_made >= 2:
tool_result = "[BLOCKED] You have already made 2 comments this heartbeat. Do not comment again."
else:
tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
plaza_comments_made += 1
else:
tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)

llm_messages.append(LLMMessage(
role="tool",
tool_call_id=tc["id"],
content=str(tool_result),
))
return "[BLOCKED] You have already made 2 comments this heartbeat. Do not comment again."
plaza_comments_made += 1
Comment on lines +335 to +339
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Increment plaza quotas only after successful execution

The heartbeat plaza hook increments plaza_posts_made/plaza_comments_made before execute_tool runs, so a failed plaza_create_post or plaza_add_comment attempt still consumes quota and blocks retries in the same heartbeat tick. Previously counters were incremented only after a successful tool call, so this change can turn transient tool failures into hard false rate-limit blocks.

Useful? React with 👍 / 👎.

return None

from app.services.tool_loop import ToolExecutionContext, execute_tool_calls
_hb_tool_messages = await execute_tool_calls(
response.tool_calls,
ToolExecutionContext(
agent_id=agent_id,
user_id=agent_creator_id,
pre_execute_hook=_hb_plaza_hook,
),
)
llm_messages.extend(_hb_tool_messages)
else:
reply = response.content or ""
break
Expand Down
Loading