Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/examples/server/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,10 @@ const getServer = () => {
task
};
},
async getTask(_args, { taskId, taskStore }) {
async getTask({ taskId, taskStore }) {
return await taskStore.getTask(taskId);
},
async getTaskResult(_args, { taskId, taskStore }) {
async getTaskResult({ taskId, taskStore }) {
const result = await taskStore.getTaskResult(taskId);
return result as CallToolResult;
}
Expand Down
9 changes: 3 additions & 6 deletions src/experimental/tasks/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,16 @@ export type CreateTaskRequestHandler<
* Handler for task operations (get, getResult).
* @experimental
*/
export type TaskRequestHandler<
SendResultT extends Result,
Args extends undefined | ZodRawShapeCompat | AnySchema = undefined
> = BaseToolCallback<SendResultT, TaskRequestHandlerExtra, Args>;
export type TaskRequestHandler<SendResultT extends Result> = (extra: TaskRequestHandlerExtra) => SendResultT | Promise<SendResultT>;

/**
* Interface for task-based tool handlers.
* @experimental
*/
export interface ToolTaskHandler<Args extends undefined | ZodRawShapeCompat | AnySchema = undefined> {
createTask: CreateTaskRequestHandler<CreateTaskResult, Args>;
getTask: TaskRequestHandler<GetTaskResult, Args>;
getTaskResult: TaskRequestHandler<CallToolResult, Args>;
getTask: TaskRequestHandler<GetTaskResult>;
getTaskResult: TaskRequestHandler<CallToolResult>;
}

/**
Expand Down
68 changes: 61 additions & 7 deletions src/server/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,37 @@ export class McpServer {
private _registeredTools: { [name: string]: RegisteredTool } = {};
private _registeredPrompts: { [name: string]: RegisteredPrompt } = {};
private _experimental?: { tasks: ExperimentalMcpServerTasks };
private _taskToolMap: Map<string, string> = new Map();

constructor(serverInfo: Implementation, options?: ServerOptions) {
this.server = new Server(serverInfo, options);
const taskHandlerHooks = {
getTask: async (taskId: string, extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => {
// taskStore is guaranteed to exist here because Protocol only calls hooks when taskStore is configured
const taskStore = extra.taskStore!;
const handler = this._getTaskHandler(taskId);
if (handler) {
return await handler.getTask({ ...extra, taskId, taskStore });
}
return await taskStore.getTask(taskId);
},
getTaskResult: async (taskId: string, extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => {
const taskStore = extra.taskStore!;
const handler = this._getTaskHandler(taskId);
if (handler) {
return await handler.getTaskResult({ ...extra, taskId, taskStore });
}
return await taskStore.getTaskResult(taskId);
}
};
this.server = new Server(serverInfo, { ...options, taskHandlerHooks });
}

private _getTaskHandler(taskId: string): ToolTaskHandler<ZodRawShapeCompat | undefined> | null {
const toolName = this._taskToolMap.get(taskId);
if (!toolName) return null;
const tool = this._registeredTools[toolName];
if (!tool || !('createTask' in (tool.handler as AnyToolHandler<ZodRawShapeCompat>))) return null;
return tool.handler as ToolTaskHandler<ZodRawShapeCompat | undefined>;
}

/**
Expand Down Expand Up @@ -215,6 +243,10 @@ export class McpServer {

// Return CreateTaskResult immediately for task requests
if (isTaskRequest) {
const taskResult = result as CreateTaskResult;
if (taskResult.task?.taskId) {
this._taskToolMap.set(taskResult.task.taskId, request.params.name);
}
return result;
}

Expand Down Expand Up @@ -374,27 +406,28 @@ export class McpServer {
const handler = tool.handler as ToolTaskHandler<ZodRawShapeCompat | undefined>;
const taskExtra = { ...extra, taskStore: extra.taskStore };

const createTaskResult: CreateTaskResult = args // undefined only if tool.inputSchema is undefined
? await Promise.resolve((handler as ToolTaskHandler<ZodRawShapeCompat>).createTask(args, taskExtra))
: // eslint-disable-next-line @typescript-eslint/no-explicit-any
await Promise.resolve(((handler as ToolTaskHandler<undefined>).createTask as any)(taskExtra));
const wrappedHandler = toolTaskHandlerByArgs(handler, args);

const createTaskResult = await wrappedHandler.createTask(taskExtra);

// Poll until completion
const taskId = createTaskResult.task.taskId;
const taskExtraComplete = { ...extra, taskId, taskStore: extra.taskStore };
let task = createTaskResult.task;
const pollInterval = task.pollInterval ?? 5000;

while (task.status !== 'completed' && task.status !== 'failed' && task.status !== 'cancelled') {
await new Promise(resolve => setTimeout(resolve, pollInterval));
const updatedTask = await extra.taskStore.getTask(taskId);
const getTaskResult = await wrappedHandler.getTask(taskExtraComplete);
const updatedTask = getTaskResult;
if (!updatedTask) {
throw new McpError(ErrorCode.InternalError, `Task ${taskId} not found during polling`);
}
task = updatedTask;
}

// Return the final result
return (await extra.taskStore.getTaskResult(taskId)) as CallToolResult;
return await wrappedHandler.getTaskResult(taskExtraComplete);
}

private _completionHandlerInitialized = false;
Expand Down Expand Up @@ -1540,3 +1573,24 @@ const EMPTY_COMPLETION_RESULT: CompleteResult = {
hasMore: false
}
};

/**
* Wraps a tool task handler's createTask to handle args uniformly.
* getTask and getTaskResult don't take args, so they're passed through directly.
* @param handler The task handler to wrap.
* @param args The tool arguments.
* @returns A wrapped task handler for a tool, which only exposes a no-args interface for createTask.
*/
function toolTaskHandlerByArgs<Args extends AnySchema | ZodRawShapeCompat | undefined>(
handler: ToolTaskHandler<Args>,
args: unknown
): ToolTaskHandler<undefined> {
return {
createTask: extra =>
args // undefined only if tool.inputSchema is undefined
? Promise.resolve((handler as ToolTaskHandler<ZodRawShapeCompat>).createTask(args, extra))
: Promise.resolve((handler as ToolTaskHandler<undefined>).createTask(extra)),
getTask: handler.getTask,
getTaskResult: handler.getTaskResult
};
}
32 changes: 31 additions & 1 deletion src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ export type ProtocolOptions = {
* appropriately (e.g., by failing the task, dropping messages, etc.).
*/
maxTaskQueueSize?: number;
/**
* Optional hooks for customizing task request handling.
* If a hook is provided, it fully owns the behavior (no fallback to TaskStore).
*/
taskHandlerHooks?: {
/**
* Called when tasks/get is received. If provided, must return the task.
*/
getTask?: (taskId: string, extra: RequestHandlerExtra<Request, Notification>) => Promise<GetTaskResult>;
/**
* Called when tasks/payload needs to retrieve the final result. If provided, must return the result.
*/
getTaskResult?: (taskId: string, extra: RequestHandlerExtra<Request, Notification>) => Promise<Result>;
};
};

/**
Expand Down Expand Up @@ -383,6 +397,16 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
this._taskMessageQueue = _options?.taskMessageQueue;
if (this._taskStore) {
this.setRequestHandler(GetTaskRequestSchema, async (request, extra) => {
// Use hook if provided, otherwise fall back to TaskStore
if (_options?.taskHandlerHooks?.getTask) {
const hookResult = await _options.taskHandlerHooks.getTask(
request.params.taskId,
extra as unknown as RequestHandlerExtra<Request, Notification>
);
// @ts-expect-error SendResultT cannot contain GetTaskResult
return hookResult as SendResultT;
}

const task = await this._taskStore!.getTask(request.params.taskId, extra.sessionId);
if (!task) {
throw new McpError(ErrorCode.InvalidParams, 'Failed to retrieve task: Task not found');
Expand Down Expand Up @@ -462,7 +486,13 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e

// If task is terminal, return the result
if (isTerminal(task.status)) {
const result = await this._taskStore!.getTaskResult(taskId, extra.sessionId);
// Use hook if provided, otherwise fall back to TaskStore
const result = this._options?.taskHandlerHooks?.getTaskResult
? await this._options.taskHandlerHooks.getTaskResult(
taskId,
extra as unknown as RequestHandlerExtra<Request, Notification>
)
: await this._taskStore!.getTaskResult(taskId, extra.sessionId);

this._clearTaskQueue(taskId);

Expand Down
24 changes: 12 additions & 12 deletions test/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2386,14 +2386,14 @@ describe('Task-based execution', () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down Expand Up @@ -2462,14 +2462,14 @@ describe('Task-based execution', () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down Expand Up @@ -2539,14 +2539,14 @@ describe('Task-based execution', () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down Expand Up @@ -2620,14 +2620,14 @@ describe('Task-based execution', () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down Expand Up @@ -3105,14 +3105,14 @@ describe('Task-based execution', () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down Expand Up @@ -3373,14 +3373,14 @@ test('should respect server task capabilities', async () => {

return { task };
},
async getTask(_args, extra) {
async getTask(extra) {
const task = await extra.taskStore.getTask(extra.taskId);
if (!task) {
throw new Error(`Task ${extra.taskId} not found`);
}
return task;
},
async getTaskResult(_args, extra) {
async getTaskResult(extra) {
const result = await extra.taskStore.getTaskResult(extra.taskId);
return result as { content: Array<{ type: 'text'; text: string }> };
}
Expand Down
Loading
Loading