Skip to content
195 changes: 192 additions & 3 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1661,7 +1661,7 @@
"type": "string",
"format": "text/event-stream"
},
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\", \"request_id\": \"123e4567-e89b-12d3-a456-426614174001\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
}
}
},
Expand Down Expand Up @@ -1912,6 +1912,121 @@
}
}
},
"/v1/streaming_query/interrupt": {
"post": {
"tags": [
"streaming_query_interrupt"
],
"summary": "Streaming Query Interrupt Endpoint Handler",
"description": "Interrupt an in-progress streaming query by request identifier.\n\nParameters:\n interrupt_request: Request payload containing the stream request ID.\n auth: Auth context tuple resolved from the authentication dependency.\n registry: Stream interrupt registry dependency used to cancel streams.\n\nReturns:\n StreamingInterruptResponse: Confirmation payload when interruption succeeds.\n\nRaises:\n HTTPException: If no active stream for the given request ID can be interrupted.",
"operationId": "stream_interrupt_endpoint_handler_v1_streaming_query_interrupt_post",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/StreamingInterruptRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/StreamingInterruptResponse"
},
"example": {
"interrupted": true,
"message": "Streaming request interrupted",
"request_id": "123e4567-e89b-12d3-a456-426614174000"
}
}
}
},
"401": {
"description": "Unauthorized",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UnauthorizedResponse"
},
"examples": {
"missing header": {
"value": {
"detail": {
"cause": "No Authorization header found",
"response": "Missing or invalid credentials provided by client"
}
}
},
"missing token": {
"value": {
"detail": {
"cause": "No token found in Authorization header",
"response": "Missing or invalid credentials provided by client"
}
}
}
}
}
}
},
"403": {
"description": "Permission denied",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ForbiddenResponse"
},
"examples": {
"endpoint": {
"value": {
"detail": {
"cause": "User 6789 is not authorized to access this endpoint.",
"response": "User does not have permission to access this endpoint"
}
}
}
}
}
}
},
"404": {
"description": "Resource not found",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/NotFoundResponse"
},
"examples": {
"streaming request": {
"value": {
"detail": {
"cause": "Streaming Request with ID 123e4567-e89b-12d3-a456-426614174000 does not exist",
"response": "Streaming Request not found"
}
}
}
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/v1/config": {
"get": {
"tags": [
Expand Down Expand Up @@ -4332,7 +4447,7 @@
],
"summary": "Handle A2A Jsonrpc",
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
"operationId": "handle_a2a_jsonrpc_a2a_post",
"operationId": "handle_a2a_jsonrpc_a2a_get",
"responses": {
"200": {
"description": "Successful Response",
Expand All @@ -4350,7 +4465,7 @@
],
"summary": "Handle A2A Jsonrpc",
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
"operationId": "handle_a2a_jsonrpc_a2a_post",
"operationId": "handle_a2a_jsonrpc_a2a_get",
"responses": {
"200": {
"description": "Successful Response",
Expand Down Expand Up @@ -7349,6 +7464,13 @@
"response": "Rag not found"
},
"label": "rag"
},
{
"detail": {
"cause": "Streaming Request with ID 123e4567-e89b-12d3-a456-426614174000 does not exist",
"response": "Streaming Request not found"
},
"label": "streaming request"
}
]
},
Expand Down Expand Up @@ -9224,6 +9346,73 @@
}
]
},
"StreamingInterruptRequest": {
"properties": {
"request_id": {
"type": "string",
"title": "Request Id",
"description": "The active streaming request ID to interrupt",
"examples": [
"123e4567-e89b-12d3-a456-426614174000"
]
}
},
"additionalProperties": false,
"type": "object",
"required": [
"request_id"
],
"title": "StreamingInterruptRequest",
"description": "Model representing a request to interrupt an active streaming query.\n\nAttributes:\n request_id: Unique ID of the active streaming request to interrupt.",
"examples": [
{
"request_id": "123e4567-e89b-12d3-a456-426614174000"
}
]
},
"StreamingInterruptResponse": {
"properties": {
"request_id": {
"type": "string",
"title": "Request Id",
"description": "The streaming request ID targeted by the interrupt call",
"examples": [
"123e4567-e89b-12d3-a456-426614174000"
]
},
"interrupted": {
"type": "boolean",
"title": "Interrupted",
"description": "Whether an in-progress stream was interrupted",
"examples": [
true
]
},
"message": {
"type": "string",
"title": "Message",
"description": "Human-readable interruption status message",
"examples": [
"Streaming request interrupted"
]
}
},
"type": "object",
"required": [
"request_id",
"interrupted",
"message"
],
"title": "StreamingInterruptResponse",
"description": "Model representing a response to a streaming interrupt request.\n\nAttributes:\n request_id: The streaming request ID targeted by the interrupt call.\n interrupted: Whether an in-progress stream was interrupted.\n message: Human-readable interruption status message.\n\nExample:\n ```python\n response = StreamingInterruptResponse(\n request_id=\"123e4567-e89b-12d3-a456-426614174000\",\n interrupted=True,\n message=\"Streaming request interrupted\",\n )\n ```",
"examples": [
{
"interrupted": true,
"message": "Streaming request interrupted",
"request_id": "123e4567-e89b-12d3-a456-426614174000"
}
]
},
"TLSConfiguration": {
"properties": {
"tls_certificate_path": {
Expand Down
91 changes: 91 additions & 0 deletions src/app/endpoints/stream_interrupt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Endpoint for interrupting in-progress streaming query requests."""

from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException

from authentication import get_auth_dependency
from authentication.interface import AuthTuple
from authorization.middleware import authorize
from models.config import Action
from models.requests import StreamingInterruptRequest
from models.responses import (
ForbiddenResponse,
NotFoundResponse,
StreamingInterruptResponse,
UnauthorizedResponse,
)
from utils.stream_interrupts import (
CancelStreamResult,
StreamInterruptRegistry,
get_stream_interrupt_registry,
)

# Router for the stream-interrupt endpoint; it is created with the
# "streaming_query_interrupt" tag.
router = APIRouter(tags=["streaming_query_interrupt"])

# Response definitions keyed by HTTP status code. The mapping is passed as
# `responses=` to the `@router.post` decorator of the interrupt handler.
# Each entry is built by the response model's `openapi_response()` helper;
# the `examples` lists select named examples by label.
stream_interrupt_responses: dict[int | str, dict[str, Any]] = {
200: StreamingInterruptResponse.openapi_response(),
401: UnauthorizedResponse.openapi_response(
examples=["missing header", "missing token"]
),
403: ForbiddenResponse.openapi_response(examples=["endpoint"]),
404: NotFoundResponse.openapi_response(examples=["streaming request"]),
# NOTE(review): the following lines, up to the closing brace, are review-UI
# text captured with the diff, not Python source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to extend examples of this response model to make it work properly (see models/responses.NotFoundResponse).

}


@router.post(
"/streaming_query/interrupt",
responses=stream_interrupt_responses,
summary="Streaming Query Interrupt Endpoint Handler",
)
@authorize(Action.STREAMING_QUERY)
async def stream_interrupt_endpoint_handler(
interrupt_request: StreamingInterruptRequest,
auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
registry: Annotated[
StreamInterruptRegistry, Depends(get_stream_interrupt_registry)
],
) -> StreamingInterruptResponse:
"""Interrupt an in-progress streaming query by request identifier.

Parameters:
interrupt_request: Request payload containing the stream request ID.
auth: Auth context tuple resolved from the authentication dependency.
registry: Stream interrupt registry dependency used to cancel streams.

Returns:
StreamingInterruptResponse: Confirmation payload when interruption succeeds.

Raises:
HTTPException: If no active stream for the given request ID can be interrupted.
"""
user_id, _, _, _ = auth
request_id = interrupt_request.request_id
cancel_result = registry.cancel_stream(request_id, user_id)
if cancel_result == CancelStreamResult.NOT_FOUND:
response = NotFoundResponse(
resource="streaming request",
resource_id=request_id,
)
raise HTTPException(**response.model_dump())
if cancel_result == CancelStreamResult.FORBIDDEN:
response = ForbiddenResponse(
response="User does not have permission to interrupt this streaming request",
cause=(
f"User {user_id} does not own streaming request "
f"with ID {request_id}"
),
)
raise HTTPException(**response.model_dump())
if cancel_result == CancelStreamResult.ALREADY_DONE:
return StreamingInterruptResponse(
request_id=request_id,
interrupted=False,
message="Streaming request already completed; nothing to interrupt",
)

return StreamingInterruptResponse(
request_id=request_id,
interrupted=True,
message="Streaming request interrupted",
)
Loading
Loading