Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/salty-apes-find.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"evlog": patch
---

fix(hono): resolve "ReadableStream is locked" error with AI SDK streaming responses

Using `createUIMessageStreamResponse` or `createAgentUIStreamResponse` from the Vercel AI SDK inside a Hono route would throw `ERR_INVALID_STATE: ReadableStream is locked` when running under `@hono/node-server`.

**Root cause:** The middleware called `createObservedBody(c.res.body)` (which calls `body.getReader()`, locking the stream) and then relied on Hono's `compose` to update `c.res` with the wrapped response via the middleware return value. However, Hono skips that update when `context.finalized` is already `true` β€” which is always the case after a route handler returns a `Response`. This left `c.res` pointing at the original response whose body was now locked, so `@hono/node-server`'s subsequent `response.body.getReader()` call threw.

**Fix:** Explicitly assign `c.res = await finishResponse(c.res, ...)` instead of returning the wrapped response, so `c.res` is always updated regardless of `context.finalized`.

Closes #382
10 changes: 5 additions & 5 deletions packages/evlog/src/hono/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ export function evlog(options: EvlogHonoOptions = {}): MiddlewareHandler {
try {
await next()
if (shouldDeferEmitForResponse(c.res)) {
const response = new Response(c.res.body, {
status: c.res.status,
headers: c.res.headers,
})
return finishResponse(response, { status: response.status })
// Assign directly β€” Hono's compose ignores middleware return values when
// context.finalized is already true, so returning the wrapped response
// would leave c.res with a locked body stream.
c.res = await finishResponse(c.res, { status: c.res.status })
return
}
await finish({ status: c.res.status })
} catch (error) {
Expand Down
63 changes: 63 additions & 0 deletions packages/evlog/test/frameworks/hono.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from '../helpers/framework'
import { defined, getDrainCallArg } from '../helpers/defined'
import { describeStandardHttpMatrix } from '../helpers/frameworkMatrix'
import { createDeferredStream } from '../helpers/stream'

describeStandardHttpMatrix({
name: 'hono',
Expand Down Expand Up @@ -326,4 +327,66 @@ describe('evlog/hono', () => {
expect(enrich).not.toHaveBeenCalled()
})
})

describe('streaming responses', () => {
it('does not lock the response body when the handler returns a streaming SSE response (#382)', async () => {
const { drain } = createPipelineSpies()
let closeStream!: () => void

const app = new Hono<EvlogVariables>()
app.use(evlog({ drain }))
app.get('/api/stream', () => {
const { stream, close } = createDeferredStream()
closeStream = close
return new Response(stream, {
headers: { 'content-type': 'text/event-stream' },
})
})

const res = await app.request('/api/stream')
expect(res.status).toBe(200)

// Body must not be locked β€” @hono/node-server calls body.getReader() to
// stream to the client after app.fetch() resolves.
expect(res.body).not.toBeNull()
expect(res.body?.locked).toBe(false)

closeStream()
await res.text()
await waitForDrainCalls(drain)
assertHttpEventEmitted(drain, { path: '/api/stream', status: 200 })
})

it('defers drain until the SSE stream closes and captures mid-stream context (#321)', async () => {
const { drain } = createPipelineSpies()
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})
let closeStream!: () => void

const app = new Hono<EvlogVariables>()
app.use(evlog({ drain }))
app.get('/api/chat', (c) => {
const log = c.get('log')
const { stream, close } = createDeferredStream()
closeStream = close
queueMicrotask(() => {
log.set({ ai: { calls: 1, totalTokens: 42 } })
})
return new Response(stream, {
headers: { 'content-type': 'text/event-stream' },
})
})

const res = await app.request('/api/chat')
expect(drain).not.toHaveBeenCalled()

closeStream()
await expect(res.text()).resolves.toBe('hello world')
await vi.waitFor(() => {
expect(drain).toHaveBeenCalledTimes(1)
})

expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false)
expect(drain.mock.calls[0]?.[0]?.event?.ai).toEqual({ calls: 1, totalTokens: 42 })
})
})
})
Loading