From dd775a0a17f4fac8f08e6fb91aeb5e31e3fa6621 Mon Sep 17 00:00:00 2001 From: taobaorun Date: Sun, 14 Dec 2025 14:29:45 +0800 Subject: [PATCH 1/6] fix close_wait and tomcat connection keeps growing --- .../server/transport/WebMvcSseServerTransportProvider.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java index 6c35de56d..550fde7d3 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java @@ -263,10 +263,13 @@ private ServerResponse handleSseConnection(ServerRequest request) { logger.debug("Creating new SSE connection for session: {}", sessionId); sseBuilder.onComplete(() -> { logger.debug("SSE connection completed for session: {}", sessionId); + // explicitly close the session when the SSE connection is completed + session.close(); sessions.remove(sessionId); }); sseBuilder.onTimeout(() -> { logger.debug("SSE connection timed out for session: {}", sessionId); + session.close(); sessions.remove(sessionId); }); this.sessions.put(sessionId, session); From 32562f3d5370202fd33b4f3691cb62702eb58d20 Mon Sep 17 00:00:00 2001 From: taobaorun Date: Mon, 15 Dec 2025 11:41:11 +0800 Subject: [PATCH 2/6] sse mvc session close async when complete or timeout --- .../server/transport/WebMvcSseServerTransportProvider.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java index 550fde7d3..79b5daf3e 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java @@ -264,13 +264,11 @@ private ServerResponse handleSseConnection(ServerRequest request) { sseBuilder.onComplete(() -> { logger.debug("SSE connection completed for session: {}", sessionId); // explicitly close the session when the SSE connection is completed - session.close(); - sessions.remove(sessionId); + session.closeGracefully().doOnSuccess(v -> sessions.remove(sessionId)).subscribe(); }); sseBuilder.onTimeout(() -> { logger.debug("SSE connection timed out for session: {}", sessionId); - session.close(); - sessions.remove(sessionId); + session.closeGracefully().doOnSuccess(v -> sessions.remove(sessionId)).subscribe(); }); this.sessions.put(sessionId, session); From 2a9765669a27c88c219abac6e3b611d304bad3e4 Mon Sep 17 00:00:00 2001 From: taobaorun Date: Mon, 15 Dec 2025 14:38:28 +0800 Subject: [PATCH 3/6] sse mvc session close sync when complete or timeout. --- .../server/transport/WebMvcSseServerTransportProvider.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java index 79b5daf3e..550fde7d3 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java @@ -264,11 +264,13 @@ private ServerResponse handleSseConnection(ServerRequest request) { sseBuilder.onComplete(() -> { logger.debug("SSE connection completed for session: {}", sessionId); // explicitly close the session when the SSE connection is completed - session.closeGracefully().doOnSuccess(v -> sessions.remove(sessionId)).subscribe(); + session.close(); + sessions.remove(sessionId); }); sseBuilder.onTimeout(() -> { logger.debug("SSE connection timed out for session: {}", sessionId); - session.closeGracefully().doOnSuccess(v -> sessions.remove(sessionId)).subscribe(); + session.close(); + sessions.remove(sessionId); }); this.sessions.put(sessionId, session); From 73abd117932c16b2276f80f39283cd0d28517f5d Mon Sep 17 00:00:00 2001 From: taobaorun Date: Mon, 15 Dec 2025 20:24:54 +0800 Subject: [PATCH 4/6] Optimize logging output --- .../server/transport/WebMvcSseServerTransportProvider.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java index 550fde7d3..807ccc0fe 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java @@ -386,6 +386,12 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { String jsonText = jsonMapper.writeValueAsString(message); sseBuilder.event(MESSAGE_EVENT_TYPE).data(jsonText); } + catch (IOException e) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to send message: {}", e.getMessage()); + } + sseBuilder.error(e); + } catch (Exception e) { logger.error("Failed to send message: {}", e.getMessage()); sseBuilder.error(e); From 7a07c751171c2eb468987aceea7be8553fa1c59d Mon Sep 17 00:00:00 2001 From: taobaorun Date: Tue, 16 Dec 2025 08:11:32 +0800 Subject: [PATCH 5/6] mcp server session add closed flag --- .../spec/McpServerSession.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 241f7d8b5..650fdfac5 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -65,6 +66,8 @@ public class McpServerSession implements McpLoggableSession { private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO; + private volatile AtomicBoolean closed = new AtomicBoolean(false); + /** * Creates a new server session with the given parameters and the transport to use. * @param id session id @@ -346,13 +349,20 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { // TODO: clear pendingResponses and emit errors? - return this.transport.closeGracefully(); + if (this.closed.compareAndSet(false, true)) { + return this.transport.closeGracefully(); + } + else { + return Mono.empty(); + } } @Override public void close() { // TODO: clear pendingResponses and emit errors? - this.transport.close(); + if (this.closed.compareAndSet(false, true)) { + this.transport.close(); + } } /** From 9072fb65811135bc9f770e1d25bbe1c427ea3fb4 Mon Sep 17 00:00:00 2001 From: taobaorun Date: Tue, 16 Dec 2025 08:15:56 +0800 Subject: [PATCH 6/6] fix todo clear pending responses --- .../spec/McpServerSession.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 650fdfac5..5cfdbe7e0 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -4,20 +4,12 @@ package io.modelcontextprotocol.spec; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.TypeRef; import io.modelcontextprotocol.server.McpAsyncServerExchange; import io.modelcontextprotocol.server.McpInitRequestHandler; import io.modelcontextprotocol.server.McpNotificationHandler; import io.modelcontextprotocol.server.McpRequestHandler; -import io.modelcontextprotocol.json.TypeRef; import io.modelcontextprotocol.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +17,14 @@ import reactor.core.publisher.MonoSink; import reactor.core.publisher.Sinks; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + /** * Represents a Model Context Protocol (MCP) session on the server side. It manages * bidirectional JSON-RPC communication with the client. @@ -37,7 +37,9 @@ public class McpServerSession implements McpLoggableSession { private final String id; - /** Duration to wait for request responses before timing out */ + /** + * Duration to wait for request responses before timing out + */ private final Duration requestTimeout; private final AtomicLong requestCounter = new AtomicLong(0); @@ -348,8 +350,9 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { - // TODO: clear pendingResponses and emit errors? if (this.closed.compareAndSet(false, true)) { + this.pendingResponses.forEach((id, response) -> response.error(new RuntimeException("Session closed"))); + this.pendingResponses.clear(); return this.transport.closeGracefully(); } else { @@ -359,8 +362,9 @@ public Mono closeGracefully() { @Override public void close() { - // TODO: clear pendingResponses and emit errors? if (this.closed.compareAndSet(false, true)) { + this.pendingResponses.forEach((id, response) -> response.error(new RuntimeException("Session closed"))); + this.pendingResponses.clear(); this.transport.close(); } }