Commit cefbbd7

feat(service): add close() method for graceful connection shutdown
This PR primarily fixes #572 by enabling graceful shutdown without consuming self. While implementing this, I noticed delete_session() is spawned as a background task, which means close() may return before HTTP session cleanup completes. Since this is part of the same shutdown lifecycle and can cause resource leaks/races, I'm including a small, localized fix to ensure cleanup completes before close() returns. If maintainers prefer, I can split the cleanup timing change into a follow-up PR.

Changes:

- Add close(&mut self) for graceful shutdown without consuming the service
- Add close_with_timeout() for bounded shutdown operations
- Add is_closed() to check connection state
- Move HTTP delete_session from a background spawn to inline cleanup
- Add a Drop impl with a debug log if the service is dropped without an explicit close

Fixes #572
1 parent f20ed20 commit cefbbd7
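
To make the new surface concrete, here is a minimal usage sketch adapted from the tests added in this commit. TestServer and TestClientHandler are the fixtures from the tests' common module in this repo; any other handler/transport pair should behave the same way.

use std::time::Duration;

use common::handlers::{TestClientHandler, TestServer}; // repo test fixtures (tests' common module)
use rmcp::{service::QuitReason, ServiceExt};

async fn shutdown_demo() -> anyhow::Result<()> {
    let (server_transport, client_transport) = tokio::io::duplex(4096);

    // Server side: runs until the peer shuts the connection down.
    let server = TestServer::new().serve(server_transport).await?;
    let server_task = tokio::spawn(server.waiting());

    // Client side: the new methods live on RunningService.
    let mut client = TestClientHandler::new(true, true).serve(client_transport).await?;
    assert!(!client.is_closed());

    // Graceful shutdown without consuming the handle; close() waits for cleanup.
    assert!(matches!(client.close().await?, QuitReason::Cancelled));
    assert!(client.is_closed());

    // A second close() is a no-op and reports Closed immediately.
    assert!(matches!(client.close().await?, QuitReason::Closed));

    // close_with_timeout() bounds the wait; once already closed it also returns immediately.
    let bounded = client.close_with_timeout(Duration::from_secs(5)).await?;
    assert!(matches!(bounded, Some(QuitReason::Closed)));

    let _server_quit = server_task.await??;
    Ok(())
}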

3 files changed: +264 -45 lines changed

crates/rmcp/src/service.rs

Lines changed: 98 additions & 8 deletions
@@ -434,7 +434,7 @@ impl<R: ServiceRole> Peer<R> {
 pub struct RunningService<R: ServiceRole, S: Service<R>> {
     service: Arc<S>,
     peer: Peer<R>,
-    handle: tokio::task::JoinHandle<QuitReason>,
+    handle: Option<tokio::task::JoinHandle<QuitReason>>,
     cancellation_token: CancellationToken,
     dg: DropGuard,
 }
@@ -459,14 +459,104 @@ impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
     pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
         RunningServiceCancellationToken(self.cancellation_token.clone())
     }
+
+    /// Returns true if the service has been closed or cancelled.
     #[inline]
-    pub async fn waiting(self) -> Result<QuitReason, tokio::task::JoinError> {
-        self.handle.await
+    pub fn is_closed(&self) -> bool {
+        self.handle.is_none() || self.cancellation_token.is_cancelled()
+    }
+
+    /// Wait for the service to complete.
+    ///
+    /// This will block until the service loop terminates (either due to
+    /// cancellation, transport closure, or an error).
+    #[inline]
+    pub async fn waiting(mut self) -> Result<QuitReason, tokio::task::JoinError> {
+        match self.handle.take() {
+            Some(handle) => handle.await,
+            None => Ok(QuitReason::Closed),
+        }
+    }
+
+    /// Gracefully close the connection and wait for cleanup to complete.
+    ///
+    /// This method cancels the service, waits for the background task to finish
+    /// (which includes calling `transport.close()`), and ensures all cleanup
+    /// operations complete before returning.
+    ///
+    /// Unlike [`cancel`](Self::cancel), this method takes `&mut self` and can be
+    /// called without consuming the `RunningService`. After calling this method,
+    /// the service is considered closed and subsequent operations will fail.
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// let mut client = ().serve(transport).await?;
+    /// // ... use the client ...
+    /// client.close().await?;
+    /// ```
+    pub async fn close(&mut self) -> Result<QuitReason, tokio::task::JoinError> {
+        if let Some(handle) = self.handle.take() {
+            // Disarm the drop guard so it doesn't try to cancel again
+            // We need to cancel manually and wait for completion
+            self.cancellation_token.cancel();
+            handle.await
+        } else {
+            // Already closed
+            Ok(QuitReason::Closed)
+        }
     }
-    pub async fn cancel(self) -> Result<QuitReason, tokio::task::JoinError> {
-        let RunningService { dg, handle, .. } = self;
-        dg.disarm().cancel();
-        handle.await
+
+    /// Gracefully close the connection with a timeout.
+    ///
+    /// Similar to [`close`](Self::close), but returns after the specified timeout
+    /// if the cleanup doesn't complete in time. This is useful for ensuring
+    /// a bounded shutdown time.
+    ///
+    /// Returns `Ok(Some(reason))` if shutdown completed within the timeout,
+    /// `Ok(None)` if the timeout was reached, or `Err` if there was a join error.
+    pub async fn close_with_timeout(
+        &mut self,
+        timeout: Duration,
+    ) -> Result<Option<QuitReason>, tokio::task::JoinError> {
+        if let Some(handle) = self.handle.take() {
+            self.cancellation_token.cancel();
+            match tokio::time::timeout(timeout, handle).await {
+                Ok(result) => result.map(Some),
+                Err(_elapsed) => {
+                    tracing::warn!(
+                        "close_with_timeout: cleanup did not complete within {:?}",
+                        timeout
+                    );
+                    Ok(None)
+                }
+            }
+        } else {
+            Ok(Some(QuitReason::Closed))
+        }
+    }
+
+    /// Cancel the service and wait for cleanup to complete.
+    ///
+    /// This consumes the `RunningService` and ensures the connection is properly
+    /// closed. For a non-consuming alternative, see [`close`](Self::close).
+    pub async fn cancel(mut self) -> Result<QuitReason, tokio::task::JoinError> {
+        // Disarm the drop guard since we're handling cancellation explicitly
+        let _ = std::mem::replace(&mut self.dg, self.cancellation_token.clone().drop_guard());
+        self.close().await
+    }
+}
+
+impl<R: ServiceRole, S: Service<R>> Drop for RunningService<R, S> {
+    fn drop(&mut self) {
+        if self.handle.is_some() && !self.cancellation_token.is_cancelled() {
+            tracing::debug!(
+                "RunningService dropped without explicit close(). \
+                 The connection will be closed asynchronously. \
+                 For guaranteed cleanup, call close() or cancel() before dropping."
+            );
+        }
+        // The DropGuard will handle cancellation
     }
 }
 

@@ -847,7 +937,7 @@ where
     RunningService {
         service,
         peer: peer_return,
-        handle,
+        handle: Some(handle),
         cancellation_token: ct.clone(),
         dg: ct.drop_guard(),
     }
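
For the Ok(Some)/Ok(None)/Err contract documented on close_with_timeout() above, a hedged caller-side fragment: the five-second budget and the log messages are illustrative, not part of this commit, and client is assumed to be a RunningService obtained from ServiceExt::serve().

use std::time::Duration;

match client.close_with_timeout(Duration::from_secs(5)).await {
    // Shutdown and cleanup finished within the budget.
    Ok(Some(_reason)) => tracing::info!("graceful shutdown finished"),
    // Budget elapsed; cancellation was still issued and the worker keeps
    // cleaning up in the background (dropping the JoinHandle does not abort it).
    Ok(None) => tracing::warn!("shutdown exceeded 5s, continuing in the background"),
    // The service task panicked or was aborted.
    Err(join_error) => tracing::error!("service task failed during shutdown: {join_error}"),
}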

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 39 additions & 37 deletions
@@ -333,37 +333,10 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
             }
             None
         };
-        // delete session when drop guard is dropped
-        if let Some(session_id) = &session_id {
-            let ct = transport_task_ct.clone();
-            let client = self.client.clone();
-            let session_id = session_id.clone();
-            let url = config.uri.clone();
-            let auth_header = config.auth_header.clone();
-            tokio::spawn(async move {
-                ct.cancelled().await;
-                let delete_session_result = client
-                    .delete_session(url, session_id.clone(), auth_header.clone())
-                    .await;
-                match delete_session_result {
-                    Ok(_) => {
-                        tracing::info!(session_id = session_id.as_ref(), "delete session success")
-                    }
-                    Err(StreamableHttpError::ServerDoesNotSupportDeleteSession) => {
-                        tracing::info!(
-                            session_id = session_id.as_ref(),
-                            "server doesn't support delete session"
-                        )
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            session_id = session_id.as_ref(),
-                            "fail to delete session: {e}"
-                        );
-                    }
-                };
-            });
-        }
+        // Store session info for cleanup when run() exits (not spawned, so cleanup completes before close() returns)
+        let session_cleanup_info = session_id.as_ref().map(|sid| {
+            (self.client.clone(), config.uri.clone(), sid.clone(), config.auth_header.clone())
+        });
 
         context.send_to_handler(message).await?;
         let initialized_notification = context.recv_from_handler().await?;
@@ -437,20 +410,23 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
                 }
             });
         }
-        loop {
+        // Main event loop - capture exit reason so we can do cleanup before returning
+        let loop_result: Result<(), WorkerQuitReason<Self::Error>> = 'main_loop: loop {
             let event = tokio::select! {
                 _ = transport_task_ct.cancelled() => {
                     tracing::debug!("cancelled");
-                    return Err(WorkerQuitReason::Cancelled);
+                    break 'main_loop Err(WorkerQuitReason::Cancelled);
                 }
                 message = context.recv_from_handler() => {
-                    let message = message?;
-                    Event::ClientMessage(message)
+                    match message {
+                        Ok(msg) => Event::ClientMessage(msg),
+                        Err(e) => break 'main_loop Err(e),
+                    }
                 },
                 message = sse_worker_rx.recv() => {
                     let Some(message) = message else {
                         tracing::trace!("transport dropped, exiting");
-                        return Err(WorkerQuitReason::HandlerTerminated);
+                        break 'main_loop Err(WorkerQuitReason::HandlerTerminated);
                     };
                     Event::ServerMessage(message)
                 },
@@ -525,7 +501,9 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
                 }
                 Event::ServerMessage(json_rpc_message) => {
                     // send the message to the handler
-                    context.send_to_handler(json_rpc_message).await?;
+                    if let Err(e) = context.send_to_handler(json_rpc_message).await {
+                        break 'main_loop Err(e);
+                    }
                 }
                 Event::StreamResult(result) => {
                     if result.is_err() {
@@ -536,7 +514,31 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
                     }
                 }
             }
+        };
+
+        // Cleanup session before returning (ensures close() waits for session deletion)
+        if let Some((client, url, session_id, auth_header)) = session_cleanup_info {
+            let delete_result = client.delete_session(url, session_id.clone(), auth_header).await;
+            match delete_result {
+                Ok(_) => {
+                    tracing::info!(session_id = session_id.as_ref(), "delete session success")
+                }
+                Err(StreamableHttpError::ServerDoesNotSupportDeleteSession) => {
+                    tracing::info!(
+                        session_id = session_id.as_ref(),
+                        "server doesn't support delete session"
+                    )
+                }
+                Err(e) => {
+                    tracing::error!(
+                        session_id = session_id.as_ref(),
+                        "fail to delete session: {e}"
+                    );
+                }
+            }
         }
+
+        loop_result
     }
 }
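
The ordering guarantee this change targets (session cleanup finishing before close() returns) comes down to running cleanup inline before the worker returns, rather than in a detached task. A toy model of that property, independent of rmcp and with illustrative names only:

use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

// Inline cleanup: the worker performs its cleanup before returning, so anyone
// awaiting its JoinHandle observes the cleanup as already done. With the old
// spawned-cleanup approach, awaiting the worker gave no such guarantee.
async fn worker(ct: CancellationToken, cleanup_done: oneshot::Sender<()>) {
    ct.cancelled().await;          // main loop exits on cancellation
    let _ = cleanup_done.send(()); // cleanup runs before the task completes
}

#[tokio::main]
async fn main() {
    let ct = CancellationToken::new();
    let (tx, mut rx) = oneshot::channel();
    let handle = tokio::spawn(worker(ct.clone(), tx));

    ct.cancel();
    handle.await.unwrap();          // this is what close() awaits
    assert!(rx.try_recv().is_ok()); // cleanup is guaranteed to have run
}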

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
+//cargo test --test test_close_connection --features "client server"
+
+mod common;
+use std::time::Duration;
+
+use common::handlers::{TestClientHandler, TestServer};
+use rmcp::{service::QuitReason, ServiceExt};
+
+/// Test that close() properly shuts down the connection
+#[tokio::test]
+async fn test_close_method() -> anyhow::Result<()> {
+    let (server_transport, client_transport) = tokio::io::duplex(4096);
+
+    // Start server
+    let server_handle = tokio::spawn(async move {
+        let server = TestServer::new().serve(server_transport).await?;
+        server.waiting().await?;
+        anyhow::Ok(())
+    });
+
+    // Start client
+    let handler = TestClientHandler::new(true, true);
+    let mut client = handler.serve(client_transport).await?;
+
+    // Verify client is not closed
+    assert!(!client.is_closed());
+
+    // Call close() and verify it returns
+    let result = client.close().await?;
+    assert!(matches!(result, QuitReason::Cancelled));
+
+    // Verify client is now closed
+    assert!(client.is_closed());
+
+    // Calling close() again should return Closed immediately
+    let result = client.close().await?;
+    assert!(matches!(result, QuitReason::Closed));
+
+    // Wait for server to finish
+    server_handle.await??;
+    Ok(())
+}
+
+/// Test that close_with_timeout() respects the timeout
+#[tokio::test]
+async fn test_close_with_timeout() -> anyhow::Result<()> {
+    let (server_transport, client_transport) = tokio::io::duplex(4096);
+
+    // Start server
+    let server_handle = tokio::spawn(async move {
+        let server = TestServer::new().serve(server_transport).await?;
+        server.waiting().await?;
+        anyhow::Ok(())
+    });
+
+    // Start client
+    let handler = TestClientHandler::new(true, true);
+    let mut client = handler.serve(client_transport).await?;
+
+    // Close with a reasonable timeout
+    let result = client.close_with_timeout(Duration::from_secs(5)).await?;
+    assert!(result.is_some());
+    assert!(matches!(result.unwrap(), QuitReason::Cancelled));
+
+    // Verify client is now closed
+    assert!(client.is_closed());
+
+    // Wait for server to finish
+    server_handle.await??;
+    Ok(())
+}
+
+/// Test that cancel() still works and consumes self
+#[tokio::test]
+async fn test_cancel_method() -> anyhow::Result<()> {
+    let (server_transport, client_transport) = tokio::io::duplex(4096);
+
+    // Start server
+    let server_handle = tokio::spawn(async move {
+        let server = TestServer::new().serve(server_transport).await?;
+        server.waiting().await?;
+        anyhow::Ok(())
+    });
+
+    // Start client
+    let handler = TestClientHandler::new(true, true);
+    let client = handler.serve(client_transport).await?;
+
+    // Cancel should work as before
+    let result = client.cancel().await?;
+    assert!(matches!(result, QuitReason::Cancelled));
+
+    // Wait for server to finish
+    server_handle.await??;
+    Ok(())
+}
+
+/// Test that dropping without close() logs a debug message (we can't easily test
+/// the log output, but we can verify the drop doesn't panic)
+#[tokio::test]
+async fn test_drop_without_close() -> anyhow::Result<()> {
+    let (server_transport, client_transport) = tokio::io::duplex(4096);
+
+    // Start server that will handle the drop
+    let server_handle = tokio::spawn(async move {
+        let server = TestServer::new().serve(server_transport).await?;
+        // The server should close when the client drops
+        let result = server.waiting().await?;
+        // Server should detect closure
+        assert!(matches!(result, QuitReason::Closed | QuitReason::Cancelled));
+        anyhow::Ok(())
+    });
+
+    // Create and immediately drop the client
+    {
+        let handler = TestClientHandler::new(true, true);
+        let _client = handler.serve(client_transport).await?;
+        // Client dropped here without calling close()
+    }
+
+    // Give the async cleanup a moment to run
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Wait for server to finish (it should detect the closure)
+    server_handle.await??;
+    Ok(())
+}
