Skip to content

Commit bb834da

Browse files
committed
fix: stop reconnecting after terminal SSE drain errors
1 parent edeb3e0 commit bb834da

2 files changed

Lines changed: 78 additions & 2 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,9 @@ async def _handle_sse_response(
342342
assert isinstance(ctx.session_message.message, JSONRPCRequest)
343343
original_request_id = ctx.session_message.message.id
344344

345+
response_complete = False
345346
try:
346347
event_source = EventSource(response)
347-
response_complete = False
348348
async for sse in event_source.aiter_sse(): # pragma: no branch
349349
# Track last event ID for potential reconnection
350350
if sse.id:
@@ -372,6 +372,8 @@ async def _handle_sse_response(
372372
return # Normal completion, no reconnect needed
373373
except Exception:
374374
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
375+
if response_complete:
376+
return
375377

376378
# Stream ended without response - reconnect if we received an event with ID
377379
if last_event_id is not None: # pragma: no branch
@@ -403,6 +405,7 @@ async def _handle_reconnection(
403405
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
404406
original_request_id = ctx.session_message.message.id
405407

408+
response_complete = False
406409
try:
407410
async with aconnect_sse(ctx.client, "GET", self.url, headers=headers) as event_source:
408411
event_source.response.raise_for_status()
@@ -411,7 +414,6 @@ async def _handle_reconnection(
411414
# Track for potential further reconnection
412415
reconnect_last_event_id: str = last_event_id
413416
reconnect_retry_ms = retry_interval_ms
414-
response_complete = False
415417

416418
async for sse in event_source.aiter_sse():
417419
if sse.id: # pragma: no branch
@@ -438,6 +440,8 @@ async def _handle_reconnection(
438440
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
439441
except Exception as e:
440442
logger.debug(f"Reconnection failed: {e}")
443+
if response_complete:
444+
return
441445
# Try to reconnect again if we still have an event ID
442446
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
443447

tests/shared/test_streamable_http.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1878,6 +1878,40 @@ async def fail_reconnect(*args: Any, **kwargs: Any) -> None: # pragma: no cover
18781878
await read_stream.aclose()
18791879

18801880

1881+
@pytest.mark.anyio
1882+
async def test_sse_response_does_not_reconnect_after_terminal_then_drain_error(monkeypatch: pytest.MonkeyPatch):
1883+
transport = StreamableHTTPTransport(url="http://localhost:8000/mcp")
1884+
response = _FakeStreamResponse()
1885+
1886+
class FakeEventSource:
1887+
def __init__(self, response: _FakeStreamResponse) -> None:
1888+
self.response = response
1889+
1890+
async def aiter_sse(self):
1891+
yield _response_sse(1)
1892+
raise RuntimeError("drain failed after terminal response")
1893+
1894+
async def fail_reconnect(*args: Any, **kwargs: Any) -> None: # pragma: no cover
1895+
raise AssertionError("completed responses should not reconnect after drain errors")
1896+
1897+
monkeypatch.setattr(streamable_http_module, "EventSource", FakeEventSource)
1898+
monkeypatch.setattr(transport, "_handle_reconnection", fail_reconnect)
1899+
1900+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](2)
1901+
async with httpx.AsyncClient() as client:
1902+
try:
1903+
ctx = _make_streamable_http_request_context(1, client, write_stream)
1904+
await transport._handle_sse_response(response, ctx)
1905+
1906+
message = await read_stream.receive()
1907+
assert isinstance(message, SessionMessage)
1908+
assert isinstance(message.message, types.JSONRPCResponse)
1909+
assert message.message.id == 1
1910+
finally:
1911+
await write_stream.aclose()
1912+
await read_stream.aclose()
1913+
1914+
18811915
@pytest.mark.anyio
18821916
async def test_reconnection_drains_after_terminal_response(monkeypatch: pytest.MonkeyPatch):
18831917
"""Resumed GET responses use EOF draining instead of response.aclose()."""
@@ -1914,6 +1948,44 @@ async def fake_aconnect_sse(*args: Any, **kwargs: Any):
19141948
await read_stream.aclose()
19151949

19161950

1951+
@pytest.mark.anyio
1952+
async def test_reconnection_does_not_retry_after_terminal_then_drain_error(monkeypatch: pytest.MonkeyPatch):
1953+
transport = StreamableHTTPTransport(url="http://localhost:8000/mcp")
1954+
response = _FakeStreamResponse()
1955+
attempts = 0
1956+
1957+
class FakeReconnectionEventSource:
1958+
def __init__(self, response: _FakeStreamResponse) -> None:
1959+
self.response = response
1960+
1961+
async def aiter_sse(self):
1962+
yield _response_sse("abc")
1963+
raise RuntimeError("drain failed after terminal response")
1964+
1965+
@asynccontextmanager
1966+
async def fake_aconnect_sse(*args: Any, **kwargs: Any):
1967+
nonlocal attempts
1968+
attempts += 1
1969+
yield FakeReconnectionEventSource(response)
1970+
1971+
monkeypatch.setattr(streamable_http_module, "aconnect_sse", fake_aconnect_sse)
1972+
1973+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](2)
1974+
async with httpx.AsyncClient() as client:
1975+
try:
1976+
ctx = _make_streamable_http_request_context("abc", client, write_stream)
1977+
await transport._handle_reconnection(ctx, "previous-event", retry_interval_ms=0)
1978+
1979+
assert attempts == 1
1980+
message = await read_stream.receive()
1981+
assert isinstance(message, SessionMessage)
1982+
assert isinstance(message.message, types.JSONRPCResponse)
1983+
assert message.message.id == "abc"
1984+
finally:
1985+
await write_stream.aclose()
1986+
await read_stream.aclose()
1987+
1988+
19171989
@pytest.mark.anyio
19181990
async def test_reconnection_retries_after_failed_resume(monkeypatch: pytest.MonkeyPatch):
19191991
"""A failed resume attempt falls back to the next reconnection attempt."""

0 commit comments

Comments
 (0)