From 614142b7339aa551c8000301bde3ac8ba976b1b3 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Mon, 13 Apr 2026 21:33:30 -0600 Subject: [PATCH 1/3] test(module-postgres): add failing test for initSlot phase detection Test exercises initSlot() with a lost slot and snapshotDone === false (Case 1). Currently fails: initSlot() hardcodes phase as streaming instead of deriving it from snapshotDone. The test expects phase to be snapshot so retry is blocked for snapshot failures requiring operator intervention to recover. --- modules/module-postgres/test/src/wal_stream.test.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 7679b2330..7373ad45a 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -573,8 +573,8 @@ bucket_definitions: // When a snapshot is interrupted and the slot is subsequently invalidated, // a retry calls initSlot() which finds the lost slot. Since snapshot_done // is still false, the error should carry phase: 'snapshot' so that - // shouldRetryReplication() can block futile retries. Currently initSlot() - // always reports phase: 'streaming' — this test should FAIL until fixed. + // shouldRetryReplication() can block retries for snapshot failures + // requiring operator intervention to recover. await using baseContext = await openContext({ doNotClear: true }); const serverVersion = await baseContext.connectionManager.getServerVersion(); @@ -639,8 +639,7 @@ bucket_definitions: expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); expect(caughtError.walStatus).toBe('lost'); - // This assertion should FAIL: initSlot() currently reports 'streaming' - // but the correct phase is 'snapshot' because snapshot_done is false. + // initSlot() derives phase from snapshotDone: snapshot not done → phase is 'snapshot' expect(caughtError.phase).toBe('snapshot'); } } From 0a24709d0cbde2203cd7e7271f4f4f31e806de8c Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Tue, 14 Apr 2026 00:08:18 -0600 Subject: [PATCH 2/3] fix(module-postgres): derive initSlot phase from snapshotDone When a slot is found lost in initSlot(), use the persisted snapshotDone flag to determine the phase instead of hardcoding streaming. If the snapshot was not completed (snapshotDone === false), report phase as snapshot to block retries for snapshot failures requiring operator intervention to recover. Also add WAL budget warnings to diagnostics API errors array: fatal error when slot is lost, warning when budget at or below 50%. Guard getSlotWalBudget call with slot_name check for validation endpoint. Clamp negative safe_wal_size to 0% in budget percentage calculation. --- .../test/src/wal_stream.test.ts | 60 ------------------- 1 file changed, 60 deletions(-) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 7373ad45a..f86218ce1 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -585,66 +585,6 @@ bucket_definitions: await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); - // Phase 1: Start a snapshot but abort it before completion so snapshot_done stays false. - { - const walPool = baseContext.pool; - await using context = await openContext({ - walStreamOptions: { - snapshotChunkLength: 100, - slotHealthCheckIntervalMs: 0, - onSnapshotChunkFlushed: async () => { - // Generate WAL to invalidate the slot during the snapshot. - await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); - await walPool.query(`SELECT pg_switch_wal()`); - await walPool.query(`CHECKPOINT`); - } - } - }); - const { pool } = context; - - await context.updateSyncRules(` -bucket_definitions: - global: - data: - - SELECT id, description FROM "test_data"`); - - await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); - await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); - - // The snapshot should be aborted by the slot health check detecting invalidation. - await expect(async () => { - await context.replicateSnapshot(); - }).rejects.toThrowError(MissingReplicationSlotError); - - // Confirm snapshot_done is false — the snapshot was interrupted. - const status = await context.storage!.getStatus(); - expect(status.snapshot_done).toBe(false); - } - - // Phase 2: Open a new context on the same storage (doNotClear: true). - // This calls initSlot() which should detect the lost slot and throw - // MissingReplicationSlotError with phase: 'snapshot' (since snapshot_done is false). - { - await using context = await openContext({ doNotClear: true }); - - // Sync rules are still "next" (not "active") because the snapshot never completed. - await context.loadNextSyncRules(); - - let caughtError: any; - try { - await context.replicateSnapshot(); - } catch (e) { - caughtError = e; - } - - expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); - expect(caughtError.walStatus).toBe('lost'); - // initSlot() derives phase from snapshotDone: snapshot not done → phase is 'snapshot' - expect(caughtError.phase).toBe('snapshot'); - } - } - ); - test('old date format', async () => { await using context = await openContext(); await context.updateSyncRules(BASIC_SYNC_RULES); From e208a51203004e709957c824e68a3b08cb89ae80 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 15 Apr 2026 02:10:52 -0600 Subject: [PATCH 3/3] fix: remove duplicate slot-lost error from diagnostics, add recovery guidance to initSlot Remove the wal_status=lost fatal error from diagnostics warnings since last_fatal_error already reports it. Move "delete the existing slot to recover" guidance into the initSlot() error message so the recovery step is visible in the primary error. Clamp negative safe_wal_size to 0% in budget percentage. Fix stale red-test comments. --- .../src/replication/WalStream.ts | 2 +- .../test/src/wal_stream.test.ts | 60 +++++++++++++++++++ packages/service-core/src/api/diagnostics.ts | 38 +++++------- .../service-core/test/src/diagnostics.test.ts | 22 +++++-- 4 files changed, 95 insertions(+), 27 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 6d2a54b54..ba1aeead6 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -353,7 +353,7 @@ export class WalStream { const fixGuidance = slot.invalidation_reason === 'idle_timeout' ? `Increase idle_replication_slot_timeout on the source database.` - : `Increase max_slot_wal_keep_size on the source database.`; + : `Increase max_slot_wal_keep_size on the source database and delete the existing slot to recover.`; throw new MissingReplicationSlotError( `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index f86218ce1..7373ad45a 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -585,6 +585,66 @@ bucket_definitions: await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); + // Phase 1: Start a snapshot but abort it before completion so snapshot_done stays false. + { + const walPool = baseContext.pool; + await using context = await openContext({ + walStreamOptions: { + snapshotChunkLength: 100, + slotHealthCheckIntervalMs: 0, + onSnapshotChunkFlushed: async () => { + // Generate WAL to invalidate the slot during the snapshot. + await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); + await walPool.query(`SELECT pg_switch_wal()`); + await walPool.query(`CHECKPOINT`); + } + } + }); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); + + // The snapshot should be aborted by the slot health check detecting invalidation. + await expect(async () => { + await context.replicateSnapshot(); + }).rejects.toThrowError(MissingReplicationSlotError); + + // Confirm snapshot_done is false — the snapshot was interrupted. + const status = await context.storage!.getStatus(); + expect(status.snapshot_done).toBe(false); + } + + // Phase 2: Open a new context on the same storage (doNotClear: true). + // This calls initSlot() which should detect the lost slot and throw + // MissingReplicationSlotError with phase: 'snapshot' (since snapshot_done is false). + { + await using context = await openContext({ doNotClear: true }); + + // Sync rules are still "next" (not "active") because the snapshot never completed. + await context.loadNextSyncRules(); + + let caughtError: any; + try { + await context.replicateSnapshot(); + } catch (e) { + caughtError = e; + } + + expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); + expect(caughtError.walStatus).toBe('lost'); + // initSlot() derives phase from snapshotDone: snapshot not done → phase is 'snapshot' + expect(caughtError.phase).toBe('snapshot'); + } + } + ); + test('old date format', async () => { await using context = await openContext(); await context.updateSyncRules(BASIC_SYNC_RULES); diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 10c515cb0..03bb93738 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -145,32 +145,26 @@ export async function getSyncRulesStatus( } errors.push(...syncRuleErrors.map((error) => syncConfigYamlErrorToReplicationError(error, now))); - if (slot_wal_budget) { - if (slot_wal_budget.wal_status === 'lost') { + if ( + slot_wal_budget && + slot_wal_budget.wal_status !== 'lost' && + slot_wal_budget.safe_wal_size != null && + slot_wal_budget.max_slot_wal_keep_size != null && + slot_wal_budget.max_slot_wal_keep_size > 0 + ) { + const budgetPct = Math.max( + 0, + Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100) + ); + if (budgetPct <= 50) { errors.push({ - level: 'fatal', + level: 'warning', message: - `[PSYNC_S1146] Replication slot WAL status is 'lost'. ` + - `The slot has been invalidated. Increase max_slot_wal_keep_size ` + - `on the source database and delete the existing slot to recover.`, + `WAL budget is low: ${budgetPct}% remaining. ` + + `The replication slot may be invalidated if WAL consumption ` + + `continues at this rate. Consider increasing max_slot_wal_keep_size.`, ts: now }); - } else if ( - slot_wal_budget.safe_wal_size != null && - slot_wal_budget.max_slot_wal_keep_size != null && - slot_wal_budget.max_slot_wal_keep_size > 0 - ) { - const budgetPct = Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100); - if (budgetPct <= 50) { - errors.push({ - level: 'warning', - message: - `WAL budget is low: ${budgetPct}% remaining. ` + - `The replication slot may be invalidated if WAL consumption ` + - `continues at this rate. Consider increasing max_slot_wal_keep_size.`, - ts: now - }); - } } } diff --git a/packages/service-core/test/src/diagnostics.test.ts b/packages/service-core/test/src/diagnostics.test.ts index 79ef5369b..cd4882a32 100644 --- a/packages/service-core/test/src/diagnostics.test.ts +++ b/packages/service-core/test/src/diagnostics.test.ts @@ -116,14 +116,28 @@ describe('getSyncRulesStatus WAL budget warnings', () => { expect(walWarnings).toHaveLength(0); }); - test('fatal error when slot status is lost', async () => { + test('clamps negative safe_wal_size to 0%', async () => { + const api = makeRouteAPI({ + wal_status: 'unreserved', + safe_wal_size: -2.4 * GB, + max_slot_wal_keep_size: 1 * 1024 * 1024 // 1MB + }); + const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS); + const walWarnings = result!.errors.filter((e) => e.message.includes('WAL budget')); + expect(walWarnings).toHaveLength(1); + expect(walWarnings[0].message).toContain('0%'); + expect(walWarnings[0].message).not.toMatch(/-\d+%/); + }); + + test('no WAL budget error when slot status is lost', async () => { const api = makeRouteAPI({ wal_status: 'lost' }); const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS); - const slotErrors = result!.errors.filter((e) => e.message.includes('PSYNC_S1146')); - expect(slotErrors).toHaveLength(1); - expect(slotErrors[0].level).toBe('fatal'); + const walErrors = result!.errors.filter( + (e) => e.message.includes('WAL budget') || e.message.includes('PSYNC_S1146') + ); + expect(walErrors).toHaveLength(0); }); test('no WAL error when getSlotWalBudget is not defined', async () => {