Skip to content

Commit 58bf5a5

Browse files
committed
fix: stop reconnecting after terminal SSE drain errors
1 parent 27f6789 commit 58bf5a5

2 files changed

Lines changed: 114 additions & 7 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,9 @@ async def _handle_sse_response(
342342
assert isinstance(ctx.session_message.message, JSONRPCRequest)
343343
original_request_id = ctx.session_message.message.id
344344

345+
response_complete = False
345346
try:
346347
event_source = EventSource(response)
347-
response_complete = False
348348
async for sse in event_source.aiter_sse(): # pragma: no branch
349349
# Track last event ID for potential reconnection
350350
if sse.id:
@@ -372,6 +372,8 @@ async def _handle_sse_response(
372372
return # Normal completion, no reconnect needed
373373
except Exception:
374374
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
375+
if response_complete:
376+
return
375377

376378
# Stream ended without response - reconnect if we received an event with ID
377379
if last_event_id is not None: # pragma: no branch
@@ -403,6 +405,7 @@ async def _handle_reconnection(
403405
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
404406
original_request_id = ctx.session_message.message.id
405407

408+
response_complete = False
406409
try:
407410
async with aconnect_sse(ctx.client, "GET", self.url, headers=headers) as event_source:
408411
event_source.response.raise_for_status()
@@ -411,7 +414,6 @@ async def _handle_reconnection(
411414
# Track for potential further reconnection
412415
reconnect_last_event_id: str = last_event_id
413416
reconnect_retry_ms = retry_interval_ms
414-
response_complete = False
415417

416418
async for sse in event_source.aiter_sse():
417419
if sse.id: # pragma: no branch
@@ -438,6 +440,8 @@ async def _handle_reconnection(
438440
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
439441
except Exception as e:
440442
logger.debug(f"Reconnection failed: {e}")
443+
if response_complete:
444+
return
441445
# Try to reconnect again if we still have an event ID
442446
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
443447

tests/shared/test_streamable_http.py

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@
4949
from mcp.server.transport_security import TransportSecuritySettings
5050
from mcp.shared._context import RequestContext
5151
from mcp.shared._context_streams import ContextSendStream, create_context_streams
52-
from mcp.shared._httpx_utils import (
53-
MCP_DEFAULT_SSE_READ_TIMEOUT,
54-
MCP_DEFAULT_TIMEOUT,
55-
create_mcp_http_client,
56-
)
5752
from mcp.shared.message import ClientMessageMetadata, ServerMessageMetadata, SessionMessage
5853
from mcp.shared.session import RequestResponder
5954
from mcp.types import (
@@ -1663,6 +1658,76 @@ async def fail_reconnect(*args: Any, **kwargs: Any) -> None: # pragma: no cover
16631658
await read_stream.aclose()
16641659

16651660

1661+
@pytest.mark.anyio
1662+
async def test_sse_response_does_not_reconnect_after_terminal_then_drain_error(monkeypatch: pytest.MonkeyPatch):
1663+
transport = StreamableHTTPTransport(url="http://localhost:8000/mcp")
1664+
response = _FakeStreamResponse()
1665+
1666+
class FakeEventSource:
1667+
def __init__(self, response: _FakeStreamResponse) -> None:
1668+
self.response = response
1669+
1670+
async def aiter_sse(self):
1671+
yield _response_sse(1)
1672+
raise RuntimeError("drain failed after terminal response")
1673+
1674+
async def fail_reconnect(*args: Any, **kwargs: Any) -> None: # pragma: no cover
1675+
raise AssertionError("completed responses should not reconnect after drain errors")
1676+
1677+
monkeypatch.setattr(streamable_http_module, "EventSource", FakeEventSource)
1678+
monkeypatch.setattr(transport, "_handle_reconnection", fail_reconnect)
1679+
1680+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](2)
1681+
async with httpx.AsyncClient() as client:
1682+
try:
1683+
ctx = _make_streamable_http_request_context(1, client, write_stream)
1684+
await transport._handle_sse_response(response, ctx)
1685+
1686+
message = await read_stream.receive()
1687+
assert isinstance(message, SessionMessage)
1688+
assert isinstance(message.message, types.JSONRPCResponse)
1689+
assert message.message.id == 1
1690+
finally:
1691+
await write_stream.aclose()
1692+
await read_stream.aclose()
1693+
1694+
1695+
@pytest.mark.anyio
1696+
async def test_sse_response_reconnects_after_pre_terminal_drain_error(monkeypatch: pytest.MonkeyPatch):
1697+
transport = StreamableHTTPTransport(url="http://localhost:8000/mcp")
1698+
response = _FakeStreamResponse()
1699+
reconnects: list[tuple[str, int | None]] = []
1700+
1701+
class FakeEventSource:
1702+
def __init__(self, response: _FakeStreamResponse) -> None:
1703+
self.response = response
1704+
1705+
async def aiter_sse(self):
1706+
yield ServerSentEvent(event="message", data="", id="resume-from-here")
1707+
raise RuntimeError("stream failed before terminal response")
1708+
1709+
async def record_reconnect(
1710+
ctx: StreamableHTTPClientRequestContext,
1711+
last_event_id: str,
1712+
retry_interval_ms: int | None = None,
1713+
) -> None:
1714+
reconnects.append((last_event_id, retry_interval_ms))
1715+
1716+
monkeypatch.setattr(streamable_http_module, "EventSource", FakeEventSource)
1717+
monkeypatch.setattr(transport, "_handle_reconnection", record_reconnect)
1718+
1719+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](2)
1720+
async with httpx.AsyncClient() as client:
1721+
try:
1722+
ctx = _make_streamable_http_request_context(1, client, write_stream)
1723+
await transport._handle_sse_response(response, ctx)
1724+
1725+
assert reconnects == [("resume-from-here", None)]
1726+
finally:
1727+
await write_stream.aclose()
1728+
await read_stream.aclose()
1729+
1730+
16661731
@pytest.mark.anyio
16671732
async def test_reconnection_drains_after_terminal_response(monkeypatch: pytest.MonkeyPatch):
16681733
"""Resumed GET responses use EOF draining instead of response.aclose()."""
@@ -1699,6 +1764,44 @@ async def fake_aconnect_sse(*args: Any, **kwargs: Any):
16991764
await read_stream.aclose()
17001765

17011766

1767+
@pytest.mark.anyio
1768+
async def test_reconnection_does_not_retry_after_terminal_then_drain_error(monkeypatch: pytest.MonkeyPatch):
1769+
transport = StreamableHTTPTransport(url="http://localhost:8000/mcp")
1770+
response = _FakeStreamResponse()
1771+
attempts = 0
1772+
1773+
class FakeReconnectionEventSource:
1774+
def __init__(self, response: _FakeStreamResponse) -> None:
1775+
self.response = response
1776+
1777+
async def aiter_sse(self):
1778+
yield _response_sse("abc")
1779+
raise RuntimeError("drain failed after terminal response")
1780+
1781+
@asynccontextmanager
1782+
async def fake_aconnect_sse(*args: Any, **kwargs: Any):
1783+
nonlocal attempts
1784+
attempts += 1
1785+
yield FakeReconnectionEventSource(response)
1786+
1787+
monkeypatch.setattr(streamable_http_module, "aconnect_sse", fake_aconnect_sse)
1788+
1789+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](2)
1790+
async with httpx.AsyncClient() as client:
1791+
try:
1792+
ctx = _make_streamable_http_request_context("abc", client, write_stream)
1793+
await transport._handle_reconnection(ctx, "previous-event", retry_interval_ms=0)
1794+
1795+
assert attempts == 1
1796+
message = await read_stream.receive()
1797+
assert isinstance(message, SessionMessage)
1798+
assert isinstance(message.message, types.JSONRPCResponse)
1799+
assert message.message.id == "abc"
1800+
finally:
1801+
await write_stream.aclose()
1802+
await read_stream.aclose()
1803+
1804+
17021805
@pytest.mark.anyio
17031806
async def test_reconnection_retries_after_failed_resume(monkeypatch: pytest.MonkeyPatch):
17041807
"""A failed resume attempt falls back to the next reconnection attempt."""

0 commit comments

Comments
 (0)