Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-world-postgres-events-uniqueness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-postgres": patch
---

Fix race in `events.create()` where concurrent `step_created` / `hook_created` / `wait_created` writes with the same `correlationId` would persist duplicate event rows. Adds a unique partial index and surfaces the violation as `EntityConflictError`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Enforce uniqueness of (run_id, correlation_id, event_type) for the
-- entity-creating events (step_created, hook_created, wait_created).
--
-- Without this constraint, two concurrent runtime invocations producing
-- identical correlationIds (e.g. the snapshot runtime's deterministic
-- ULIDs across replays of the same resumption) can both insert events,
-- causing duplicate step/hook/wait events in the log. The unique
-- violation is caught in events.create and surfaced as
-- EntityConflictError, which the runtime already handles as a dedup
-- signal.
Comment thread
TooTallNate marked this conversation as resolved.
--
-- Existing installations may already contain duplicate
-- (run_id, correlation_id, type) rows for these event types — the
-- previous storage behavior allowed them through. Deduplicate before
-- creating the unique partial index, otherwise the CREATE UNIQUE INDEX
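-- statement would fail at migration time. We keep one row for each
-- (run_id, correlation_id, type) tuple -- the row with the lowest ctid,
-- i.e. the lowest physical tuple location, which is usually but not
-- necessarily the earliest-inserted row since ctid does not track
-- insertion order -- and
-- drop the rest. The duplicates that this removes are exactly the rows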
-- that would have been rejected as `EntityConflictError` had the unique
-- index existed when they were inserted.
-- Remove pre-existing duplicates so the unique partial index can be built.
-- Rows of the three entity-creating event types are grouped per
-- (run_id, correlation_id, type); within each group the row with the
-- lowest ctid is kept and every other row is deleted.
--
-- Rows with a NULL correlation_id are deliberately excluded. PARTITION BY
-- places all NULLs in the same group, but a unique index (with the default
-- NULLS DISTINCT behavior) never treats two NULLs as conflicting, so such
-- rows cannot block index creation and deleting them would discard events
-- the index itself would have accepted.
WITH "ranked_workflow_events" AS (
  SELECT
    ctid,
    ROW_NUMBER() OVER (
      PARTITION BY "run_id", "correlation_id", "type"
      ORDER BY ctid
    ) AS "row_num"
  FROM "workflow"."workflow_events"
  WHERE "type" IN ('step_created', 'hook_created', 'wait_created')
    AND "correlation_id" IS NOT NULL
)
DELETE FROM "workflow"."workflow_events"
WHERE ctid IN (
  SELECT ctid
  FROM "ranked_workflow_events"
  WHERE "row_num" > 1
);

-- Partial unique index: among rows whose type is step_created,
-- hook_created or wait_created, at most one row may exist per
-- (run_id, correlation_id, type). Rows of any other type are not covered.
-- IF NOT EXISTS skips creation when an index with this name already exists.
-- The index name is load-bearing: events.create compares the violated
-- constraint name against 'workflow_events_entity_creation_unique' before
-- raising EntityConflictError, so renaming the index requires updating
-- that check as well.
CREATE UNIQUE INDEX IF NOT EXISTS "workflow_events_entity_creation_unique"
ON "workflow"."workflow_events" ("run_id", "correlation_id", "type")
WHERE "type" IN ('step_created', 'hook_created', 'wait_created');
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
"when": 1770500000000,
"tag": "0009_add_is_webhook",
"breakpoints": true
},
{
"idx": 10,
"version": "7",
"when": 1771000000000,
"tag": "0010_add_events_entity_creation_unique_index",
"breakpoints": true
}
]
}
18 changes: 17 additions & 1 deletion packages/world-postgres/src/drizzle/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type WorkflowRun,
WorkflowRunStatusSchema,
} from '@workflow/world';
import { sql } from 'drizzle-orm';
import {
boolean,
customType,
Expand All @@ -21,6 +22,7 @@ import {
primaryKey,
text,
timestamp,
uniqueIndex,
varchar,
} from 'drizzle-orm/pg-core';
import { Cbor, type Cborized } from './cbor.js';
Expand Down Expand Up @@ -114,7 +116,21 @@ export const events = schema.table(
} satisfies DrizzlishOfType<
Cborized<Event & { eventData?: undefined }, 'eventData'>
>,
(tb) => [index().on(tb.runId), index().on(tb.correlationId)]
(tb) => [
index().on(tb.runId),
index().on(tb.correlationId),
// Entity-creating events must be unique per (run, correlation) — without
// this, two concurrent invocations producing identical correlationIds
// (e.g. the snapshot runtime's deterministic ULIDs across replays) can
// both insert events, causing duplicate steps/hooks/waits in the log.
// The unique violation is caught in events.create and translated to
// EntityConflictError, matching the runtime's expected dedup contract.
uniqueIndex('workflow_events_entity_creation_unique')
.on(tb.runId, tb.correlationId, tb.eventType)
.where(
sql`${tb.eventType} IN ('step_created', 'hook_created', 'wait_created')`
),
]
);

export const steps = schema.table(
Expand Down
58 changes: 47 additions & 11 deletions packages/world-postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1240,17 +1240,53 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
? data.eventData
: undefined;

const [value] = await drizzle
.insert(events)
.values({
runId: effectiveRunId,
eventId,
correlationId: data.correlationId,
eventType: data.eventType,
eventData: storedEventData,
specVersion: effectiveSpecVersion,
})
.returning({ createdAt: events.createdAt });
let value: { createdAt: Date } | undefined;
try {
[value] = await drizzle
.insert(events)
.values({
runId: effectiveRunId,
eventId,
correlationId: data.correlationId,
eventType: data.eventType,
eventData: storedEventData,
specVersion: effectiveSpecVersion,
})
.returning({ createdAt: events.createdAt });
} catch (err) {
// Translate unique-violation on the entity-creation partial index
// (workflow_events_entity_creation_unique) into EntityConflictError
// so the runtime's existing dedup catch path can handle it. Without
// this, two concurrent invocations producing identical
// correlationIds (e.g. snapshot runtime deterministic ULIDs) would
// surface as unhandled DB errors instead of dedup signals.
// Drizzle wraps the underlying pg error in DrizzleQueryError; the
// pg error (with .code === '23505') lives on .cause. We additionally
// gate on the violated constraint name so other 23505 violations on
// these event types (e.g. the events primary key, or any future
// unique constraint we might add) don't get misclassified as a
// correlationId conflict.
const isEntityCreatingEvent =
data.eventType === 'step_created' ||
data.eventType === 'hook_created' ||
data.eventType === 'wait_created';
const pgErr = (err as { code?: string; constraint?: string }).code
? (err as { code?: string; constraint?: string })
: ((err as { cause?: { code?: string; constraint?: string } })
.cause ?? {});
const pgCode = pgErr.code;
const pgConstraint = pgErr.constraint;
if (
isEntityCreatingEvent &&
pgCode === '23505' &&
pgConstraint === 'workflow_events_entity_creation_unique'
) {
throw new EntityConflictError(
`${data.eventType} for correlationId "${data.correlationId}" already exists in run "${effectiveRunId}"`
);
}
Comment thread
TooTallNate marked this conversation as resolved.
throw err;
}
if (!value) {
throw new EntityConflictError(`Event ${eventId} could not be created`);
}
Expand Down
105 changes: 105 additions & 0 deletions packages/world-postgres/test/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,111 @@ describe('Storage (Postgres integration)', () => {
});
});

describe('concurrent entity-creation races', () => {
  // Each test gets its own freshly started run.
  let activeRunId: string;

  beforeEach(async () => {
    const { runId } = await createRun(events, {
      deploymentId: 'deployment-123',
      workflowName: 'test-workflow',
      input: new Uint8Array(),
    });
    activeRunId = runId;
    await updateRun(events, activeRunId, 'run_started');
  });

  /**
   * Lists the current run's events and keeps those matching both the given
   * event type and correlationId.
   */
  const findEvents = async (eventType: string, correlationId: string) => {
    const page = await events.list({
      runId: activeRunId,
      pagination: {},
    });
    return page.data.filter(
      (e) => e.eventType === eventType && e.correlationId === correlationId
    );
  };

  it('should reject concurrent step_created with the same correlationId', async () => {
    // Fire two step_created writes for the same stepId without awaiting in
    // between (mimicking two invocations that derived the same
    // deterministic correlationId). Exactly one may win; the other must
    // fail with EntityConflictError so the caller can treat it as a dedup
    // signal rather than an unexpected database error.
    const outcomes = await Promise.allSettled(
      [1, 2].map((payloadByte) =>
        createStep(events, activeRunId, {
          stepId: 'step_dup_1',
          stepName: 'test-step',
          input: new Uint8Array([payloadByte]),
        })
      )
    );

    const winners = outcomes.filter(
      (outcome) => outcome.status === 'fulfilled'
    );
    const losers = outcomes.filter(
      (outcome): outcome is PromiseRejectedResult =>
        outcome.status === 'rejected'
    );
    expect(winners).toHaveLength(1);
    expect(losers).toHaveLength(1);

    const [loser] = losers;
    expect(loser?.reason).toMatchObject({
      name: 'EntityConflictError',
    });

    // The log itself must contain a single step_created for this id.
    expect(await findEvents('step_created', 'step_dup_1')).toHaveLength(1);
  });

  it('should reject sequential duplicate step_created with EntityConflictError', async () => {
    const createDuplicateStep = () =>
      createStep(events, activeRunId, {
        stepId: 'step_seq_dup',
        stepName: 'test-step',
        input: new Uint8Array(),
      });

    // First write succeeds; repeating it must be reported as a conflict.
    await createDuplicateStep();
    await expect(createDuplicateStep()).rejects.toMatchObject({
      name: 'EntityConflictError',
    });
  });

  it('should reject duplicate wait_created with EntityConflictError', async () => {
    // Two back-to-back wait_created events sharing a correlationId: the
    // second one must surface as EntityConflictError, the same contract
    // the step_created tests above check.
    // NOTE(review): the previous comment here described the wait insert
    // path as using onConflictDoNothing() plus an existence check — that
    // code is not exercised directly by this test; confirm against storage.
    const createWait = (resumeAt: Date) =>
      events.create(activeRunId, {
        eventType: 'wait_created',
        correlationId: 'wait_seq_dup',
        eventData: { resumeAt },
      });

    await createWait(new Date('2099-01-01'));
    await expect(createWait(new Date('2099-01-02'))).rejects.toMatchObject({
      name: 'EntityConflictError',
    });

    // Also check the log: if both inserts were persisted, this fails even
    // when the second call still rejected with the right error.
    expect(await findEvents('wait_created', 'wait_seq_dup')).toHaveLength(1);
  });
});

describe('step terminal state validation', () => {
let testRunId: string;

Expand Down
Loading