world-postgres: enforce per-(run, correlation) uniqueness for entity-creating events#1878
world-postgres: enforce per-(run, correlation) uniqueness for entity-creating events#1878TooTallNate wants to merge 1 commit intomainfrom
Conversation
🦋 Changeset detectedLatest commit: 5ce19f0 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📊 Benchmark Results
workflow with no steps💻 Local Development
workflow with 1 step💻 Local Development
workflow with 10 sequential steps💻 Local Development
workflow with 25 sequential steps💻 Local Development
workflow with 50 sequential steps💻 Local Development
Promise.all with 10 concurrent steps💻 Local Development
Promise.all with 25 concurrent steps💻 Local Development
Promise.all with 50 concurrent steps💻 Local Development
Promise.race with 10 concurrent steps💻 Local Development
Promise.race with 25 concurrent steps💻 Local Development
Promise.race with 50 concurrent steps💻 Local Development
workflow with 10 sequential data payload steps (10KB)💻 Local Development
workflow with 25 sequential data payload steps (10KB)💻 Local Development
workflow with 50 sequential data payload steps (10KB)💻 Local Development
workflow with 10 concurrent data payload steps (10KB)💻 Local Development
workflow with 25 concurrent data payload steps (10KB)💻 Local Development
workflow with 50 concurrent data payload steps (10KB)💻 Local Development
Stream Benchmarks (includes TTFB metrics)workflow with stream💻 Local Development
stream pipeline with 5 transform steps (1MB)💻 Local Development
10 parallel streams (1MB each)💻 Local Development
fan-out fan-in 10 streams (1MB each)💻 Local Development
SummaryFastest Framework by WorldWinner determined by most benchmark wins
Fastest World by FrameworkWinner determined by most benchmark wins
Column Definitions
Worlds:
|
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests▲ Vercel Production (1 failed)nextjs-turbopack (1 failed):
🐘 Local Postgres (4 failed)astro-stable (2 failed):
sveltekit-stable (2 failed):
Details by Category❌ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
❌ 🐘 Local Postgres
✅ 📋 Other
❌ Some E2E test jobs failed:
Check the workflow run for details. |
There was a problem hiding this comment.
Pull request overview
Adds a Postgres-level deduplication guard for entity-creating events so concurrent (or repeated) creates with the same (runId, correlationId, type) surface as EntityConflictError, aligning world-postgres behavior with the runtime’s existing dedup contract.
Changes:
- Add a partial unique index on
workflow_events(run_id, correlation_id, type)forstep_created/hook_created/wait_created. - Translate Postgres unique-violation (
23505) duringevents.create()intoEntityConflictErrorfor entity-creating event types. - Add regression tests for duplicate
step_createdandwait_createdscenarios.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/world-postgres/src/storage.ts | Catch and translate unique-violation errors into EntityConflictError for entity-creating events. |
| packages/world-postgres/src/drizzle/schema.ts | Add Drizzle schema definition for the partial unique index. |
| packages/world-postgres/src/drizzle/migrations/0010_add_events_entity_creation_unique_index.sql | Create the partial unique index in SQL migration. |
| packages/world-postgres/src/drizzle/migrations/meta/_journal.json | Register the new migration in the Drizzle journal. |
| packages/world-postgres/test/storage.test.ts | Add tests asserting duplicate entity-creation attempts surface as EntityConflictError and don’t duplicate event rows. |
| .changeset/fix-world-postgres-events-uniqueness.md | Publish a patch changeset describing the new uniqueness + error translation behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| -- causing duplicate step/hook/wait events in the log. The unique | ||
| -- violation is caught in events.create and surfaced as | ||
| -- EntityConflictError, which the runtime already handles as a dedup | ||
| -- signal. |
There was a problem hiding this comment.
Creating a UNIQUE index will fail if the table already contains duplicate (run_id, correlation_id, type) rows for the filtered event types. Since the previous storage behavior could emit duplicates, this migration is likely to fail on existing installations unless it first deduplicates those rows (e.g., delete all but the earliest row per (run_id, correlation_id, type) for the affected types) before creating the index.
| -- signal. | |
| -- signal. | |
| -- | |
| -- Existing installations may already contain duplicates for these event | |
| -- types, so deduplicate them before creating the unique partial index. | |
| WITH "ranked_workflow_events" AS ( | |
| SELECT | |
| ctid, | |
| ROW_NUMBER() OVER ( | |
| PARTITION BY "run_id", "correlation_id", "type" | |
| ORDER BY ctid | |
| ) AS "row_num" | |
| FROM "workflow"."workflow_events" | |
| WHERE "type" IN ('step_created', 'hook_created', 'wait_created') | |
| ) | |
| DELETE FROM "workflow"."workflow_events" | |
| WHERE ctid IN ( | |
| SELECT ctid | |
| FROM "ranked_workflow_events" | |
| WHERE "row_num" > 1 | |
| ); |
| const isEntityCreatingEvent = | ||
| data.eventType === 'step_created' || | ||
| data.eventType === 'hook_created' || | ||
| data.eventType === 'wait_created'; | ||
| const pgCode = ((err as { code?: string }).code ?? | ||
| (err as { cause?: { code?: string } }).cause?.code) as | ||
| | string | ||
| | undefined; | ||
| if (isEntityCreatingEvent && pgCode === '23505') { | ||
| throw new EntityConflictError( | ||
| `${data.eventType} for correlationId "${data.correlationId}" already exists in run "${effectiveRunId}"` | ||
| ); | ||
| } |
There was a problem hiding this comment.
The 23505 handling is currently based only on the Postgres error code and event type. Consider additionally checking the violated constraint/index name (e.g. the pg error's constraint field) so other unique violations on these event types (like the events primary key) don’t get misclassified as a correlationId conflict.
| // Sequential duplicate wait_created — the existing TOCTOU read | ||
| // catches this case, but the unique index now provides a stronger | ||
| // guarantee that survives concurrent writers. | ||
| await events.create(testRunId, { |
There was a problem hiding this comment.
This comment mentions an “existing TOCTOU read”, but the current wait_created implementation uses an INSERT ... onConflictDoNothing() + check, not a read-then-write pattern. Updating the comment (or asserting the event log has exactly one wait_created event like the step test) would make the test intent clearer and avoid misleading future readers.
…in world-postgres Adds a unique partial index on workflow_events(run_id, correlation_id, type) filtered to step_created/hook_created/wait_created, and translates the resulting unique-violation (pg code 23505, surfaced via DrizzleQueryError.cause) into EntityConflictError. The steps table already deduped via onConflictDoNothing, but the event row still inserted, leaving duplicate events in the log. Now both rows are kept consistent and the runtime's existing dedup catch path handles concurrent writers cleanly.
72af83e to
5ce19f0
Compare
Summary
Adds a unique partial index on
workflow_events(run_id, correlation_id, type)filtered tostep_created/hook_created/wait_created, and translates the resulting unique-violation (pg code23505, surfaced viaDrizzleQueryError.cause) intoEntityConflictError.Background
Concurrent invocations producing identical correlationIds previously both succeeded at the events-table level, leaving duplicate rows in the log. The
stepstable already deduped viaonConflictDoNothing, but the corresponding event row still inserted, so the storage was internally inconsistent (one step, twostep_createdevents). The runtime's existing dedup catch path (if (EntityConflictError.is(err)) continueinruntime/snapshot-entrypoint.ts) was the intended consumer of this signal but never received it from world-postgres.This is the postgres counterpart of PR #1877 (world-local).
Fix
workflow_events_entity_creation_uniqueon(run_id, correlation_id, type)filtered to entity-creating events (migration0010_add_events_entity_creation_unique_index.sql).events.create()wraps the INSERT in try/catch, detects pg23505(read offDrizzleQueryError.cause.code), and re-throws asEntityConflictErrorfor the relevant event types.step_createdandwait_createdduplicate scenarios.Verification
Extracted from PR #1300 (snapshot-runtime). The snapshot runtime produces deterministic correlationIds across concurrent VM invocations of the same resumption by design — that path made the dedup gap reliably reproducible — but the fix is also valuable on its own for replay-runtime concurrent scenarios.