
Python: Context duplication in multi-turn conversations when restoring from checkpoint with HandoffUserInputRequest #2667


Description


Summary

When using the Handoff workflow pattern with checkpointing enabled, restoring from a checkpoint causes duplicate conversation
context in multi-turn conversations. The root cause is that HandoffUserInputRequest.conversation stores the full conversation
history, which gets serialized into pending_agent_requests during checkpoint save. Upon restoration, this creates duplicate
messages when combined with the conversation history restored from AgentThread.

Environment

  • Agent Framework Version: [Your version]
  • Python Version: 3.10
  • Workflow Pattern: Handoff with checkpointing

Root Cause Analysis

  1. Checkpoint Save (AgentExecutor.on_checkpoint_save, line 220)

return {
    "cache": encode_chat_messages(self._cache),
    "agent_thread": serialized_thread,
    "pending_agent_requests": encode_checkpoint_value(self._pending_agent_requests),  # ⚠️ Includes full conversation
    "pending_responses_to_agent": encode_checkpoint_value(self._pending_responses_to_agent),
}

The _pending_agent_requests dict contains HandoffUserInputRequest objects, which include the full conversation history in their
conversation field.

  2. HandoffUserInputRequest Structure (_handoff.py, lines 127-134)

@dataclass
class HandoffUserInputRequest:
    """Request message emitted when the workflow needs fresh user input."""

    conversation: list[ChatMessage]  # ⚠️ Full conversation history
    awaiting_agent_id: str
    prompt: str
    source_executor_id: str

  3. Checkpoint Restore (AgentExecutor.on_checkpoint_restore, lines 247 and 257)

# Restore AgentThread (contains full conversation history)
self._agent_thread = await AgentThread.deserialize(thread_payload)

# Restore pending_agent_requests (also contains full conversation history)
self._pending_agent_requests = decode_checkpoint_value(pending_requests_payload)

  4. Result: Duplicate Context

Final Context = pending_agent_requests.conversation + AgentThread.messages
              = (full conversation history)         + (full conversation history)
              = DUPLICATED MESSAGES
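
The effect can be reproduced in isolation with plain dataclasses standing in for the framework types (FakeChatMessage and FakePendingRequest below are illustrative stand-ins, not agent_framework classes):

from dataclasses import dataclass

# Illustrative stand-ins; the real ChatMessage, HandoffUserInputRequest and
# AgentThread types live in agent_framework and carry more fields.
@dataclass
class FakeChatMessage:
    role: str
    text: str

@dataclass
class FakePendingRequest:
    conversation: list[FakeChatMessage]

history = [
    FakeChatMessage("user", "Initial message"),
    FakeChatMessage("assistant", "Coordinator reply"),
]

# What effectively comes back from the two checkpoint sources:
thread_messages = list(history)                           # restored via AgentThread
pending = FakePendingRequest(conversation=list(history))  # restored via pending_agent_requests

restored_context = pending.conversation + thread_messages
assert len(restored_context) == 2 * len(history)  # every message appears twice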

Steps to Reproduce

  1. Create a Handoff workflow with checkpointing enabled:

     from agent_framework import HandoffBuilder, InMemoryCheckpointStorage

     storage = InMemoryCheckpointStorage()
     workflow = (
         HandoffBuilder(participants=[coordinator, specialist])
         .set_coordinator("coordinator")
         .with_checkpointing(storage)
         .build()
     )

  2. Run a multi-turn conversation with user input requests:

     async for event in workflow.run_stream("Initial message", session_id="test_session"):
         if isinstance(event, RequestInfoEvent):
             # Trigger checkpoint save with pending HandoffUserInputRequest
             await workflow.send_response(event.data.request_id, "User response")

  3. Restore from checkpoint and continue the conversation (the restore happens automatically with the same session_id):

     async for event in workflow.run_stream("Next message", session_id="test_session"):
         # Context now contains duplicate messages
         pass

  4. Observe: the restored conversation context contains duplicate messages, coming from both pending_agent_requests.conversation and AgentThread (see the detection sketch after this list).
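
To confirm the duplication after a restore, a small helper like the following can count repeated messages in whatever conversation the restored request (or the executor's thread) exposes. This is a diagnostic sketch: it assumes each message has role and text attributes, which may need adjusting to the actual ChatMessage shape.

from collections import Counter

def find_duplicate_messages(conversation):
    """Return (role, text) pairs that occur more than once (diagnostic sketch)."""
    counts = Counter(
        (getattr(message, "role", None), getattr(message, "text", None))
        for message in conversation
    )
    return {key: count for key, count in counts.items() if count > 1}

# Example (hypothetical usage): inspect the conversation attached to the restored
# HandoffUserInputRequest surfaced by the RequestInfoEvent.
# print(find_duplicate_messages(event.data.conversation))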

Expected Behavior

The conversation context should contain each message exactly once after checkpoint restoration.

Actual Behavior

The conversation context contains duplicate messages: once from HandoffUserInputRequest.conversation (stored in
pending_agent_requests) and once from AgentThread.

Proposed Solutions

Option 1: Exclude conversation from checkpoint serialization (Recommended)

Modify checkpoint save logic to exclude the conversation field from HandoffUserInputRequest:

async def on_checkpoint_save(self) -> dict[str, Any]:
    # Custom serialization for HandoffUserInputRequest
    serialized_requests = {}
    for req_id, request in self._pending_agent_requests.items():
        if isinstance(request, HandoffUserInputRequest):
            serialized_requests[req_id] = {
                "type": "HandoffUserInputRequest",
                "awaiting_agent_id": request.awaiting_agent_id,
                "prompt": request.prompt,
                "source_executor_id": request.source_executor_id,
                # ✅ Exclude conversation - will be restored from AgentThread
            }
        else:
            serialized_requests[req_id] = encode_checkpoint_value(request)

    return {
        "cache": encode_chat_messages(self._cache),
        "agent_thread": serialized_thread,
        "pending_agent_requests": serialized_requests,
        "pending_responses_to_agent": encode_checkpoint_value(self._pending_responses_to_agent),
    }

On restore, reconstruct the conversation from AgentThread:

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    # ... restore cache and thread ...

    # Restore pending_agent_requests
    pending_requests_payload = state.get("pending_agent_requests")
    if pending_requests_payload:
        for req_id, req_data in pending_requests_payload.items():
            if req_data.get("type") == "HandoffUserInputRequest":
                # Reconstruct conversation from AgentThread
                conversation = await self._agent_thread.get_messages()  # or similar
                self._pending_agent_requests[req_id] = HandoffUserInputRequest(
                    conversation=conversation,
                    awaiting_agent_id=req_data["awaiting_agent_id"],
                    prompt=req_data["prompt"],
                    source_executor_id=req_data["source_executor_id"],
                )
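
If more request types end up carrying a conversation field later, the per-type branch above could be replaced with a generic helper along these lines (a sketch, not existing framework code):

from dataclasses import asdict, is_dataclass
from typing import Any

def serialize_without_conversation(request: Any) -> dict[str, Any]:
    """Serialize a pending request, dropping its conversation field (sketch).

    The conversation is intentionally omitted because AgentThread is treated
    as the single source of truth for history on restore.
    """
    data = asdict(request) if is_dataclass(request) else dict(vars(request))
    data.pop("conversation", None)
    return data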

Option 2: Use message references instead of full messages

Change HandoffUserInputRequest.conversation to store message IDs instead of full ChatMessage objects:

@dataclass
class HandoffUserInputRequest:
    conversation_message_ids: list[str]  # Store IDs instead of full messages
    awaiting_agent_id: str
    prompt: str
    source_executor_id: str
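
This assumes each ChatMessage carries a stable identifier (message_id below is hypothetical); on restore, the IDs would be resolved against the thread's messages instead of deserializing a second copy of the history:

def resolve_conversation(message_ids: list[str], thread_messages: list) -> list:
    """Rebuild a conversation from stored IDs (sketch; message_id is a hypothetical field)."""
    by_id = {getattr(message, "message_id", None): message for message in thread_messages}
    return [by_id[mid] for mid in message_ids if mid in by_id]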

Option 3: Deduplicate on restore

Add deduplication logic in on_checkpoint_restore:

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    # ... restore cache and thread ...

    pending_requests_payload = state.get("pending_agent_requests")
    if pending_requests_payload:
        self._pending_agent_requests = decode_checkpoint_value(pending_requests_payload)

        # Clear conversation from HandoffUserInputRequest to avoid duplication
        for request in self._pending_agent_requests.values():
            if isinstance(request, HandoffUserInputRequest):
                request.conversation = []  # Will be retrieved from AgentThread
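
If clearing the field is too aggressive for consumers that read request.conversation before the thread is available, a content-based de-duplication over the merged context is a possible fallback. The sketch below keys on (role, text) and keeps the first occurrence; the key would need to match the real ChatMessage shape:

def dedupe_messages(messages: list) -> list:
    """Drop repeated messages, keeping the first occurrence (sketch)."""
    seen = set()
    unique = []
    for message in messages:
        key = (getattr(message, "role", None), getattr(message, "text", None))
        if key not in seen:
            seen.add(key)
            unique.append(message)
    return unique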

Impact

  • Severity: High - Causes incorrect behavior in production multi-turn conversations
  • Affected Components:
    • AgentExecutor.on_checkpoint_save / on_checkpoint_restore
    • HandoffUserInputRequest dataclass
    • _UserInputGateway.request_input

Additional Context

This issue specifically affects the Handoff workflow pattern because HandoffUserInputRequest is the only user input request type
that stores the full conversation history. Other request types (e.g., FunctionApprovalRequestContent) don't have this problem.

The duplication can cause:

  • Increased token usage (duplicate messages sent to LLM)
  • Incorrect agent behavior (confused by repeated messages)
  • Memory bloat in long conversations
