Skip to content

Commit 2c40a6b

Browse files
Add MCP Server
1 parent 1eb66c4 commit 2c40a6b

File tree

6 files changed

+150
-85
lines changed

6 files changed

+150
-85
lines changed

application/apps/indexer/mcp/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ workspace = true
99
[dependencies]
1010
rmcp = { version = "0.10", features = ["server", "transport-io"] }
1111
schemars = "1.1"
12-
serder.workspace = true
12+
serde.workspace = true
13+
anyhow.workspace = true
1314
serde_json.workspace = true
1415
tokio.workspace = true
1516
tokio-util.workspace = true

application/apps/indexer/mcp/src/server/messages.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,21 @@ use crate::types::McpError;
99
pub enum Tasks {
1010
Search {
1111
filters: Vec<Filter>,
12-
callback_tx: oneshot::Sender<Result<TaskResponse, McpError>>,
12+
callback_tx: oneshot::Sender<TaskResponse>,
1313
},
1414
PrepareChart {
1515
filters: Vec<Filter>,
16-
callback_tx: oneshot::Sender<Result<TaskResponse, McpError>>,
16+
callback_tx: oneshot::Sender<TaskResponse>,
1717
},
1818
}
1919

2020
#[derive(Clone, Debug)]
2121
pub enum TaskResponse {
2222
Success(String),
23-
Error(String),
23+
Error(McpError),
2424
}
2525

26-
// TODO: MOCK
27-
#[derive(Debug, JsonSchema, Serialize, Deserialize)]
26+
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]
2827
pub struct Filter {
2928
pub value: String,
3029
pub is_regex: bool,
Lines changed: 32 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,48 @@
11
pub mod messages;
22
pub mod tools;
33

4-
use log::{error, warn};
5-
use tokio::{
6-
sync::{mpsc, oneshot},
7-
time::{self, sleep},
4+
use anyhow::{Context, Result};
5+
use rmcp::{
6+
handler::server::{ServerHandler, tool::ToolRouter},
7+
model::{ServerCapabilities, ServerInfo},
8+
service::{QuitReason, ServiceExt},
9+
tool_handler,
10+
transport::stdio,
811
};
9-
10-
use rmcp::handler::server::tool::ToolRouter;
11-
use rmcp::tool_router;
12+
use tokio::sync::mpsc;
1213

1314
use messages::{Filter, Tasks};
1415

1516
#[derive(Debug)]
1617
pub struct McpServer {
1718
task_tx: mpsc::Sender<Tasks>,
18-
tool_router: ToolRouter<Self>,
19+
pub tool_router: ToolRouter<Self>,
1920
}
2021

21-
#[tool_router]
22-
impl McpServer {
23-
pub fn new() -> (Self, mpsc::Receiver<Tasks>) {
24-
let (task_tx, task_rx) = mpsc::channel::<Tasks>(32);
25-
26-
(
27-
Self {
28-
task_tx,
29-
tool_router: Self::tool_router(),
30-
},
31-
task_rx,
32-
)
33-
}
34-
35-
pub async fn start(self) {
36-
tokio::spawn(self.run());
37-
}
38-
39-
async fn run(self) {
40-
// TODO: Send a mock message after 1 seconds
41-
warn!("🔅 MCP: sleep timer started");
42-
let duration = time::Duration::from_secs(10);
43-
sleep(duration).await;
44-
warn!("🔅 MCP: {:?} seconds passed", duration);
45-
46-
let (response_tx, response_rx) = oneshot::channel();
47-
48-
let filters = vec![Filter {
49-
value: String::from("icmp_seq=13"),
50-
is_regex: false,
51-
ignore_case: true,
52-
is_word: true,
53-
}];
54-
55-
let message = Tasks::Search {
56-
filters,
57-
callback_tx: response_tx,
58-
};
59-
60-
if let Err(err) = self.task_tx.send(message).await {
61-
error!(
62-
"[Chipmunk] <-X- [MCP server]: server failed to send request: ApplyFilter: {err}"
63-
);
64-
return;
22+
#[tool_handler]
23+
impl ServerHandler for McpServer {
24+
fn get_info(&self) -> ServerInfo {
25+
ServerInfo {
26+
instructions: Some("Chipmunk MCP Server".to_string()),
27+
capabilities: ServerCapabilities::builder()
28+
.enable_tools()
29+
.enable_resources()
30+
.enable_prompts()
31+
.build(),
32+
..Default::default()
6533
}
34+
}
35+
}
6636

67-
match response_rx.await {
68-
Err(err) => {
69-
error!(
70-
"[Chipmunk] -X-> [MCP server]: server failed to receive response: ApplyFilter: {err}"
71-
);
72-
}
73-
Ok(result) => {
74-
error!(
75-
"[Chipmunk] --> [MCP server]: ✅ Received response: {:?}",
76-
result
77-
)
78-
}
79-
}
37+
impl McpServer {
38+
pub async fn start(self) -> Result<QuitReason> {
39+
let server = self
40+
.serve(stdio())
41+
.await
42+
.inspect_err(|err| println!("Error while starting the server {err}"))?;
43+
44+
tokio::spawn(server.waiting())
45+
.await?
46+
.context("Error while spawning a MCP server")
8047
}
8148
}
Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,108 @@
1-
use super::McpServer;
1+
use super::{Filter, McpServer, Tasks, messages::TaskResponse};
22

3-
use rmcp::tool;
3+
use tokio::sync::mpsc;
44

5+
use rmcp::{
6+
ErrorData as RmcpError,
7+
handler::server::wrapper::Parameters,
8+
model::{CallToolResult, Content, ErrorCode},
9+
tool, tool_router,
10+
};
11+
12+
#[tool_router]
513
impl McpServer {
6-
#[tool(description = "test")]
7-
fn apply_search_filter() {}
14+
pub fn new() -> (Self, mpsc::Receiver<Tasks>) {
15+
let (task_tx, task_rx) = mpsc::channel::<Tasks>(32);
16+
17+
(
18+
Self {
19+
task_tx,
20+
tool_router: Self::tool_router(),
21+
},
22+
task_rx,
23+
)
24+
}
25+
26+
#[tool(description = r#"Generate SearchFilter objects for filtering logs.
27+
28+
This tool accepts one or more filter specifications and returns a list of SearchFilter objects.
29+
Each filter can be customized with flags for regex matching, case sensitivity, and word boundaries.
30+
31+
**Input Parameters:**
32+
- `filters`: An array of filter objects, where each object contains:
33+
- `filter` (string): The text or pattern to search for
34+
- `is_regex` (boolean): true if the filter is a regular expression pattern
35+
- `ignore_case` (boolean): true for case-insensitive matching
36+
- `is_word` (boolean): true to match whole words only (word boundary matching)
37+
38+
**Usage Examples:**
39+
40+
Single filter:
41+
- Input: [{"filter": "error", "is_regex": false, "ignore_case": false, "is_word": false}]
42+
- Use case: Find exact matches of "error"
43+
44+
Multiple filters:
45+
- Input: [
46+
{"filter": "ERROR", "is_regex": false, "ignore_case": true, "is_word": false},
47+
{"filter": "\\d{4}-\\d{2}-\\d{2}", "is_regex": true, "ignore_case": false, "is_word": false}
48+
]
49+
- Use case: Find "ERROR" (any case) OR date patterns
50+
51+
Common patterns:
52+
- Case-insensitive word: {"filter": "warning", "is_regex": false, "ignore_case": true, "is_word": true}
53+
- Regex pattern: {"filter": "\\b(error|fail|exception)\\b", "is_regex": true, "ignore_case": false, "is_word": false}
54+
- Exact match: {"filter": "timeout", "is_regex": false, "ignore_case": false, "is_word": false}
55+
56+
**Natural Language Interpretation:**
57+
When the user provides natural language instructions, interpret them as follows:
58+
- "error" → single filter for "error"
59+
- "error or warning" → two filters, one for "error" and one for "warning"
60+
- "case-insensitive ERROR" → set ignore_case: true
61+
- "match the word 'timeout'" → set is_word: true
62+
- "regex pattern \\d+" → set is_regex: true
63+
- "find ERROR, WARNING, and CRITICAL" → three separate filters
64+
"#)]
65+
async fn apply_search_filter(
66+
&self,
67+
Parameters(params): Parameters<Vec<Filter>>,
68+
) -> Result<CallToolResult, RmcpError> {
69+
log::info!(
70+
"Received apply_search_filter tool call with params: {:?}",
71+
params
72+
);
73+
let (callback_tx, callback_rx) = tokio::sync::oneshot::channel::<TaskResponse>();
74+
let task = Tasks::Search {
75+
filters: vec![],
76+
callback_tx,
77+
};
78+
let task_tx_clone = self.task_tx.clone();
79+
match tokio::spawn(async move { task_tx_clone.send(task).await }).await {
80+
Ok(_) => log::info!("Sent Search task to MCP server"),
81+
Err(err) => log::error!(
82+
"Failed to send Search task to MCP server: ApplyFilter: {}",
83+
err
84+
),
85+
};
86+
87+
callback_rx
88+
.await
89+
.map(|task_response| {
90+
//Ok(CallToolResult::success(vec![Content::json("Success")?]))
91+
match task_response {
92+
TaskResponse::Success(message) => {
93+
Ok(CallToolResult::success(vec![Content::json(message)?]))
94+
}
95+
TaskResponse::Error(err) => {
96+
Ok(CallToolResult::error(vec![Content::json(err)?]))
97+
}
98+
}
99+
})
100+
.map_err(|err| {
101+
RmcpError::new(
102+
ErrorCode::INTERNAL_ERROR,
103+
format!("Did not receive the response from search filter task {err:?}"),
104+
None,
105+
)
106+
})?
107+
}
8108
}

application/apps/indexer/mcp/src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::fmt;
22

3-
#[derive(Debug)]
3+
use serde::{Deserialize, Serialize};
4+
5+
#[derive(Clone, Debug, Serialize, Deserialize)]
46
pub enum McpError {
57
Generic { message: String },
68
ApplyFilter { message: String },

application/apps/indexer/session/src/mcp_api/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use crate::state::SessionStateAPI;
33
use crate::tracker::OperationTrackerAPI;
44
use log::error;
55
use mcp::McpChannelEndpoints;
6-
use mcp::client::messages::{McpChipmunkToClient, McpClientToChipmunk};
7-
use mcp::server::messages::McpServerToChipmunk;
6+
use mcp::client::{Prompt, Response};
7+
use mcp::server::messages::Tasks;
88
use mcp::types::McpError;
99
use processor::search::filter::SearchFilter;
1010
use tokio::select;
@@ -14,17 +14,13 @@ use uuid::Uuid;
1414

1515
#[derive(Debug, Clone)]
1616
pub struct McpApi {
17-
pub chipmunk_to_client_tx: mpsc::Sender<McpChipmunkToClient>,
17+
pub prompt_tx: mpsc::Sender<Prompt>,
1818
}
1919

2020
impl McpApi {
2121
pub fn new(
2222
mcp_channel_endpoints: McpChannelEndpoints,
23-
) -> (
24-
Self,
25-
mpsc::Receiver<McpClientToChipmunk>,
26-
mpsc::Receiver<McpServerToChipmunk>,
27-
) {
23+
) -> (Self, mpsc::Receiver<Response>, mpsc::Receiver<Tasks>) {
2824
(
2925
Self {
3026
chipmunk_to_client_tx: mcp_channel_endpoints.chipmunk_to_client_tx,
@@ -66,7 +62,7 @@ pub async fn run(
6662
);
6763

6864
let filters = filters.iter().map(|f| SearchFilter::new(f.value.clone(), f.is_regex, f.ignore_case, f.is_word)).collect();
69-
65+
7066
let operation = Operation::new(
7167
session_uuid,
7268
OperationKind::Search { filters },

0 commit comments

Comments
 (0)