Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f0b81bb
feat(ai): align start event types with AG-UI
thruflo Feb 10, 2026
30d00e1
feat(ai): add MessageStreamState type for per-message stream tracking
thruflo Feb 10, 2026
f03493b
feat(ai): refactor StreamProcessor to per-message state
thruflo Feb 10, 2026
f036f55
feat(ai): replace STATE_SNAPSHOT with MESSAGES_SNAPSHOT event
thruflo Feb 10, 2026
3f1ccb5
feat(ai-client): add SessionAdapter interface and createDefaultSession
thruflo Feb 10, 2026
202244a
feat(ai-client): refactor ChatClient to use SessionAdapter subscripti…
thruflo Feb 10, 2026
d4cc2b1
fix(ai-preact): thread option through.
thruflo Feb 10, 2026
f45dd77
fix(ai): finalizeStream when RUN_FINISHED.
thruflo Feb 10, 2026
6d1c733
fix(ai-client): handle reload during active stream with generation co…
thruflo Feb 10, 2026
a217426
docs: remove proposal docs.
thruflo Feb 10, 2026
fd1c50c
fix(ai, ai-client): address stream lifecycle edge cases from PR review
thruflo Feb 10, 2026
8c628ee
fix(ai-client): fix reload failures from stale stream state and waite…
thruflo Feb 11, 2026
36d6a93
ci: apply automated fixes
autofix-ci[bot] Feb 11, 2026
2abc6c7
fix(ai): resolve eslint errors in stream processor
thruflo Feb 11, 2026
c5e1aa3
fix(ai-client): resolve eslint errors in chat-client and session-adapter
thruflo Feb 11, 2026
537b73c
fix(ai-client): propagate send() errors to subscribe() consumers
thruflo Feb 11, 2026
fc52ef7
fix(ai): map 'tool' role to 'assistant' in message state to fix lookups
thruflo Feb 11, 2026
ed1cddb
fix(ai): normalize chunk.delta to avoid "undefined" string concatenation
thruflo Feb 12, 2026
fd7c226
fix(ai): use || instead of ?? for chunk.delta fallback to satisfy eslint
thruflo Feb 12, 2026
64f5517
fix(ai): reset stream flags on MESSAGES_SNAPSHOT to avoid stale state
thruflo Feb 12, 2026
7155e87
refactor(ai-client): finalize connection adapter unification
samwillis Feb 18, 2026
1bcfdfe
Combine the SessionAdapter with the ConnectionAdapter
samwillis Feb 19, 2026
04e3e70
Merge branch 'main' into thruflo/durable-session-support
jherr Feb 23, 2026
fe307a1
ci: apply automated fixes
autofix-ci[bot] Feb 23, 2026
4e3a6cd
Merge remote-tracking branch 'origin/main' into thruflo/durable-sessi…
samwillis Mar 9, 2026
620ab8e
fix: return booleans from chat client streamResponse
samwillis Mar 9, 2026
8c0800e
feat: add sessionGenerating lifecycle for shared generation activity
samwillis Mar 3, 2026
b32ca9f
fix: make stream finalization run-aware and fix reconnect message dedup
samwillis Mar 4, 2026
1a0f3d7
fix: clean up framework client type exports
samwillis Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
436 changes: 350 additions & 86 deletions packages/typescript/ai-client/src/chat-client.ts

Large diffs are not rendered by default.

159 changes: 147 additions & 12 deletions packages/typescript/ai-client/src/connection-adapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,9 @@ async function* readStreamLines(
}
}

/**
* Connection adapter interface - converts a connection into a stream of chunks
*/
export interface ConnectionAdapter {
export interface ConnectConnectionAdapter {
/**
* Connect and return an async iterable of StreamChunks
* @param messages - The messages to send (UIMessages or ModelMessages)
* @param data - Additional data to send
* @param abortSignal - Optional abort signal for request cancellation
* Connect and return an async iterable of StreamChunks.
*/
connect: (
messages: Array<UIMessage> | Array<ModelMessage>,
Expand All @@ -79,6 +73,147 @@ export interface ConnectionAdapter {
) => AsyncIterable<StreamChunk>
}

export interface SubscribeConnectionAdapter {
/**
* Subscribe to stream chunks.
*/
subscribe: (abortSignal?: AbortSignal) => AsyncIterable<StreamChunk>
/**
* Send a request; chunks arrive through subscribe().
*/
send: (
messages: Array<UIMessage> | Array<ModelMessage>,
data?: Record<string, any>,
abortSignal?: AbortSignal,
) => Promise<void>
}

/**
* Connection adapter union.
* Provide either `connect`, or `subscribe` + `send`.
*/
export type ConnectionAdapter =
| ConnectConnectionAdapter
| SubscribeConnectionAdapter

/**
* Normalize a ConnectionAdapter to subscribe/send operations.
*
* If a connection provides native subscribe/send, that mode is used.
* Otherwise, connect() is wrapped using an async queue.
*/
export function normalizeConnectionAdapter(
connection: ConnectionAdapter | undefined,
): SubscribeConnectionAdapter {
if (!connection) {
throw new Error('Connection adapter is required')
}

const hasConnect = 'connect' in connection
const hasSubscribe = 'subscribe' in connection
const hasSend = 'send' in connection

if (hasConnect && (hasSubscribe || hasSend)) {
throw new Error(
'Connection adapter must provide either connect or both subscribe and send, not both modes',
)
}

if (hasSubscribe && hasSend) {
return {
subscribe: connection.subscribe.bind(connection),
send: connection.send.bind(connection),
}
}

if (!hasConnect) {
throw new Error(
'Connection adapter must provide either connect or both subscribe and send',
)
}

// Legacy connect() wrapper
let activeBuffer: Array<StreamChunk> = []
let activeWaiters: Array<(chunk: StreamChunk | null) => void> = []

function push(chunk: StreamChunk): void {
const waiter = activeWaiters.shift()
if (waiter) {
waiter(chunk)
} else {
activeBuffer.push(chunk)
}
}

return {
subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk> {
// Transfer ownership to the latest subscriber so only one active
// subscribe() call receives chunks from the shared connect-wrapper queue.
const myBuffer: Array<StreamChunk> = activeBuffer.splice(0)
const myWaiters: Array<(chunk: StreamChunk | null) => void> = []
activeBuffer = myBuffer
activeWaiters = myWaiters

return (async function* () {
while (!abortSignal?.aborted) {
let chunk: StreamChunk | null
if (myBuffer.length > 0) {
chunk = myBuffer.shift()!
} else {
chunk = await new Promise<StreamChunk | null>((resolve) => {
const onAbort = () => resolve(null)
myWaiters.push((c) => {
abortSignal?.removeEventListener('abort', onAbort)
resolve(c)
})
abortSignal?.addEventListener('abort', onAbort, { once: true })
})
}
if (chunk !== null) yield chunk
}
})()
},
async send(messages, data, abortSignal) {
let hasTerminalEvent = false
try {
const stream = connection.connect(messages, data, abortSignal)
for await (const chunk of stream) {
if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
hasTerminalEvent = true
}
push(chunk)
}

// If the connect stream ended cleanly without a terminal event,
// synthesize RUN_FINISHED so request-scoped consumers can complete.
if (!abortSignal?.aborted && !hasTerminalEvent) {
push({
type: 'RUN_FINISHED',
runId: `run-${Date.now()}`,
model: 'connect-wrapper',
timestamp: Date.now(),
finishReason: 'stop',
})
}
} catch (err) {
if (!abortSignal?.aborted && !hasTerminalEvent) {
push({
type: 'RUN_ERROR',
timestamp: Date.now(),
error: {
message:
err instanceof Error
? err.message
: 'Unknown error in connect()',
},
})
}
throw err
}
},
}
}

/**
* Options for fetch-based connection adapters
*/
Expand Down Expand Up @@ -129,7 +264,7 @@ export function fetchServerSentEvents(
options:
| FetchConnectionOptions
| (() => FetchConnectionOptions | Promise<FetchConnectionOptions>) = {},
): ConnectionAdapter {
): ConnectConnectionAdapter {
return {
async *connect(messages, data, abortSignal) {
// Resolve URL and options if they are functions
Expand Down Expand Up @@ -228,7 +363,7 @@ export function fetchHttpStream(
options:
| FetchConnectionOptions
| (() => FetchConnectionOptions | Promise<FetchConnectionOptions>) = {},
): ConnectionAdapter {
): ConnectConnectionAdapter {
return {
async *connect(messages, data, abortSignal) {
// Resolve URL and options if they are functions
Expand Down Expand Up @@ -301,7 +436,7 @@ export function stream(
messages: Array<UIMessage> | Array<ModelMessage>,
data?: Record<string, any>,
) => AsyncIterable<StreamChunk>,
): ConnectionAdapter {
): ConnectConnectionAdapter {
return {
async *connect(messages, data) {
// Pass messages as-is (UIMessages with parts preserved)
Expand Down Expand Up @@ -332,7 +467,7 @@ export function rpcStream(
messages: Array<UIMessage> | Array<ModelMessage>,
data?: Record<string, any>,
) => AsyncIterable<StreamChunk>,
): ConnectionAdapter {
): ConnectConnectionAdapter {
return {
async *connect(messages, data) {
// Pass messages as-is (UIMessages with parts preserved)
Expand Down
3 changes: 3 additions & 0 deletions packages/typescript/ai-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type {
ChatRequestBody,
InferChatMessages,
ChatClientState,
ConnectionStatus,
// Multimodal content input type
MultimodalContent,
} from './types'
Expand All @@ -27,8 +28,10 @@ export {
fetchHttpStream,
stream,
rpcStream,
type ConnectConnectionAdapter,
type ConnectionAdapter,
type FetchConnectionOptions,
type SubscribeConnectionAdapter,
} from './connection-adapters'

// Re-export message converters from @tanstack/ai
Expand Down
33 changes: 31 additions & 2 deletions packages/typescript/ai-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ export type ToolResultState =
*/
export type ChatClientState = 'ready' | 'submitted' | 'streaming' | 'error'

/**
* Connection lifecycle state for the subscription loop.
*/
export type ConnectionStatus =
| 'disconnected'
| 'connecting'
| 'connected'
| 'error'

/**
* Multimodal content input for sending messages with rich media.
* Allows sending text, images, audio, video, and documents to the LLM.
Expand Down Expand Up @@ -178,8 +187,9 @@ export interface ChatClientOptions<
TTools extends ReadonlyArray<AnyClientTool> = any,
> {
/**
* Connection adapter for streaming
* Use fetchServerSentEvents(), fetchHttpStream(), or stream() to create adapters
* Connection adapter for streaming.
* Supports mutually exclusive modes: request-response via `connect()`, or
* subscribe/send mode via `subscribe()` + `send()`.
*/
connection: ConnectionAdapter

Expand Down Expand Up @@ -239,6 +249,25 @@ export interface ChatClientOptions<
*/
onStatusChange?: (status: ChatClientState) => void

/**
* Callback when subscription lifecycle changes.
* This is independent from request lifecycle (`isLoading`, `status`).
*/
onSubscriptionChange?: (isSubscribed: boolean) => void

/**
* Callback when connection lifecycle changes.
*/
onConnectionStatusChange?: (status: ConnectionStatus) => void

/**
* Callback when session generation activity changes.
* Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR).
* Unlike `onLoadingChange` (request-local), this reflects shared generation
* activity visible to all subscribers (e.g. across tabs/devices).
*/
onSessionGeneratingChange?: (isGenerating: boolean) => void

/**
* Callback when a custom event is received from a server-side tool.
* Custom events are emitted by tools using `context.emitCustomEvent()` during execution.
Expand Down
Loading