Make world-postgres stream delivery resilient to missed NOTIFY events#1881
Open
Pom4H wants to merge 3 commits intovercel:mainfrom
Open
Make world-postgres stream delivery resilient to missed NOTIFY events#1881Pom4H wants to merge 3 commits intovercel:mainfrom
Pom4H wants to merge 3 commits intovercel:mainfrom
Conversation
The dedicated `pg.Client` used for `LISTEN/NOTIFY` is long-lived and will eventually be dropped by the server (idle TCP timeout, pgbouncer rotation, k8s CNI eviction). Previously a single drop stopped all stream delivery until process restart. Two changes make delivery durable: 1. `listenChannel` now reconnects with bounded exponential backoff (250ms → 30s cap). The initial connect must succeed; subsequent reconnects are best-effort and logged. 2. `streams.get` runs a periodic `SELECT ... WHERE chunk_id > lastChunkId` as a safety net for chunks delivered while the LISTEN socket was reconnecting. The poll dedupes against in-band notifications via the existing `enqueue` ordering check. Configurable via `PostgresWorldConfig.streamPollIntervalMs` (default 5000ms; 0 to disable). Tracks vercel#1855. Tests cover three failure modes via testcontainers: - polling fallback delivers chunks inserted with NOTIFY suppressed - reader still receives chunks after pg_terminate_backend kills LISTEN - listenChannel itself reconnects and delivers post-reconnect notifies
`enqueue` previously decremented `offset` and returned without updating `lastChunkId`. The new polling fallback re-queries `chunk_id > lastChunkId` every tick, so chunks intentionally skipped for `startIndex` would come back on the next poll and be skipped again — double-decrementing `offset` and eventually mis-delivering them once `offset` hit zero. Move the high-water mark update to the top of `enqueue`, before the skip branch. Adds a regression test that pre-seeds two chunks, opens the reader with `startIndex=2`, lets several poll ticks fire (none should deliver), then writes a third chunk and asserts only the third reaches the reader.
Two reliability issues surfaced on review: 1. After natural EOF, `streams.get` set `closed = true` and closed the controller but never cleared the polling `setInterval` or removed the EventEmitter listener. The timer kept ticking (no-op via the `closed` guard) and the listener stayed attached for the lifetime of the process. Extracted an idempotent `stop()` that clears both, called from `cancel()` and from the EOF branch in `enqueue`. As a side benefit, the polling timer is no longer started at all if the initial chunk batch already delivered EOF. 2. `listenChannel.close()` called during an in-flight `connect()` could race: `closed = true` was set while `await next.connect()` / `LISTEN` was still resolving, after which the just-connected client would attach its notification listener and persist past close. Added a `closed` re-check after the awaits — if close raced ahead, end the client immediately and bail. Test: a regression test spies on `setInterval`/`clearInterval` and asserts that every interval the streamer scheduled at the configured poll cadence is cleared by the time the consumer reads `done: true`, without the consumer needing to call `cancel()`.
🦋 Changeset detectedLatest commit: 7f40c82 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
Contributor
|
@Pom4H is attempting to deploy a commit to the Vercel Labs Team on Vercel. A member of the Team first needs to authorize it. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixes #1855.
Make
@workflow/world-postgresstream delivery resilient when the dedicated PostgreSQLLISTENconnection disconnects or aNOTIFYevent is missed.LISTEN/NOTIFYis treated as a wake-up signal only; the durablestreamstable remains the source of truth.Changes
Make the dedicated
LISTEN workflow_event_chunkclient self-healingerror/endhandlersLISTENafter reconnectclose()Add a polling fallback in
readFromStreamlastChunkIdstreams WHERE chunk_id > lastChunkIdNOTIFYas the fast pathchunk_idAdd
streamPollIntervalMsconfiguration forPostgresWorldConfig0disables the polling fallbackAdd regression coverage for reconnect, missed notifications, dedupe, and polling behavior
Why
PostgreSQL
NOTIFYis not a durable backlog. If the listener is disconnected, chunks may still be inserted successfully into thestreamstable while active readers stop receiving live updates.This change makes stream reads durable by always re-querying persisted chunks newer than the reader's last delivered
chunk_id.Notes
This is compatible with higher-level stream reconnect logic. Core-level reconnect can recover client transport interruptions, but it cannot recover a PostgreSQL notification that was never delivered to a disconnected
LISTENclient. The Postgres world still needs to treat the table as the durable source of truth.