-
Notifications
You must be signed in to change notification settings - Fork 587
Streamable HTTP resumability + redelivery + SSE polling via server-side disconnect #1077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,8 @@ internal sealed partial class StreamableHttpClientSessionTransport : TransportBa | |
| private static readonly MediaTypeWithQualityHeaderValue s_applicationJsonMediaType = new("application/json"); | ||
| private static readonly MediaTypeWithQualityHeaderValue s_textEventStreamMediaType = new("text/event-stream"); | ||
|
|
||
| private static readonly TimeSpan s_defaultReconnectionDelay = TimeSpan.FromSeconds(1); | ||
|
|
||
| private readonly McpHttpClient _httpClient; | ||
| private readonly HttpClientTransportOptions _options; | ||
| private readonly CancellationTokenSource _connectionCts; | ||
|
|
@@ -106,7 +108,17 @@ internal async Task<HttpResponseMessage> SendHttpRequestAsync(JsonRpcMessage mes | |
| else if (response.Content.Headers.ContentType?.MediaType == "text/event-stream") | ||
| { | ||
| using var responseBodyStream = await response.Content.ReadAsStreamAsync(cancellationToken); | ||
| rpcResponseOrError = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false); | ||
| var sseState = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false); | ||
| rpcResponseOrError = sseState.Response; | ||
|
|
||
| // Resumability: If POST SSE stream ended without a response but we have a Last-Event-ID (from priming), | ||
| // attempt to resume by sending a GET request with Last-Event-ID header. The server will replay | ||
| // events from the event store, allowing us to receive the pending response. | ||
| if (rpcResponseOrError is null && rpcRequest is not null && sseState.LastEventId is not null) | ||
| { | ||
| var resumeResult = await SendGetSseRequestWithRetriesAsync(rpcRequest, sseState, cancellationToken).ConfigureAwait(false); | ||
| rpcResponseOrError = resumeResult.Response; | ||
| } | ||
| } | ||
|
|
||
| if (rpcRequest is null) | ||
|
|
@@ -188,54 +200,135 @@ public override async ValueTask DisposeAsync() | |
|
|
||
| private async Task ReceiveUnsolicitedMessagesAsync() | ||
| { | ||
| // Send a GET request to handle any unsolicited messages not sent over a POST response. | ||
| using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint); | ||
| request.Headers.Accept.Add(s_textEventStreamMediaType); | ||
| CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion); | ||
| var state = new SseStreamState(); | ||
|
|
||
| // Server support for the GET request is optional. If it fails, we don't care. It just means we won't receive unsolicited messages. | ||
| HttpResponseMessage response; | ||
| try | ||
| { | ||
| response = await _httpClient.SendAsync(request, message: null, _connectionCts.Token).ConfigureAwait(false); | ||
| } | ||
| catch (HttpRequestException) | ||
| // Continuously receive unsolicited messages until cancelled | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: canceled |
||
| while (!_connectionCts.Token.IsCancellationRequested) | ||
| { | ||
| return; | ||
| var result = await SendGetSseRequestWithRetriesAsync( | ||
| relatedRpcRequest: null, | ||
| state, | ||
| _connectionCts.Token).ConfigureAwait(false); | ||
|
|
||
| // Update state for next reconnection attempt | ||
| state.UpdateFrom(result); | ||
|
|
||
| // If we exhausted retries without receiving any events, stop trying | ||
| if (result.LastEventId is null) | ||
| { | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sends a GET request for SSE with retry logic and resumability support. | ||
| /// </summary> | ||
| private async Task<SseStreamState> SendGetSseRequestWithRetriesAsync( | ||
| JsonRpcRequest? relatedRpcRequest, | ||
| SseStreamState state, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| int attempt = 0; | ||
|
|
||
| // Delay before first attempt if we're reconnecting (have a Last-Event-ID) | ||
| bool shouldDelay = state.LastEventId is not null; | ||
|
|
||
| using (response) | ||
| while (attempt < _options.MaxReconnectionAttempts) | ||
| { | ||
| if (!response.IsSuccessStatusCode) | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| if (shouldDelay) | ||
| { | ||
| return; | ||
| var delay = state.RetryInterval ?? s_defaultReconnectionDelay; | ||
| await Task.Delay(delay, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| shouldDelay = true; | ||
|
|
||
| using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint); | ||
| request.Headers.Accept.Add(s_textEventStreamMediaType); | ||
| CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion, state.LastEventId); | ||
|
|
||
| HttpResponseMessage response; | ||
| try | ||
| { | ||
| response = await _httpClient.SendAsync(request, message: null, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| catch (HttpRequestException) | ||
| { | ||
| attempt++; | ||
| continue; | ||
| } | ||
|
|
||
| using (response) | ||
| { | ||
| if (!response.IsSuccessStatusCode) | ||
| { | ||
| attempt++; | ||
| continue; | ||
| } | ||
|
|
||
| using var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); | ||
| var result = await ProcessSseResponseAsync(responseStream, relatedRpcRequest, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| state.UpdateFrom(result); | ||
|
|
||
| if (result.Response is not null) | ||
| { | ||
| return state; | ||
| } | ||
|
|
||
| using var responseStream = await response.Content.ReadAsStreamAsync(_connectionCts.Token).ConfigureAwait(false); | ||
| await ProcessSseResponseAsync(responseStream, relatedRpcRequest: null, _connectionCts.Token).ConfigureAwait(false); | ||
| // Stream closed without the response | ||
| if (state.LastEventId is null) | ||
| { | ||
| // No event ID means server may not support resumability - don't retry indefinitely | ||
| attempt++; | ||
| } | ||
| else | ||
| { | ||
| // We have an event ID, so reconnection should work - reset attempts | ||
| attempt = 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What prevents us from ending up in an infinite loop? |
||
| } | ||
| } | ||
| } | ||
|
|
||
| return state; | ||
| } | ||
|
|
||
| private async Task<JsonRpcMessageWithId?> ProcessSseResponseAsync(Stream responseStream, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken) | ||
| private async Task<SseStreamState> ProcessSseResponseAsync( | ||
| Stream responseStream, | ||
| JsonRpcRequest? relatedRpcRequest, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| var state = new SseStreamState(); | ||
|
|
||
| await foreach (SseItem<string> sseEvent in SseParser.Create(responseStream).EnumerateAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| if (sseEvent.EventType != "message") | ||
| // Track event ID and retry interval for resumability | ||
| if (!string.IsNullOrEmpty(sseEvent.EventId)) | ||
| { | ||
| state.LastEventId = sseEvent.EventId; | ||
| } | ||
| if (sseEvent.ReconnectionInterval.HasValue) | ||
| { | ||
| state.RetryInterval = sseEvent.ReconnectionInterval.Value; | ||
| } | ||
|
|
||
| // Skip events with empty data (priming events, keep-alives) | ||
| if (string.IsNullOrEmpty(sseEvent.Data) || sseEvent.EventType != "message") | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| var rpcResponseOrError = await ProcessMessageAsync(sseEvent.Data, relatedRpcRequest, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // The server SHOULD end the HTTP response body here anyway, but we won't leave it to chance. This transport makes | ||
| // a GET request for any notifications that might need to be sent after the completion of each POST. | ||
| if (rpcResponseOrError is not null) | ||
| { | ||
| return rpcResponseOrError; | ||
| state.Response = rpcResponseOrError; | ||
| return state; | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| return state; | ||
| } | ||
|
|
||
| private async Task<JsonRpcMessageWithId?> ProcessMessageAsync(string data, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken) | ||
|
|
@@ -292,7 +385,8 @@ internal static void CopyAdditionalHeaders( | |
| HttpRequestHeaders headers, | ||
| IDictionary<string, string>? additionalHeaders, | ||
| string? sessionId, | ||
| string? protocolVersion) | ||
| string? protocolVersion, | ||
| string? lastEventId = null) | ||
| { | ||
| if (sessionId is not null) | ||
| { | ||
|
|
@@ -304,6 +398,11 @@ internal static void CopyAdditionalHeaders( | |
| headers.Add("MCP-Protocol-Version", protocolVersion); | ||
| } | ||
|
|
||
| if (lastEventId is not null) | ||
| { | ||
| headers.Add("Last-Event-ID", lastEventId); | ||
| } | ||
|
|
||
| if (additionalHeaders is null) | ||
| { | ||
| return; | ||
|
|
@@ -317,4 +416,21 @@ internal static void CopyAdditionalHeaders( | |
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Tracks state across SSE stream connections. | ||
| /// </summary> | ||
| private struct SseStreamState | ||
| { | ||
| public JsonRpcMessageWithId? Response; | ||
| public string? LastEventId; | ||
| public TimeSpan? RetryInterval; | ||
|
|
||
| public void UpdateFrom(SseStreamState other) | ||
| { | ||
| Response ??= other.Response; | ||
| LastEventId ??= other.LastEventId; | ||
| RetryInterval ??= other.RetryInterval; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,16 +29,32 @@ internal sealed partial class McpSessionHandler : IAsyncDisposable | |
| "mcp.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false); | ||
|
|
||
| /// <summary>The latest version of the protocol supported by this implementation.</summary> | ||
| internal const string LatestProtocolVersion = "2025-06-18"; | ||
| internal const string LatestProtocolVersion = "2025-11-25"; | ||
|
|
||
| /// <summary>All protocol versions supported by this implementation.</summary> | ||
| internal static readonly string[] SupportedProtocolVersions = | ||
| [ | ||
| "2024-11-05", | ||
| "2025-03-26", | ||
| "2025-06-18", | ||
| LatestProtocolVersion, | ||
| ]; | ||
|
|
||
| /// <summary> | ||
| /// Checks if the given protocol version supports resumability (priming events). | ||
| /// </summary> | ||
| /// <param name="protocolVersion">The protocol version to check.</param> | ||
| /// <returns>True if the protocol version supports resumability.</returns> | ||
| /// <remarks> | ||
| /// Priming events are only supported in protocol version >= 2025-11-25. | ||
| /// Older clients may crash when receiving SSE events with empty data. | ||
| /// </remarks> | ||
| internal static bool SupportsResumability(string? protocolVersion) | ||
|
||
| { | ||
| const string MinResumabilityProtocolVersion = "2025-11-25"; | ||
| return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0; | ||
| } | ||
|
|
||
| private readonly bool _isServer; | ||
| private readonly string _transportKind; | ||
| private readonly ITransport _transport; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The spec states "it SHOULD send an SSE event with a standard
retryfield". Does that mean our default is going against the spec's recommendation?