Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 80 additions & 15 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,30 @@ function remoteURL(key: string, value: string) {
log.warn("invalid remote mcp url", { key })
}

// Convert MCP tool definition to AI SDK Tool type
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
// Detect server-side session-expiration errors so we can transparently
// reconnect the streamable-HTTP transport on the next tool call.
//
// Streamable HTTP servers may invalidate an `mcp-session-id` for any reason
// (server restart, idle timeout, periodic rotation). The SDK surfaces this as
// a wrapped error like `Error POSTing to endpoint: {"error":"Session not
// found"}`. Without recovery, every subsequent tool call on that server fails
// until the user manually disconnects and reconnects.
export function isSessionExpiredError(err: unknown): boolean {
const msg = err instanceof Error ? err.message : String(err ?? "")
return /session\s*not\s*found/i.test(msg) || /invalid\s*session/i.test(msg) || /mcp-session-id/i.test(msg)
}

// Convert MCP tool definition to AI SDK Tool type. The execute path resolves
// the live client by name on every invocation so that, after a reconnect, the
// next call hits the freshly-stored client rather than a stale closure
// reference.
function convertMcpTool(
mcpTool: MCPToolDef,
serverName: string,
getClient: () => MCPClient | undefined,
reconnect: () => Promise<MCPClient | undefined>,
timeout?: number,
): Tool {
const inputSchema = mcpTool.inputSchema

// Spread first, then override type to ensure it's always "object"
Expand All @@ -135,17 +157,31 @@ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number
description: mcpTool.description ?? "",
inputSchema: jsonSchema(schema),
execute: async (args: unknown) => {
return client.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
},
CallToolResultSchema,
{
resetTimeoutOnProgress: true,
timeout,
},
)
const call = (c: MCPClient) =>
c.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
},
CallToolResultSchema,
{
resetTimeoutOnProgress: true,
timeout,
},
)

const initial = getClient()
if (!initial) throw new Error(`MCP server "${serverName}" is not connected`)
return call(initial).catch(async (err) => {
if (!isSessionExpiredError(err)) throw err
log.info("mcp session expired, reconnecting", {
server: serverName,
tool: mcpTool.name,
})
const fresh = await reconnect()
if (!fresh) throw err // reconnect failed — surface the original error
return call(fresh) // single retry; do not loop
})
},
})
}
Expand Down Expand Up @@ -630,6 +666,10 @@ export const layer = Layer.effect(
const tools = Effect.fn("MCP.tools")(function* () {
const result: Record<string, Tool> = {}
const s = yield* InstanceState.get(state)
// Capture the current context so the per-tool reconnect callback can
// re-enter Effect-land from plain async code (the AI SDK calls Tool.execute
// outside of any Effect runtime).
const ctx = yield* Effect.context<never>()

const cfg = yield* cfgSvc.get()
const config = cfg.mcp ?? {}
Expand All @@ -641,7 +681,7 @@ export const layer = Layer.effect(

yield* Effect.forEach(
connectedClients,
([clientName, client]) =>
([clientName]) =>
Effect.gen(function* () {
const mcpConfig = config[clientName]
const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : undefined
Expand All @@ -652,9 +692,34 @@ export const layer = Layer.effect(
return
}

// Resolver and reconnect closures shared across every tool from
// this server. The resolver reads the live `s.clients` map (the
// same object reference used by storeClient/closeClient) so it
// always observes the latest client after a reconnect.
const getClient = () => s.clients[clientName]
const reconnect = async () => {
if (!entry) return undefined // no config available — can't reconnect
try {
await Effect.runPromiseWith(ctx)(createAndStore(clientName, entry))
} catch (e) {
log.warn("mcp reconnect failed", {
server: clientName,
error: e instanceof Error ? e.message : String(e),
})
return undefined
}
return s.clients[clientName]
}

const timeout = entry?.timeout ?? defaultTimeout
for (const mcpTool of listed) {
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(
mcpTool,
clientName,
getClient,
reconnect,
timeout,
)
}
}),
{ concurrency: "unbounded" },
Expand Down
Loading
Loading