From 4553bdbae6b247d44a4b71a4ed9e174cc6250183 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Sat, 16 May 2026 02:05:49 +0800 Subject: [PATCH 1/2] fix: fail fast after SSE reconnect exhaustion --- .changeset/quiet-cups-retry.md | 5 ++ packages/client/src/client/streamableHttp.ts | 67 ++++++++++++++----- .../client/test/client/streamableHttp.test.ts | 27 +++++++- 3 files changed, 82 insertions(+), 17 deletions(-) create mode 100644 .changeset/quiet-cups-retry.md diff --git a/.changeset/quiet-cups-retry.md b/.changeset/quiet-cups-retry.md new file mode 100644 index 0000000000..94078c3cc0 --- /dev/null +++ b/.changeset/quiet-cups-retry.md @@ -0,0 +1,5 @@ +--- +"@modelcontextprotocol/client": patch +--- + +Make StreamableHTTPClientTransport retry standalone SSE streams longer and fail fast after reconnection exhaustion instead of allowing later request responses to disappear behind a dead SSE channel. SSE open failures now include a fallback HTTP status when statusText is empty. diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 3b8ddafe5a..e0b1daccdb 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -23,9 +23,34 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp initialReconnectionDelay: 1000, maxReconnectionDelay: 30_000, reconnectionDelayGrowFactor: 1.5, - maxRetries: 2 + maxRetries: 10 }; +function errorMessage(error: unknown): string { + if (error instanceof Error && error.message) { + return error.message; + } + + if (typeof error === 'object' && error) { + const maybeError = error as { name?: string; status?: number; statusText?: string }; + if (maybeError.statusText) { + return maybeError.statusText; + } + if (maybeError.status !== undefined) { + return `HTTP ${maybeError.status}`; + } + if (maybeError.name) { + return maybeError.name; + } + } + + if (typeof error === 'string' && error) { + return error; + } + + return 'unknown'; +} + /** * Options for starting or authenticating an SSE connection */ @@ -75,7 +100,7 @@ export interface StreamableHTTPReconnectionOptions { /** * Maximum number of reconnection attempts before giving up. - * Default is 2. + * Default is 10. */ maxRetries: number; } @@ -185,6 +210,7 @@ export class StreamableHTTPClientTransport implements Transport { private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private readonly _reconnectionScheduler?: ReconnectionScheduler; private _cancelReconnection?: () => void; + private _standaloneSseReconnectError?: Error; onclose?: () => void; onerror?: (error: Error) => void; @@ -290,12 +316,14 @@ export class StreamableHTTPClientTransport implements Transport { return; } - throw new SdkHttpError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, { + const statusText = response.statusText || `HTTP ${response.status}`; + throw new SdkHttpError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${statusText}`, { status: response.status, statusText: response.statusText }); } + this._standaloneSseReconnectError = undefined; this._handleSseStream(response.body, options, true); } catch (error) { this.onerror?.(error as Error); @@ -330,13 +358,17 @@ export class StreamableHTTPClientTransport implements Transport { * @param lastEventId The ID of the last received event for resumability * @param attemptCount Current reconnection attempt count for this specific stream */ - private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void { + private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0, failFutureRequests = false): void { // Use provided options or default options const maxRetries = this._reconnectionOptions.maxRetries; // Check if we've exceeded maximum retry attempts if (attemptCount >= maxRetries) { - this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`)); + const error = new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`); + if (failFutureRequests) { + this._standaloneSseReconnectError = error; + } + this.onerror?.(error); return; } @@ -347,9 +379,9 @@ export class StreamableHTTPClientTransport implements Transport { this._cancelReconnection = undefined; if (this._abortController?.signal.aborted) return; this._startOrAuthSse(options).catch(error => { - this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); + this.onerror?.(new Error(`Failed to reconnect SSE stream: ${errorMessage(error)}`)); try { - this._scheduleReconnection(options, attemptCount + 1); + this._scheduleReconnection(options, attemptCount + 1, failFutureRequests); } catch (scheduleError) { this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError))); } @@ -445,12 +477,13 @@ export class StreamableHTTPClientTransport implements Transport { onresumptiontoken, replayMessageId }, - 0 + 0, + isReconnectable ); } } catch (error) { // Handle stream errors - likely a network disconnect - this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); + this.onerror?.(new Error(`SSE stream disconnected: ${errorMessage(error)}`)); // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) @@ -466,10 +499,11 @@ export class StreamableHTTPClientTransport implements Transport { onresumptiontoken, replayMessageId }, - 0 + 0, + isReconnectable ); } catch (error) { - this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); + this.onerror?.(new Error(`Failed to reconnect: ${errorMessage(error)}`)); } } } @@ -540,6 +574,12 @@ export class StreamableHTTPClientTransport implements Transport { return; } + const messages = Array.isArray(message) ? message : [message]; + const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); + if (hasRequests && this._standaloneSseReconnectError) { + throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`); + } + const headers = await this._commonHeaders(); headers.set('content-type', 'application/json'); const userAccept = headers.get('accept'); @@ -654,11 +694,6 @@ export class StreamableHTTPClientTransport implements Transport { return; } - // Get original message(s) for detecting request IDs - const messages = Array.isArray(message) ? message : [message]; - - const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); - // Check the response type const contentType = response.headers.get('content-type'); diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index 0edf8b75ac..ce26fa8979 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -928,7 +928,7 @@ describe('StreamableHTTPClientTransport', () => { // ASSERT expect(errorSpy).toHaveBeenCalledWith( expect.objectContaining({ - message: expect.stringContaining('SSE stream disconnected: Error: Network failure') + message: expect.stringContaining('SSE stream disconnected: Network failure') }) ); // THE KEY ASSERTION: A second fetch call proves reconnection was attempted. @@ -1811,6 +1811,31 @@ describe('StreamableHTTPClientTransport', () => { // Clean up the pending reconnection to avoid test pollution transport['_cancelReconnection']?.(); }); + + it('should fail future requests after standalone SSE reconnect attempts are exhausted', async () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 0, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + transport['_scheduleReconnection']({}, 0, true); + + const message: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'tools/call', + params: {}, + id: 'request-after-dead-sse' + }; + + await expect(transport.send(message)).rejects.toThrow( + 'SSE stream reconnection failed: Maximum reconnection attempts (0) exceeded.' + ); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); }); describe('prevent infinite recursion when server returns 401 after successful auth', () => { From 1355348d523817dad0ec31cb3a372f2669547b33 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Sat, 30 May 2026 22:10:25 +0800 Subject: [PATCH 2/2] fix: allow direct responses after SSE exhaustion Signed-off-by: Yufeng He <40085740+he-yufeng@users.noreply.github.com> --- packages/client/src/client/streamableHttp.ts | 12 ++++++------ packages/client/test/client/streamableHttp.test.ts | 11 +++++++++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index e0b1daccdb..23c55e6431 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -574,12 +574,6 @@ export class StreamableHTTPClientTransport implements Transport { return; } - const messages = Array.isArray(message) ? message : [message]; - const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); - if (hasRequests && this._standaloneSseReconnectError) { - throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`); - } - const headers = await this._commonHeaders(); headers.set('content-type', 'application/json'); const userAccept = headers.get('accept'); @@ -682,9 +676,15 @@ export class StreamableHTTPClientTransport implements Transport { this._lastUpscopingHeader = undefined; + const messages = Array.isArray(message) ? message : [message]; + const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); + // If the response is 202 Accepted, there's no body to process if (response.status === 202) { await response.text?.().catch(() => {}); + if (hasRequests && this._standaloneSseReconnectError) { + throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`); + } // if the accepted notification is initialized, we start the SSE stream // if it's supported by the server if (isInitializedNotification(message)) { diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index ce26fa8979..96d7df7fc6 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -1812,7 +1812,7 @@ describe('StreamableHTTPClientTransport', () => { transport['_cancelReconnection']?.(); }); - it('should fail future requests after standalone SSE reconnect attempts are exhausted', async () => { + it('should fail 202 responses after standalone SSE reconnect attempts are exhausted', async () => { transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { reconnectionOptions: { initialReconnectionDelay: 10, @@ -1831,10 +1831,17 @@ describe('StreamableHTTPClientTransport', () => { id: 'request-after-dead-sse' }; + (globalThis.fetch as Mock).mockResolvedValueOnce({ + ok: true, + status: 202, + headers: new Headers(), + text: vi.fn().mockResolvedValue('') + }); + await expect(transport.send(message)).rejects.toThrow( 'SSE stream reconnection failed: Maximum reconnection attempts (0) exceeded.' ); - expect(globalThis.fetch).not.toHaveBeenCalled(); + expect(globalThis.fetch).toHaveBeenCalledTimes(1); }); });