From f7d42b347b8017709529dd5675dc455d31d26937 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Fri, 6 Mar 2026 18:21:19 -0600 Subject: [PATCH 01/12] test(module-postgres): add failing tests for mid-snapshot WAL slot invalidation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add onSnapshotChunkFlushed hook to WalStreamOptions for deterministic test-time WAL generation during snapshot chunk processing. Two new tests: - "slot lost during snapshot aborts early" — verifies snapshot aborts when slot is invalidated mid-flight (currently fails: no slot health check) - "slot invalidation error carries diagnostic context" — verifies error carries walStatus/phase fields (currently fails: properties not yet added) Both tests are expected to fail until the detection feature is implemented. --- .../src/replication/WalStream.ts | 12 +++ .../test/src/wal_stream.test.ts | 96 +++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 094d37eab..11dc1ba9b 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -61,6 +61,13 @@ export interface WalStreamOptions { * Note that queries are streamed, so we don't actually keep that much data in memory. */ snapshotChunkLength?: number; + + /** + * Called after each snapshot chunk is flushed, for testing. + * This allows tests to perform actions (like generating WAL) synchronously + * with the snapshot's chunk processing. + */ + onSnapshotChunkFlushed?: () => Promise; } interface InitResult { @@ -133,6 +140,7 @@ export class WalStream { private startedStreaming = false; private snapshotChunkLength: number; + private onSnapshotChunkFlushed?: () => Promise; private replicationLag = new ReplicationLagTracker(); @@ -147,6 +155,7 @@ export class WalStream { this.slot_name = options.storage.slot_name; this.connections = options.connections; this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; + this.onSnapshotChunkFlushed = options.onSnapshotChunkFlushed; this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -597,6 +606,9 @@ WHERE oid = $1::regclass`, // Important: flush before marking progress await batch.flush(); + if (this.onSnapshotChunkFlushed) { + await this.onSnapshotChunkFlushed(); + } if (limited == null) { let lastKey: Uint8Array | undefined; if (q instanceof ChunkedSnapshotQuery) { diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 5dd6aee65..a69bed3d7 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -470,6 +470,102 @@ bucket_definitions: } }); + test('slot lost during snapshot aborts early', { timeout: 120_000 }, async () => { + await using baseContext = await openContext({ doNotClear: true }); + + const serverVersion = await baseContext.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); + + const walPool = baseContext.pool; + await using context = await openContext({ + walStreamOptions: { + snapshotChunkLength: 100, + onSnapshotChunkFlushed: async () => { + // Generate ~16MB WAL per call. Slot lost after ~7 chunks. + await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); + await walPool.query(`SELECT pg_switch_wal()`); + await walPool.query(`CHECKPOINT`); + } + } + }); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query( + `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` + ); + await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); + + // The hook generates WAL synchronously after each chunk flush. + // Post-implementation, the WAL check (at the same injection point) + // detects slot invalidation and aborts the snapshot. + await expect(async () => { + await context.replicateSnapshot(); + }).rejects.toThrowError(MissingReplicationSlotError); + + // Snapshot should NOT be marked as complete + const status = await context.storage!.getStatus(); + expect(status.snapshot_done).toBe(false); + }); + + test('slot invalidation error carries diagnostic context', { timeout: 120_000 }, async () => { + await using baseContext = await openContext({ doNotClear: true }); + + const serverVersion = await baseContext.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); + + const walPool = baseContext.pool; + await using context = await openContext({ + walStreamOptions: { + snapshotChunkLength: 100, + onSnapshotChunkFlushed: async () => { + await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); + await walPool.query(`SELECT pg_switch_wal()`); + await walPool.query(`CHECKPOINT`); + } + } + }); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query( + `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` + ); + await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); + + let caughtError: any; + try { + await context.replicateSnapshot(); + } catch (e) { + caughtError = e; + } + + // Error should be a MissingReplicationSlotError with diagnostic context + expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); + expect(caughtError.walStatus).toBe('lost'); + expect(caughtError.phase).toBe('snapshot'); + }); + test('old date format', async () => { await using context = await openContext(); await context.updateSyncRules(BASIC_SYNC_RULES); From f62b55c40918d216112b0312817fc6932724eb87 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Tue, 10 Mar 2026 02:24:47 -0600 Subject: [PATCH 02/12] test(module-postgres): add shouldRetryReplication() stub and failing unit tests SlotInvalidationContext interface and shouldRetryReplication() stub (always returns true) added to WalStream.ts. Four pure unit tests cover the retry decision matrix: block retry when slot lost during snapshot, allow retry for streaming/missing/rows_removed. Test 1 fails as expected (stub returns true, test expects false). Implementation comes in a later commit. --- .../src/replication/WalStream.ts | 20 +++++++++++ .../test/src/replication_retry.test.ts | 33 +++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 modules/module-postgres/test/src/replication_retry.test.ts diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 11dc1ba9b..a1a696f62 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -114,6 +114,26 @@ export class MissingReplicationSlotError extends Error { } } +export interface SlotInvalidationContext { + walStatus: string; // 'lost' | 'reserved' | 'extended' | 'missing' | ... + phase: 'snapshot' | 'streaming'; + invalidationReason?: string; // PG 14+: 'wal_removed', 'rows_removed', etc. +} + +/** + * Determines whether replication should be retried after a slot invalidation. + * + * Returns false when retry would be futile (e.g. slot lost during snapshot + * due to WAL budget exhaustion — retrying would repeat the same long snapshot + * and likely fail again). + * + * This is a stub that always returns true, preserving existing retry-everything + * behavior. The real implementation comes in a later spec. + */ +export function shouldRetryReplication(_context: SlotInvalidationContext): boolean { + return true; +} + export class WalStream { sync_rules: HydratedSyncRules; group_id: number; diff --git a/modules/module-postgres/test/src/replication_retry.test.ts b/modules/module-postgres/test/src/replication_retry.test.ts new file mode 100644 index 000000000..b8a401c5c --- /dev/null +++ b/modules/module-postgres/test/src/replication_retry.test.ts @@ -0,0 +1,33 @@ +import { shouldRetryReplication } from '@module/replication/WalStream.js'; +import { describe, expect, test } from 'vitest'; + +describe('shouldRetryReplication', () => { + test('blocks retry when slot lost during snapshot', () => { + expect(shouldRetryReplication({ + walStatus: 'lost', + phase: 'snapshot' + })).toBe(false); + }); + + test('allows retry when slot lost during streaming', () => { + expect(shouldRetryReplication({ + walStatus: 'lost', + phase: 'streaming' + })).toBe(true); + }); + + test('allows retry when slot is missing', () => { + expect(shouldRetryReplication({ + walStatus: 'missing', + phase: 'snapshot' + })).toBe(true); + }); + + test('allows retry when invalidation reason is rows_removed', () => { + expect(shouldRetryReplication({ + walStatus: 'lost', + phase: 'snapshot', + invalidationReason: 'rows_removed' + })).toBe(true); + }); +}); From 176633613b7add4cb0e097bf58a6d3088b2b849f Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Tue, 10 Mar 2026 14:55:15 -0600 Subject: [PATCH 03/12] feat(module-postgres): detect WAL slot invalidation during snapshot Add checkSlotHealth() to WalStream that queries pg_replication_slots after each chunk flush. Throws MissingReplicationSlotError with phase and walStatus metadata so shouldRetryReplication() can distinguish snapshot-phase invalidation (non-retryable) from streaming (retryable). --- .../src/replication/WalStream.ts | 48 +++++++++++++++++-- .../test/src/replication_retry.test.ts | 42 +++++++++------- .../test/src/wal_stream.test.ts | 8 +--- 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index a1a696f62..108602048 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -107,10 +107,16 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => { }; export class MissingReplicationSlotError extends Error { - constructor(message: string, cause?: any) { - super(message); + walStatus?: string; + phase?: 'snapshot' | 'streaming'; - this.cause = cause; + constructor(message: string, options?: { cause?: any; walStatus?: string; phase?: 'snapshot' | 'streaming' }) { + super(message); + if (options) { + this.cause = options.cause; + this.walStatus = options.walStatus; + this.phase = options.phase; + } } } @@ -376,6 +382,39 @@ export class WalStream { } } + /** + * Check if the replication slot is still valid. Called after each chunk + * flush during snapshot to detect slot invalidation early. + * + * The query hits pg_replication_slots (shared memory, not a table scan) + * and costs ~1-2ms per round-trip — negligible next to the per-chunk + * storage flush. + */ + private async checkSlotHealth(): Promise { + const rows = pgwire.pgwireRows( + await this.connections.pool.query({ + statement: 'SELECT wal_status FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: this.slot_name }] + }) + ); + + if (rows.length === 0) { + // Slot disappeared entirely during snapshot + throw new MissingReplicationSlotError(`Replication slot ${this.slot_name} disappeared during snapshot`, { + walStatus: 'missing', + phase: 'snapshot' + }); + } + + const walStatus = rows[0].wal_status; + if (walStatus === 'lost') { + throw new MissingReplicationSlotError( + `Replication slot ${this.slot_name} was invalidated during snapshot (wal_status: lost)`, + { walStatus: 'lost', phase: 'snapshot' } + ); + } + } + async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { const results = await db.query({ statement: `SELECT reltuples::bigint AS estimate @@ -629,6 +668,7 @@ WHERE oid = $1::regclass`, if (this.onSnapshotChunkFlushed) { await this.onSnapshotChunkFlushed(); } + await this.checkSlotHealth(); if (limited == null) { let lastKey: Uint8Array | undefined; if (q instanceof ChunkedSnapshotQuery) { @@ -884,7 +924,7 @@ WHERE oid = $1::regclass`, await this.streamChangesInternal(replicationConnection); } catch (e) { if (isReplicationSlotInvalidError(e)) { - throw new MissingReplicationSlotError(e.message, e); + throw new MissingReplicationSlotError(e.message, { cause: e }); } throw e; } diff --git a/modules/module-postgres/test/src/replication_retry.test.ts b/modules/module-postgres/test/src/replication_retry.test.ts index b8a401c5c..fcb2e59d5 100644 --- a/modules/module-postgres/test/src/replication_retry.test.ts +++ b/modules/module-postgres/test/src/replication_retry.test.ts @@ -3,31 +3,39 @@ import { describe, expect, test } from 'vitest'; describe('shouldRetryReplication', () => { test('blocks retry when slot lost during snapshot', () => { - expect(shouldRetryReplication({ - walStatus: 'lost', - phase: 'snapshot' - })).toBe(false); + expect( + shouldRetryReplication({ + walStatus: 'lost', + phase: 'snapshot' + }) + ).toBe(false); }); test('allows retry when slot lost during streaming', () => { - expect(shouldRetryReplication({ - walStatus: 'lost', - phase: 'streaming' - })).toBe(true); + expect( + shouldRetryReplication({ + walStatus: 'lost', + phase: 'streaming' + }) + ).toBe(true); }); test('allows retry when slot is missing', () => { - expect(shouldRetryReplication({ - walStatus: 'missing', - phase: 'snapshot' - })).toBe(true); + expect( + shouldRetryReplication({ + walStatus: 'missing', + phase: 'snapshot' + }) + ).toBe(true); }); test('allows retry when invalidation reason is rows_removed', () => { - expect(shouldRetryReplication({ - walStatus: 'lost', - phase: 'snapshot', - invalidationReason: 'rows_removed' - })).toBe(true); + expect( + shouldRetryReplication({ + walStatus: 'lost', + phase: 'snapshot', + invalidationReason: 'rows_removed' + }) + ).toBe(true); }); }); diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index a69bed3d7..24f92ef42 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -501,9 +501,7 @@ bucket_definitions: data: - SELECT id, description FROM "test_data"`); - await pool.query( - `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` - ); + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); // The hook generates WAL synchronously after each chunk flush. @@ -548,9 +546,7 @@ bucket_definitions: data: - SELECT id, description FROM "test_data"`); - await pool.query( - `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` - ); + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); let caughtError: any; From 948b0755833d87a94e34b558ff081f9bc5a89a8e Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 01:50:42 -0600 Subject: [PATCH 04/12] feat(module-postgres): implement conditional retry for WAL slot invalidation Block replication retry when slot is invalidated (wal_status=lost) during snapshot phase, since retrying would repeat the same long snapshot and likely fail again. Allow retry for streaming phase, missing slots, and rows_removed invalidation reason. --- .../src/replication/WalStream.ts | 18 ++++++++++++++---- .../src/replication/WalStreamReplicationJob.ts | 14 +++++++++++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 108602048..03230a7dc 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -133,10 +133,14 @@ export interface SlotInvalidationContext { * due to WAL budget exhaustion — retrying would repeat the same long snapshot * and likely fail again). * - * This is a stub that always returns true, preserving existing retry-everything - * behavior. The real implementation comes in a later spec. + * Blocks retry when walStatus is 'lost' during snapshot phase (unless the + * invalidation reason is 'rows_removed', which is not a WAL budget issue). + * Allows retry in all other cases. */ -export function shouldRetryReplication(_context: SlotInvalidationContext): boolean { +export function shouldRetryReplication(context: SlotInvalidationContext): boolean { + if (context.walStatus === 'lost' && context.phase === 'snapshot' && context.invalidationReason !== 'rows_removed') { + return false; + } return true; } @@ -399,7 +403,8 @@ export class WalStream { ); if (rows.length === 0) { - // Slot disappeared entirely during snapshot + // Slot row gone from pg_replication_slots — dropped externally. + // Possible causes: pg_drop_replication_slot() call (operator, management tool, cleanup cron) throw new MissingReplicationSlotError(`Replication slot ${this.slot_name} disappeared during snapshot`, { walStatus: 'missing', phase: 'snapshot' @@ -408,6 +413,11 @@ export class WalStream { const walStatus = rows[0].wal_status; if (walStatus === 'lost') { + // Postgres marked the slot invalid. Possible invalidation_reason values (PG 14+): + // - wal_removed: WAL growth exceeded max_slot_wal_keep_size (the primary case) + // - wal_level_insufficient: wal_level changed away from 'logical' + // - idle_timeout (PG 18+): idle_replication_slot_timeout expired + // - rows_removed: catalog rows needed by the slot were vacuumed (safe to retry — fresh slot won't need them) throw new MissingReplicationSlotError( `Replication slot ${this.slot_name} was invalidated during snapshot (wal_status: lost)`, { walStatus: 'lost', phase: 'snapshot' } diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index d715b50c6..75cb9a3ff 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -1,6 +1,6 @@ import { container, logger } from '@powersync/lib-services-framework'; import { PgManager } from './PgManager.js'; -import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js'; +import { MissingReplicationSlotError, sendKeepAlive, shouldRetryReplication, WalStream } from './WalStream.js'; import { replication } from '@powersync/service-core'; import { getApplicationName } from '../utils/application-name.js'; @@ -91,8 +91,16 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob } if (e instanceof MissingReplicationSlotError) { - // This stops replication on this slot and restarts with a new slot - await this.options.storage.factory.restartReplication(this.storage.group_id); + if ( + shouldRetryReplication({ + walStatus: e.walStatus ?? 'missing', + phase: e.phase ?? 'streaming', + invalidationReason: (e as any).invalidationReason + }) + ) { + // This stops replication on this slot and restarts with a new slot + await this.options.storage.factory.restartReplication(this.storage.group_id); + } } // No need to rethrow - the error is already logged, and retry behavior is the same on error From a5d12887e7957ac0206a56f16336345c8c85190f Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 02:22:28 -0600 Subject: [PATCH 05/12] feat(module-postgres): add WAL budget reporting during snapshot Time-throttled logging of WAL budget remaining, consumption rate, and ETA during snapshot. Warns at 50% budget. Exported pure functions for budget computation and formatting. Fixed PG <13 compat in checkSlotHealth() query. --- .../src/replication/WalStream.ts | 187 +++++++++++++++++- .../test/src/wal_budget.test.ts | 183 +++++++++++++++++ 2 files changed, 369 insertions(+), 1 deletion(-) create mode 100644 modules/module-postgres/test/src/wal_budget.test.ts diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 03230a7dc..0a3bb5d03 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -68,6 +68,13 @@ export interface WalStreamOptions { * with the snapshot's chunk processing. */ onSnapshotChunkFlushed?: () => Promise; + + /** + * Interval for WAL budget log messages during snapshot, in milliseconds. + * Set to 0 for every-chunk logging (useful for debugging). + * Defaults to 120_000 (2 minutes). + */ + walBudgetLogIntervalMs?: number; } interface InitResult { @@ -144,6 +151,90 @@ export function shouldRetryReplication(context: SlotInvalidationContext): boolea return true; } +export function formatBytes(bytes: number): string { + if (bytes >= 1024 * 1024 * 1024) { + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)}GB`; + } else if (bytes >= 1024 * 1024) { + return `${(bytes / (1024 * 1024)).toFixed(1)}MB`; + } else if (bytes >= 1024) { + return `${(bytes / 1024).toFixed(1)}KB`; + } + return `${bytes}B`; +} + +export function formatDuration(hours: number): string { + if (hours >= 24) { + return `${(hours / 24).toFixed(1)} days`; + } else if (hours >= 1) { + return `${hours.toFixed(1)} hours`; + } + return `${Math.round(hours * 60)} minutes`; +} + +export interface WalBudgetSample { + safeWalSize: number; + timestamp: number; +} + +export interface WalBudgetReport { + budgetRemainingPct: number; + safeWalSize: number; + maxSize: number; + walStatus: string; + ratePerHour: number | null; + etaHours: number | null; + isWarning: boolean; +} + +export function computeWalBudgetReport(opts: { + safeWalSize: number; + maxSize: number; + walStatus: string; + prevSample: WalBudgetSample | null; + now: number; +}): WalBudgetReport { + const budgetRemainingPct = Math.round((opts.safeWalSize / opts.maxSize) * 100); + + let ratePerHour: number | null = null; + let etaHours: number | null = null; + + if (opts.prevSample != null) { + const elapsedMs = opts.now - opts.prevSample.timestamp; + const consumed = opts.prevSample.safeWalSize - opts.safeWalSize; + if (elapsedMs > 0 && consumed > 0) { + ratePerHour = (consumed / elapsedMs) * 3_600_000; + const eta = opts.safeWalSize / ratePerHour; + etaHours = eta < 48 ? eta : null; + } + } + + return { + budgetRemainingPct, + safeWalSize: opts.safeWalSize, + maxSize: opts.maxSize, + walStatus: opts.walStatus, + ratePerHour, + etaHours, + isWarning: budgetRemainingPct <= 50 + }; +} + +export function formatWalBudgetLine(report: WalBudgetReport): string { + let line = + `WAL budget: ${formatBytes(report.safeWalSize)} remaining of ` + + `${formatBytes(report.maxSize)} limit (${report.budgetRemainingPct}% remaining).`; + + if (report.ratePerHour != null) { + line += ` WAL consumption: ~${formatBytes(report.ratePerHour)}/hr.`; + if (report.etaHours != null) { + line += ` ETA to exhaustion: ~${formatDuration(report.etaHours)}.`; + } + } + + line += ` Slot status: ${report.walStatus}.`; + return line; +} + export class WalStream { sync_rules: HydratedSyncRules; group_id: number; @@ -176,6 +267,19 @@ export class WalStream { private initialSnapshotPromise: Promise | null = null; + private walBudgetLogIntervalMs: number; + + /** Cached max_slot_wal_keep_size in bytes. null = not yet queried. */ + private maxSlotWalKeepSize: number | null = null; + /** Whether max_slot_wal_keep_size has been queried (distinguishes "not + * queried" from "queried and found unlimited"). */ + private maxSlotWalKeepSizeQueried = false; + + /** Previous safe_wal_size sample for rate calculation. */ + private prevWalBudgetSample: { safeWalSize: number; timestamp: number } | null = null; + /** Timestamp of last WAL budget log message. */ + private lastWalBudgetLogTime = 0; + constructor(options: WalStreamOptions) { this.logger = options.logger ?? defaultLogger; this.storage = options.storage; @@ -186,6 +290,7 @@ export class WalStream { this.connections = options.connections; this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; this.onSnapshotChunkFlushed = options.onSnapshotChunkFlushed; + this.walBudgetLogIntervalMs = options.walBudgetLogIntervalMs ?? 120_000; this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -386,6 +491,79 @@ export class WalStream { } } + private async queryMaxSlotWalKeepSize(): Promise { + if (this.maxSlotWalKeepSizeQueried) { + return this.maxSlotWalKeepSize; + } + this.maxSlotWalKeepSizeQueried = true; + + try { + const rows = pgwire.pgwireRows( + await this.connections.pool.query({ + statement: `SELECT setting, unit FROM pg_settings WHERE name = 'max_slot_wal_keep_size'` + }) + ); + if (rows.length === 0) { + // PG < 13 or setting doesn't exist + return null; + } + const setting = Number(rows[0].setting); + if (setting < 0) { + // -1 = unlimited + this.maxSlotWalKeepSize = null; + return null; + } + // setting is in MB, convert to bytes + const unit = rows[0].unit; + const multiplier = unit === 'kB' ? 1024 : unit === '8kB' ? 8192 : 1024 * 1024; // default MB + this.maxSlotWalKeepSize = setting * multiplier; + return this.maxSlotWalKeepSize; + } catch (e) { + // Non-fatal — budget reporting is best-effort + this.logger.warn(`Could not query max_slot_wal_keep_size`, e); + return null; + } + } + + private async logWalBudget(slot: { safe_wal_size?: any; wal_status?: string }, now: number): Promise { + const maxSize = await this.queryMaxSlotWalKeepSize(); + + // No limit configured + if (maxSize == null) { + this.logger.info(`WAL budget: no limit configured (max_slot_wal_keep_size is unlimited).`); + return; + } + + const safeWalSize = slot.safe_wal_size != null ? Number(slot.safe_wal_size) : null; + if (safeWalSize == null) { + // safe_wal_size is null on PG < 13, or when no limit is set + return; + } + + const report = computeWalBudgetReport({ + safeWalSize, + maxSize, + walStatus: slot.wal_status ?? 'unknown', + prevSample: this.prevWalBudgetSample, + now + }); + + // Update sample for next rate calculation + this.prevWalBudgetSample = { safeWalSize, timestamp: now }; + + const budgetLine = formatWalBudgetLine(report); + + if (report.isWarning) { + this.logger.warn(budgetLine); + this.logger.warn( + `Replication slot may be invalidated before snapshot completes. ` + + `Increase max_slot_wal_keep_size on the source database.` + ); + } else { + this.logger.info(budgetLine); + } + } + /** * Check if the replication slot is still valid. Called after each chunk * flush during snapshot to detect slot invalidation early. @@ -397,7 +575,7 @@ export class WalStream { private async checkSlotHealth(): Promise { const rows = pgwire.pgwireRows( await this.connections.pool.query({ - statement: 'SELECT wal_status FROM pg_replication_slots WHERE slot_name = $1', + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', params: [{ type: 'varchar', value: this.slot_name }] }) ); @@ -423,6 +601,13 @@ export class WalStream { { walStatus: 'lost', phase: 'snapshot' } ); } + + // WAL budget reporting (time-throttled) + const now = performance.now(); + if (now - this.lastWalBudgetLogTime >= this.walBudgetLogIntervalMs) { + this.lastWalBudgetLogTime = now; + await this.logWalBudget(rows[0], now); + } } async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { diff --git a/modules/module-postgres/test/src/wal_budget.test.ts b/modules/module-postgres/test/src/wal_budget.test.ts new file mode 100644 index 000000000..b9971cba6 --- /dev/null +++ b/modules/module-postgres/test/src/wal_budget.test.ts @@ -0,0 +1,183 @@ +import { + formatBytes, + formatDuration, + computeWalBudgetReport, + formatWalBudgetLine +} from '@module/replication/WalStream.js'; +import { describe, expect, test } from 'vitest'; + +describe('formatBytes', () => { + test('formats bytes', () => { + expect(formatBytes(500)).toBe('500B'); + }); + + test('formats kilobytes', () => { + expect(formatBytes(1024)).toBe('1.0KB'); + expect(formatBytes(1536)).toBe('1.5KB'); + }); + + test('formats megabytes', () => { + expect(formatBytes(1024 * 1024)).toBe('1.0MB'); + expect(formatBytes(1.5 * 1024 * 1024)).toBe('1.5MB'); + }); + + test('formats gigabytes', () => { + expect(formatBytes(1024 * 1024 * 1024)).toBe('1.0GB'); + expect(formatBytes(10.5 * 1024 * 1024 * 1024)).toBe('10.5GB'); + }); +}); + +describe('formatDuration', () => { + test('formats minutes', () => { + expect(formatDuration(0.5)).toBe('30 minutes'); + }); + + test('formats hours', () => { + expect(formatDuration(1.0)).toBe('1.0 hours'); + expect(formatDuration(5.5)).toBe('5.5 hours'); + }); + + test('formats days', () => { + expect(formatDuration(24)).toBe('1.0 days'); + expect(formatDuration(48)).toBe('2.0 days'); + }); +}); + +describe('computeWalBudgetReport', () => { + const GB = 1024 * 1024 * 1024; + + test('computes budget percentage', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(80); + expect(report.isWarning).toBe(false); + }); + + test('warns at 50% threshold', () => { + const report = computeWalBudgetReport({ + safeWalSize: 5 * GB, + maxSize: 10 * GB, + walStatus: 'extended', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(50); + expect(report.isWarning).toBe(true); + }); + + test('warns below 50%', () => { + const report = computeWalBudgetReport({ + safeWalSize: 4.2 * GB, + maxSize: 10 * GB, + walStatus: 'extended', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(42); + expect(report.isWarning).toBe(true); + }); + + test('no rate without previous sample', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: null, + now: 1000 + }); + expect(report.ratePerHour).toBeNull(); + expect(report.etaHours).toBeNull(); + }); + + test('computes rate from two samples', () => { + const twoMinutesMs = 2 * 60 * 1000; + const report = computeWalBudgetReport({ + safeWalSize: 7 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 8 * GB, timestamp: 0 }, + now: twoMinutesMs + }); + // 1 GB consumed in 2 minutes = 30 GB/hr + expect(report.ratePerHour).toBeCloseTo(30 * GB, -5); + // 7 GB remaining at 30 GB/hr ≈ 0.233 hours + expect(report.etaHours).toBeCloseTo(7 / 30, 3); + }); + + test('suppresses ETA when > 48 hours', () => { + const oneHourMs = 3_600_000; + const report = computeWalBudgetReport({ + safeWalSize: 9.99 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 10 * GB, timestamp: 0 }, + now: oneHourMs + }); + // 0.01 GB/hr → ETA = 999 hours → suppressed + expect(report.ratePerHour).toBeCloseTo(0.01 * GB, -5); + expect(report.etaHours).toBeNull(); + }); + + test('null rate when no WAL consumed', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 8 * GB, timestamp: 0 }, + now: 60_000 + }); + // safe_wal_size unchanged → consumed = 0 → no rate + expect(report.ratePerHour).toBeNull(); + expect(report.etaHours).toBeNull(); + }); +}); + +describe('formatWalBudgetLine', () => { + const GB = 1024 * 1024 * 1024; + + test('formats basic budget line without rate', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 80, + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: null, + etaHours: null, + isWarning: false + }); + expect(line).toBe('WAL budget: 8.0GB remaining of 10.0GB limit (80% remaining). Slot status: reserved.'); + }); + + test('formats budget line with rate and ETA', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 65, + safeWalSize: 6.5 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: 1.2 * GB, + etaHours: 5.4, + isWarning: false + }); + expect(line).toContain('WAL consumption: ~1.2GB/hr.'); + expect(line).toContain('ETA to exhaustion: ~5.4 hours.'); + }); + + test('formats budget line with rate but no ETA (suppressed)', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 99, + safeWalSize: 9.9 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: 0.01 * GB, + etaHours: null, + isWarning: false + }); + expect(line).toContain('WAL consumption:'); + expect(line).not.toContain('ETA to exhaustion'); + }); +}); From f58bd78ae3bf0b9af92793506ddbc96907b4a5db Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 02:38:29 -0600 Subject: [PATCH 06/12] feat(module-postgres): add PSYNC_S1146 error code and actionable error messages Add error code, fix guidance, and docs link to slot invalidation errors in checkSlotHealth() and initSlot(). Include observed WAL budget context when available. Add test assertions for error code and docs link in both checkSlotHealth and initSlot error paths. --- .../src/replication/WalStream.ts | 42 ++++++++++++++++--- .../test/src/wal_stream.test.ts | 3 ++ packages/service-errors/src/codes.ts | 13 ++++++ 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 0a3bb5d03..c4c7acaed 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -471,7 +471,10 @@ export class WalStream { if (lost) { // Case 1 / 4 throw new MissingReplicationSlotError( - `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}` + `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + + `Increase max_slot_wal_keep_size on the source database. ` + + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated` ); } // Case 3 / 6 @@ -573,6 +576,9 @@ export class WalStream { * storage flush. */ private async checkSlotHealth(): Promise { + // Ensure maxSlotWalKeepSize is populated for diagnostic context in error messages + await this.queryMaxSlotWalKeepSize(); + const rows = pgwire.pgwireRows( await this.connections.pool.query({ statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', @@ -583,10 +589,11 @@ export class WalStream { if (rows.length === 0) { // Slot row gone from pg_replication_slots — dropped externally. // Possible causes: pg_drop_replication_slot() call (operator, management tool, cleanup cron) - throw new MissingReplicationSlotError(`Replication slot ${this.slot_name} disappeared during snapshot`, { - walStatus: 'missing', - phase: 'snapshot' - }); + throw new MissingReplicationSlotError( + `[PSYNC_S1146] Replication slot ${this.slot_name} disappeared during snapshot. ` + + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, + { walStatus: 'missing', phase: 'snapshot' } + ); } const walStatus = rows[0].wal_status; @@ -597,7 +604,10 @@ export class WalStream { // - idle_timeout (PG 18+): idle_replication_slot_timeout expired // - rows_removed: catalog rows needed by the slot were vacuumed (safe to retry — fresh slot won't need them) throw new MissingReplicationSlotError( - `Replication slot ${this.slot_name} was invalidated during snapshot (wal_status: lost)`, + `[PSYNC_S1146] Replication slot ${this.slot_name} was invalidated during snapshot` + + `${this.formatWalBudgetContext()}. ` + + `Increase max_slot_wal_keep_size on the source database. ` + + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, { walStatus: 'lost', phase: 'snapshot' } ); } @@ -610,6 +620,26 @@ export class WalStream { } } + private formatWalBudgetContext(): string { + if (this.maxSlotWalKeepSize == null) { + return ''; + } + const parts: string[] = []; + parts.push(` (limit: ${formatBytes(this.maxSlotWalKeepSize)})`); + + if (this.prevWalBudgetSample != null) { + const elapsed = performance.now() - this.prevWalBudgetSample.timestamp; + if (elapsed > 0 && this.prevWalBudgetSample.safeWalSize > 0) { + const elapsedHours = elapsed / 3_600_000; + if (elapsedHours >= 0.1) { + parts.push(`, exhausted in ~${formatDuration(elapsedHours)}`); + } + } + } + + return parts.join(''); + } + async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { const results = await db.query({ statement: `SELECT reltuples::bigint AS estimate diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 24f92ef42..790d04aa5 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -560,6 +560,9 @@ bucket_definitions: expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); expect(caughtError.walStatus).toBe('lost'); expect(caughtError.phase).toBe('snapshot'); + expect(caughtError.message).toContain('PSYNC_S1146'); + expect(caughtError.message).toContain('docs.powersync.com'); + expect(caughtError.message).toContain('limit:'); }); test('old date format', async () => { diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index 1b17476c8..92bca2b22 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -186,6 +186,19 @@ export enum ErrorCode { */ PSYNC_S1145 = 'PSYNC_S1145', + /** + * Replication slot invalidated. + * + * The replication slot was invalidated by PostgreSQL, typically because + * WAL retention exceeded max_slot_wal_keep_size during a long-running + * snapshot. Increase max_slot_wal_keep_size on the source database and + * redeploy sync rules. + * + * Other causes: rows_removed (catalog rows needed by the slot were + * removed), wal_level_insufficient, idle_timeout (PG 18+). + */ + PSYNC_S1146 = 'PSYNC_S1146', + // ## PSYNC_S12xx: MySQL replication issues // ## PSYNC_S13xx: MongoDB replication issues From e74b110cd1e4062cce3cfe778e1f8bd619524967 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 03:37:09 -0600 Subject: [PATCH 07/12] feat(diagnostics): surface WAL budget fields in diagnostics API Add wal_status, safe_wal_size, and max_slot_wal_keep_size to the SyncRulesStatus connection object. New optional getSlotWalBudget() on RouteAPI, implemented by PostgresRouteAPIAdapter. Fix withMaxWalSize() test helper to use its size parameter instead of hardcoding 100MB. --- .changeset/bright-walls-dance.md | 7 ++ .../src/api/PostgresRouteAPIAdapter.ts | 50 ++++++++++ .../test/src/wal_budget_api.test.ts | 96 +++++++++++++++++++ .../test/src/wal_stream_utils.ts | 6 +- packages/service-core/src/api/RouteAPI.ts | 17 ++++ packages/service-core/src/api/diagnostics.ts | 16 +++- packages/types/src/definitions.ts | 19 ++++ 7 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 .changeset/bright-walls-dance.md create mode 100644 modules/module-postgres/test/src/wal_budget_api.test.ts diff --git a/.changeset/bright-walls-dance.md b/.changeset/bright-walls-dance.md new file mode 100644 index 000000000..2f822fe7b --- /dev/null +++ b/.changeset/bright-walls-dance.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-types': patch +'@powersync/service-core': patch +'@powersync/service-module-postgres': patch +--- + +Surface WAL budget fields (wal_status, safe_wal_size, max_slot_wal_keep_size) in the diagnostics API. diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 353a8ec74..a1b76bcbb 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -184,6 +184,56 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, }); } + async getSlotWalBudget(options: api.SlotWalBudgetOptions): Promise { + const { slotName } = options; + + // Query slot status + const slotResult = await lib_postgres.retriedQuery(this.pool, { + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: slotName }] + }); + const [slot] = pgwire.pgwireRows(slotResult); + if (!slot) { + return undefined; + } + + const walStatus = slot.wal_status; + if (walStatus == null) { + // PG < 13 — wal_status column doesn't exist + return undefined; + } + + const result: api.SlotWalBudgetInfo = { + wal_status: String(walStatus) + }; + + // Query max_slot_wal_keep_size + try { + const settingsResult = await lib_postgres.retriedQuery(this.pool, { + statement: `SELECT setting, unit FROM pg_settings WHERE name = 'max_slot_wal_keep_size'` + }); + const [row] = pgwire.pgwireRows(settingsResult); + if (row) { + const setting = Number(row.setting); + if (setting >= 0) { + const unit = String(row.unit ?? 'MB'); + const multiplier = unit === 'kB' ? 1024 : unit === '8kB' ? 8192 : 1024 * 1024; // default MB + result.max_slot_wal_keep_size = setting * multiplier; + } + // setting < 0 means unlimited — leave undefined + } + } catch (e) { + // Best-effort — budget info is informational + } + + // safe_wal_size (PG 13+, only populated when max_slot_wal_keep_size is set) + if (slot.safe_wal_size != null) { + result.safe_wal_size = Number(slot.safe_wal_size); + } + + return result; + } + async getReplicationHead(): Promise { // On most Postgres versions, pg_logical_emit_message() returns the correct LSN. // However, on Aurora (Postgres compatible), it can return an entirely different LSN, diff --git a/modules/module-postgres/test/src/wal_budget_api.test.ts b/modules/module-postgres/test/src/wal_budget_api.test.ts new file mode 100644 index 000000000..dcd6a1072 --- /dev/null +++ b/modules/module-postgres/test/src/wal_budget_api.test.ts @@ -0,0 +1,96 @@ +import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js'; +import { describe, expect, test } from 'vitest'; +import { describeWithStorage, StorageVersionTestContext } from './util.js'; +import { WalStreamTestContext, withMaxWalSize } from './wal_stream_utils.js'; + +describe('getSlotWalBudget', () => { + describeWithStorage({ timeout: 20_000 }, defineWalBudgetTests); +}); + +function defineWalBudgetTests({ factory, storageVersion }: StorageVersionTestContext) { + const openContext = (options?: Parameters[1]) => { + return WalStreamTestContext.open(factory, { ...options, storageVersion }); + }; + + test('returns WAL budget with configured limit', async () => { + await using context = await openContext(); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + + // Start replication to create the slot + await context.replicateSnapshot(); + + const serverVersion = await context.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`wal_status not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(pool, '1GB'); + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: context.storage!.slot_name + }); + + expect(budget).toBeDefined(); + expect(budget!.wal_status).toBeTypeOf('string'); + expect(budget!.safe_wal_size).toBeTypeOf('number'); + expect(budget!.max_slot_wal_keep_size).toBeTypeOf('number'); + expect(budget!.max_slot_wal_keep_size).toBeGreaterThan(0); + }); + + test('returns undefined max when unlimited', async () => { + await using context = await openContext(); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + + await context.replicateSnapshot(); + + const serverVersion = await context.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`wal_status not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + // Default is -1 (unlimited) — ensure it's set + await using _walSize = await withMaxWalSize(pool, '-1'); + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: context.storage!.slot_name + }); + + expect(budget).toBeDefined(); + expect(budget!.wal_status).toBeTypeOf('string'); + expect(budget!.max_slot_wal_keep_size).toBeUndefined(); + // safe_wal_size is null when no limit is configured + expect(budget!.safe_wal_size).toBeUndefined(); + }); + + test('returns undefined when slot is missing', async () => { + await using context = await openContext(); + const { pool } = context; + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: 'nonexistent_slot' + }); + + expect(budget).toBeUndefined(); + }); +} diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 94f1d3f35..22429269a 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -265,8 +265,10 @@ export async function withMaxWalSize(db: pgwire.PgClient, size: string) { try { const r1 = await db.query(`SHOW max_slot_wal_keep_size`); - await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '100MB'`); + await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${size}'`); await db.query(`SELECT pg_reload_conf()`); + // Wait for the config reload to propagate to all backends + await new Promise((resolve) => setTimeout(resolve, 100)); const oldSize = r1.results[0].rows[0].decodeWithoutCustomTypes(0); @@ -274,6 +276,8 @@ export async function withMaxWalSize(db: pgwire.PgClient, size: string) { [Symbol.asyncDispose]: async () => { await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${oldSize}'`); await db.query(`SELECT pg_reload_conf()`); + // Wait for the config reload to propagate to all backends + await new Promise((resolve) => setTimeout(resolve, 100)); } }; } catch (e) { diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index d98b7747c..f40a67a44 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -14,6 +14,16 @@ export interface ReplicationLagOptions { bucketStorage: SyncRulesBucketStorage; } +export interface SlotWalBudgetInfo { + wal_status: string; + safe_wal_size?: number; + max_slot_wal_keep_size?: number; +} + +export interface SlotWalBudgetOptions { + slotName: string; +} + /** * Describes all the methods currently required to service the sync API endpoints. */ @@ -49,6 +59,13 @@ export interface RouteAPI { */ getReplicationLagBytes(options: ReplicationLagOptions): Promise; + /** + * @returns WAL budget information for the replication slot, or undefined + * if the slot doesn't exist or the source doesn't support this. + * Only implemented by the Postgres adapter. + */ + getSlotWalBudget?(options: SlotWalBudgetOptions): Promise; + /** * Get the current LSN or equivalent replication HEAD position identifier. * diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 4100ec018..dbf09bfa3 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -4,7 +4,7 @@ import { ReplicationError, SyncRulesStatus, TableInfo } from '@powersync/service import * as storage from '../storage/storage-index.js'; import { syncConfigYamlErrorToReplicationError } from '../util/errors.js'; -import { RouteAPI } from './RouteAPI.js'; +import { RouteAPI, SlotWalBudgetInfo } from './RouteAPI.js'; export interface DiagnosticsOptions { /** @@ -63,6 +63,7 @@ export async function getSyncRulesStatus( const systemStorage = live_status ? bucketStorage.getInstance(sync_rules) : undefined; const status = await systemStorage?.getStatus(); let replication_lag_bytes: number | undefined = undefined; + let slot_wal_budget: SlotWalBudgetInfo | undefined = undefined; let tables_flat: TableInfo[] = []; @@ -88,6 +89,16 @@ export async function getSyncRulesStatus( // Ignore logger.warn(`Unable to get replication lag`, e); } + + if (apiHandler.getSlotWalBudget) { + try { + slot_wal_budget = await apiHandler.getSlotWalBudget({ + slotName: sync_rules.slot_name + }); + } catch (e) { + logger.warn(`Unable to get WAL budget`, e); + } + } } } else { const source_table_patterns = rules.getSourceTables(); @@ -182,6 +193,9 @@ export async function getSyncRulesStatus( last_checkpoint_ts: sync_rules.last_checkpoint_ts?.toISOString(), last_keepalive_ts: sync_rules.last_keepalive_ts?.toISOString(), replication_lag_bytes: replication_lag_bytes, + wal_status: slot_wal_budget?.wal_status, + safe_wal_size: slot_wal_budget?.safe_wal_size, + max_slot_wal_keep_size: slot_wal_budget?.max_slot_wal_keep_size, tables: tables_flat } ], diff --git a/packages/types/src/definitions.ts b/packages/types/src/definitions.ts index f02eae960..f65a977bd 100644 --- a/packages/types/src/definitions.ts +++ b/packages/types/src/definitions.ts @@ -73,6 +73,25 @@ export const SyncRulesStatus = t.object({ /** Replication lag in bytes. undefined if we cannot calculate this. */ replication_lag_bytes: t.number.optional(), + /** + * Replication slot WAL status (PG 13+). + * Values: 'reserved', 'extended', 'unreserved', 'lost'. + * Undefined for non-Postgres sources or PG < 13. + */ + wal_status: t.string.optional(), + + /** + * WAL budget remaining in bytes before potential slot invalidation + * (PG 13+, only when max_slot_wal_keep_size is set). + */ + safe_wal_size: t.number.optional(), + + /** + * Configured max_slot_wal_keep_size in bytes (PG 13+). + * Undefined when unlimited (-1) or not available. + */ + max_slot_wal_keep_size: t.number.optional(), + tables: t.array(TableInfo) }) ), From 8e3d1a68f813b7eeb0f813e2ec5cd75bf8ff64bc Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 03:55:01 -0600 Subject: [PATCH 08/12] fix(module-postgres): tailor slot-invalidation guidance to the invalidation reason idle_timeout suggests increasing idle_replication_slot_timeout; other reasons still suggest max_slot_wal_keep_size. --- modules/module-postgres/src/replication/WalStream.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c4c7acaed..56a47dc7c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -470,10 +470,14 @@ export class WalStream { const lost = slot.wal_status == 'lost'; if (lost) { // Case 1 / 4 + const fixGuidance = + slot.invalidation_reason === 'idle_timeout' + ? `Increase idle_replication_slot_timeout on the source database.` + : `Increase max_slot_wal_keep_size on the source database.`; throw new MissingReplicationSlotError( `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + - `Increase max_slot_wal_keep_size on the source database. ` + + `${fixGuidance} ` + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated` ); } From 8c0e9a69b94a1e17efafdda4f4aae1a1368cb92c Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Mar 2026 04:11:41 -0600 Subject: [PATCH 09/12] fix(changeset): add missing service-errors package and broaden summary --- .changeset/bright-walls-dance.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.changeset/bright-walls-dance.md b/.changeset/bright-walls-dance.md index 2f822fe7b..89cd33122 100644 --- a/.changeset/bright-walls-dance.md +++ b/.changeset/bright-walls-dance.md @@ -2,6 +2,7 @@ '@powersync/service-types': patch '@powersync/service-core': patch '@powersync/service-module-postgres': patch +'@powersync/service-errors': patch --- -Surface WAL budget fields (wal_status, safe_wal_size, max_slot_wal_keep_size) in the diagnostics API. +Detect WAL slot invalidation mid-snapshot, warn on WAL budget depletion, block futile retries, and surface WAL budget in the diagnostics API. From bf18b0157dd16fc1e18e3ff7686ba715853dbf6f Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Fri, 3 Apr 2026 01:33:26 -0600 Subject: [PATCH 10/12] refactor(module-postgres): make MissingReplicationSlotError fields required and extract to separate files Make walStatus and phase required constructor parameters, add invalidationReason field. Remove SlotInvalidationContext interface and pass error directly to shouldRetryReplication(). Move error class to MissingReplicationSlotError.ts and utility functions to wal-budget-utils.ts. --- .../MissingReplicationSlotError.ts | 52 +++++++ .../replication/PostgresErrorRateLimiter.ts | 2 +- .../src/replication/WalStream.ts | 136 ++---------------- .../replication/WalStreamReplicationJob.ts | 11 +- .../src/replication/replication-index.ts | 2 + .../src/replication/wal-budget-utils.ts | 83 +++++++++++ .../test/src/replication_retry.test.ts | 51 ++++--- .../test/src/wal_budget.test.ts | 4 +- .../test/src/wal_stream.test.ts | 2 +- 9 files changed, 179 insertions(+), 164 deletions(-) create mode 100644 modules/module-postgres/src/replication/MissingReplicationSlotError.ts create mode 100644 modules/module-postgres/src/replication/wal-budget-utils.ts diff --git a/modules/module-postgres/src/replication/MissingReplicationSlotError.ts b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts new file mode 100644 index 000000000..3dc5b14ba --- /dev/null +++ b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts @@ -0,0 +1,52 @@ +/** + * Replication slot WAL status from `pg_replication_slots.wal_status` (PG 13+). + * - `reserved`: WAL needed by the slot is within `max_wal_size` + * - `extended`: WAL needed exceeds `max_wal_size` but is protected by `max_slot_wal_keep_size` + * - `unreserved`: WAL may be removed at next checkpoint — slot is at risk + * - `lost`: WAL has been removed — slot is invalidated and unusable + * - `missing`: synthetic value — the slot row is absent from `pg_replication_slots` + */ +export type WalStatus = 'reserved' | 'extended' | 'unreserved' | 'lost' | 'missing'; + +export class MissingReplicationSlotError extends Error { + /** Slot WAL status at the time the error was detected. */ + walStatus: WalStatus; + /** Replication phase when the error occurred — controls retry behavior. */ + phase: 'snapshot' | 'streaming'; + /** PG 14+ invalidation reason from `pg_replication_slots.invalidation_reason`. */ + invalidationReason?: string; + + constructor( + message: string, + options: { + walStatus: WalStatus; + phase: 'snapshot' | 'streaming'; + invalidationReason?: string; + cause?: any; + } + ) { + super(message); + this.cause = options.cause; + this.walStatus = options.walStatus; + this.phase = options.phase; + this.invalidationReason = options.invalidationReason; + } +} + +/** + * Determines whether replication should be retried after a slot invalidation. + * + * Returns false when retry would be futile (e.g. slot lost during snapshot + * due to WAL budget exhaustion — retrying would repeat the same long snapshot + * and likely fail again). + * + * Blocks retry when walStatus is 'lost' during snapshot phase (unless the + * invalidation reason is 'rows_removed', which is not a WAL budget issue). + * Allows retry in all other cases. + */ +export function shouldRetryReplication(error: MissingReplicationSlotError): boolean { + if (error.walStatus === 'lost' && error.phase === 'snapshot' && error.invalidationReason !== 'rows_removed') { + return false; + } + return true; +} diff --git a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts index 60173a3a9..962b26528 100644 --- a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts +++ b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts @@ -1,6 +1,6 @@ import { ErrorRateLimiter } from '@powersync/service-core'; import { setTimeout } from 'timers/promises'; -import { MissingReplicationSlotError } from './WalStream.js'; +import { MissingReplicationSlotError } from './MissingReplicationSlotError.js'; export class PostgresErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 56a47dc7c..dc276b104 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -34,6 +34,7 @@ import { import { ReplicationMetric } from '@powersync/service-types'; import { PostgresTypeResolver } from '../types/resolver.js'; +import { MissingReplicationSlotError } from './MissingReplicationSlotError.js'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js'; import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js'; @@ -45,6 +46,7 @@ import { SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js'; +import { computeWalBudgetReport, formatBytes, formatDuration, formatWalBudgetLine } from './wal-budget-utils.js'; export interface WalStreamOptions { logger?: Logger; @@ -113,128 +115,6 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => { await lib_postgres.retriedQuery(db, KEEPALIVE_STATEMENT); }; -export class MissingReplicationSlotError extends Error { - walStatus?: string; - phase?: 'snapshot' | 'streaming'; - - constructor(message: string, options?: { cause?: any; walStatus?: string; phase?: 'snapshot' | 'streaming' }) { - super(message); - if (options) { - this.cause = options.cause; - this.walStatus = options.walStatus; - this.phase = options.phase; - } - } -} - -export interface SlotInvalidationContext { - walStatus: string; // 'lost' | 'reserved' | 'extended' | 'missing' | ... - phase: 'snapshot' | 'streaming'; - invalidationReason?: string; // PG 14+: 'wal_removed', 'rows_removed', etc. -} - -/** - * Determines whether replication should be retried after a slot invalidation. - * - * Returns false when retry would be futile (e.g. slot lost during snapshot - * due to WAL budget exhaustion — retrying would repeat the same long snapshot - * and likely fail again). - * - * Blocks retry when walStatus is 'lost' during snapshot phase (unless the - * invalidation reason is 'rows_removed', which is not a WAL budget issue). - * Allows retry in all other cases. - */ -export function shouldRetryReplication(context: SlotInvalidationContext): boolean { - if (context.walStatus === 'lost' && context.phase === 'snapshot' && context.invalidationReason !== 'rows_removed') { - return false; - } - return true; -} - -export function formatBytes(bytes: number): string { - if (bytes >= 1024 * 1024 * 1024) { - return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)}GB`; - } else if (bytes >= 1024 * 1024) { - return `${(bytes / (1024 * 1024)).toFixed(1)}MB`; - } else if (bytes >= 1024) { - return `${(bytes / 1024).toFixed(1)}KB`; - } - return `${bytes}B`; -} - -export function formatDuration(hours: number): string { - if (hours >= 24) { - return `${(hours / 24).toFixed(1)} days`; - } else if (hours >= 1) { - return `${hours.toFixed(1)} hours`; - } - return `${Math.round(hours * 60)} minutes`; -} - -export interface WalBudgetSample { - safeWalSize: number; - timestamp: number; -} - -export interface WalBudgetReport { - budgetRemainingPct: number; - safeWalSize: number; - maxSize: number; - walStatus: string; - ratePerHour: number | null; - etaHours: number | null; - isWarning: boolean; -} - -export function computeWalBudgetReport(opts: { - safeWalSize: number; - maxSize: number; - walStatus: string; - prevSample: WalBudgetSample | null; - now: number; -}): WalBudgetReport { - const budgetRemainingPct = Math.round((opts.safeWalSize / opts.maxSize) * 100); - - let ratePerHour: number | null = null; - let etaHours: number | null = null; - - if (opts.prevSample != null) { - const elapsedMs = opts.now - opts.prevSample.timestamp; - const consumed = opts.prevSample.safeWalSize - opts.safeWalSize; - if (elapsedMs > 0 && consumed > 0) { - ratePerHour = (consumed / elapsedMs) * 3_600_000; - const eta = opts.safeWalSize / ratePerHour; - etaHours = eta < 48 ? eta : null; - } - } - - return { - budgetRemainingPct, - safeWalSize: opts.safeWalSize, - maxSize: opts.maxSize, - walStatus: opts.walStatus, - ratePerHour, - etaHours, - isWarning: budgetRemainingPct <= 50 - }; -} - -export function formatWalBudgetLine(report: WalBudgetReport): string { - let line = - `WAL budget: ${formatBytes(report.safeWalSize)} remaining of ` + - `${formatBytes(report.maxSize)} limit (${report.budgetRemainingPct}% remaining).`; - - if (report.ratePerHour != null) { - line += ` WAL consumption: ~${formatBytes(report.ratePerHour)}/hr.`; - if (report.etaHours != null) { - line += ` ETA to exhaustion: ~${formatDuration(report.etaHours)}.`; - } - } - - line += ` Slot status: ${report.walStatus}.`; - return line; -} - export class WalStream { sync_rules: HydratedSyncRules; group_id: number; @@ -478,7 +358,8 @@ export class WalStream { `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + `${fixGuidance} ` + - `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated` + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, + { walStatus: 'lost', phase: 'streaming', invalidationReason: slot.invalidation_reason ?? undefined } ); } // Case 3 / 6 @@ -490,7 +371,10 @@ export class WalStream { if (snapshotDone) { // Case 5 // This will create a new slot, while keeping the current sync rules active - throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`); + throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`, { + walStatus: 'missing', + phase: 'streaming' + }); } // Case 2 // This will clear data (if any) and re-create the same slot @@ -612,7 +496,7 @@ export class WalStream { `${this.formatWalBudgetContext()}. ` + `Increase max_slot_wal_keep_size on the source database. ` + `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, - { walStatus: 'lost', phase: 'snapshot' } + { walStatus: 'lost', phase: 'snapshot', invalidationReason: rows[0].invalidation_reason ?? undefined } ); } @@ -1153,7 +1037,7 @@ WHERE oid = $1::regclass`, await this.streamChangesInternal(replicationConnection); } catch (e) { if (isReplicationSlotInvalidError(e)) { - throw new MissingReplicationSlotError(e.message, { cause: e }); + throw new MissingReplicationSlotError(e.message, { walStatus: 'lost', phase: 'streaming', cause: e }); } throw e; } diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index 75cb9a3ff..d865697c7 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -1,6 +1,7 @@ import { container, logger } from '@powersync/lib-services-framework'; +import { MissingReplicationSlotError, shouldRetryReplication } from './MissingReplicationSlotError.js'; import { PgManager } from './PgManager.js'; -import { MissingReplicationSlotError, sendKeepAlive, shouldRetryReplication, WalStream } from './WalStream.js'; +import { sendKeepAlive, WalStream } from './WalStream.js'; import { replication } from '@powersync/service-core'; import { getApplicationName } from '../utils/application-name.js'; @@ -91,13 +92,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob } if (e instanceof MissingReplicationSlotError) { - if ( - shouldRetryReplication({ - walStatus: e.walStatus ?? 'missing', - phase: e.phase ?? 'streaming', - invalidationReason: (e as any).invalidationReason - }) - ) { + if (shouldRetryReplication(e)) { // This stops replication on this slot and restarts with a new slot await this.options.storage.factory.restartReplication(this.storage.group_id); } diff --git a/modules/module-postgres/src/replication/replication-index.ts b/modules/module-postgres/src/replication/replication-index.ts index 1bf207453..4cc41b63d 100644 --- a/modules/module-postgres/src/replication/replication-index.ts +++ b/modules/module-postgres/src/replication/replication-index.ts @@ -1,5 +1,7 @@ +export * from './MissingReplicationSlotError.js'; export * from './PgRelation.js'; export * from './replication-utils.js'; +export * from './wal-budget-utils.js'; export * from './WalStream.js'; export * from './WalStreamReplicationJob.js'; export * from './WalStreamReplicator.js'; diff --git a/modules/module-postgres/src/replication/wal-budget-utils.ts b/modules/module-postgres/src/replication/wal-budget-utils.ts new file mode 100644 index 000000000..3637824f7 --- /dev/null +++ b/modules/module-postgres/src/replication/wal-budget-utils.ts @@ -0,0 +1,83 @@ +export function formatBytes(bytes: number): string { + if (bytes >= 1024 * 1024 * 1024) { + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)}GB`; + } else if (bytes >= 1024 * 1024) { + return `${(bytes / (1024 * 1024)).toFixed(1)}MB`; + } else if (bytes >= 1024) { + return `${(bytes / 1024).toFixed(1)}KB`; + } + return `${bytes}B`; +} + +export function formatDuration(hours: number): string { + if (hours >= 24) { + return `${(hours / 24).toFixed(1)} days`; + } else if (hours >= 1) { + return `${hours.toFixed(1)} hours`; + } + return `${Math.round(hours * 60)} minutes`; +} + +export interface WalBudgetSample { + safeWalSize: number; + timestamp: number; +} + +export interface WalBudgetReport { + budgetRemainingPct: number; + safeWalSize: number; + maxSize: number; + walStatus: string; + ratePerHour: number | null; + etaHours: number | null; + isWarning: boolean; +} + +export function computeWalBudgetReport(opts: { + safeWalSize: number; + maxSize: number; + walStatus: string; + prevSample: WalBudgetSample | null; + now: number; +}): WalBudgetReport { + const budgetRemainingPct = Math.round((opts.safeWalSize / opts.maxSize) * 100); + + let ratePerHour: number | null = null; + let etaHours: number | null = null; + + if (opts.prevSample != null) { + const elapsedMs = opts.now - opts.prevSample.timestamp; + const consumed = opts.prevSample.safeWalSize - opts.safeWalSize; + if (elapsedMs > 0 && consumed > 0) { + ratePerHour = (consumed / elapsedMs) * 3_600_000; + const eta = opts.safeWalSize / ratePerHour; + etaHours = eta < 48 ? eta : null; + } + } + + return { + budgetRemainingPct, + safeWalSize: opts.safeWalSize, + maxSize: opts.maxSize, + walStatus: opts.walStatus, + ratePerHour, + etaHours, + isWarning: budgetRemainingPct <= 50 + }; +} + +export function formatWalBudgetLine(report: WalBudgetReport): string { + let line = + `WAL budget: ${formatBytes(report.safeWalSize)} remaining of ` + + `${formatBytes(report.maxSize)} limit (${report.budgetRemainingPct}% remaining).`; + + if (report.ratePerHour != null) { + line += ` WAL consumption: ~${formatBytes(report.ratePerHour)}/hr.`; + if (report.etaHours != null) { + line += ` ETA to exhaustion: ~${formatDuration(report.etaHours)}.`; + } + } + + line += ` Slot status: ${report.walStatus}.`; + return line; +} diff --git a/modules/module-postgres/test/src/replication_retry.test.ts b/modules/module-postgres/test/src/replication_retry.test.ts index fcb2e59d5..a1d191eb0 100644 --- a/modules/module-postgres/test/src/replication_retry.test.ts +++ b/modules/module-postgres/test/src/replication_retry.test.ts @@ -1,41 +1,40 @@ -import { shouldRetryReplication } from '@module/replication/WalStream.js'; +import { + MissingReplicationSlotError, + shouldRetryReplication +} from '@module/replication/MissingReplicationSlotError.js'; import { describe, expect, test } from 'vitest'; describe('shouldRetryReplication', () => { test('blocks retry when slot lost during snapshot', () => { - expect( - shouldRetryReplication({ - walStatus: 'lost', - phase: 'snapshot' - }) - ).toBe(false); + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'snapshot' + }); + expect(shouldRetryReplication(error)).toBe(false); }); test('allows retry when slot lost during streaming', () => { - expect( - shouldRetryReplication({ - walStatus: 'lost', - phase: 'streaming' - }) - ).toBe(true); + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'streaming' + }); + expect(shouldRetryReplication(error)).toBe(true); }); test('allows retry when slot is missing', () => { - expect( - shouldRetryReplication({ - walStatus: 'missing', - phase: 'snapshot' - }) - ).toBe(true); + const error = new MissingReplicationSlotError('test', { + walStatus: 'missing', + phase: 'snapshot' + }); + expect(shouldRetryReplication(error)).toBe(true); }); test('allows retry when invalidation reason is rows_removed', () => { - expect( - shouldRetryReplication({ - walStatus: 'lost', - phase: 'snapshot', - invalidationReason: 'rows_removed' - }) - ).toBe(true); + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'snapshot', + invalidationReason: 'rows_removed' + }); + expect(shouldRetryReplication(error)).toBe(true); }); }); diff --git a/modules/module-postgres/test/src/wal_budget.test.ts b/modules/module-postgres/test/src/wal_budget.test.ts index b9971cba6..b4c52235a 100644 --- a/modules/module-postgres/test/src/wal_budget.test.ts +++ b/modules/module-postgres/test/src/wal_budget.test.ts @@ -1,9 +1,9 @@ import { + computeWalBudgetReport, formatBytes, formatDuration, - computeWalBudgetReport, formatWalBudgetLine -} from '@module/replication/WalStream.js'; +} from '@module/replication/wal-budget-utils.js'; import { describe, expect, test } from 'vitest'; describe('formatBytes', () => { diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 790d04aa5..438d1069b 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -1,4 +1,4 @@ -import { MissingReplicationSlotError } from '@module/replication/WalStream.js'; +import { MissingReplicationSlotError } from '@module/replication/MissingReplicationSlotError.js'; import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests'; import { pgwireRows } from '@powersync/service-jpgwire'; import { JSONBig } from '@powersync/service-jsonbig'; From 48f42d67da3dfb518627fd258dc1acf2f4431f32 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Fri, 3 Apr 2026 01:39:30 -0600 Subject: [PATCH 11/12] refactor(module-postgres): throttle slot health check, remove docs URLs, fix budget context Throttle checkSlotHealth() call in snapshotTable() instead of only throttling the budget log inside it. Remove docs URLs from error messages (codebase pattern is error code only). Simplify formatWalBudgetContext() to show only the WAL limit, removing misleading elapsed-time field. --- .../src/replication/WalStream.ts | 53 +++++++------------ .../test/src/wal_stream.test.ts | 3 +- 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index dc276b104..2a0172092 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -46,7 +46,7 @@ import { SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js'; -import { computeWalBudgetReport, formatBytes, formatDuration, formatWalBudgetLine } from './wal-budget-utils.js'; +import { computeWalBudgetReport, formatBytes, formatWalBudgetLine } from './wal-budget-utils.js'; export interface WalStreamOptions { logger?: Logger; @@ -72,11 +72,11 @@ export interface WalStreamOptions { onSnapshotChunkFlushed?: () => Promise; /** - * Interval for WAL budget log messages during snapshot, in milliseconds. - * Set to 0 for every-chunk logging (useful for debugging). + * Interval for slot health checks during snapshot, in milliseconds. + * Set to 0 for every-chunk checking (useful for testing). * Defaults to 120_000 (2 minutes). */ - walBudgetLogIntervalMs?: number; + slotHealthCheckIntervalMs?: number; } interface InitResult { @@ -147,7 +147,7 @@ export class WalStream { private initialSnapshotPromise: Promise | null = null; - private walBudgetLogIntervalMs: number; + private slotHealthCheckIntervalMs: number; /** Cached max_slot_wal_keep_size in bytes. null = not yet queried. */ private maxSlotWalKeepSize: number | null = null; @@ -157,8 +157,8 @@ export class WalStream { /** Previous safe_wal_size sample for rate calculation. */ private prevWalBudgetSample: { safeWalSize: number; timestamp: number } | null = null; - /** Timestamp of last WAL budget log message. */ - private lastWalBudgetLogTime = 0; + /** Timestamp of last slot health check. */ + private lastSlotHealthCheckTime = 0; constructor(options: WalStreamOptions) { this.logger = options.logger ?? defaultLogger; @@ -170,7 +170,7 @@ export class WalStream { this.connections = options.connections; this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; this.onSnapshotChunkFlushed = options.onSnapshotChunkFlushed; - this.walBudgetLogIntervalMs = options.walBudgetLogIntervalMs ?? 120_000; + this.slotHealthCheckIntervalMs = options.slotHealthCheckIntervalMs ?? 120_000; this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -357,8 +357,7 @@ export class WalStream { throw new MissingReplicationSlotError( `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + - `${fixGuidance} ` + - `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, + `${fixGuidance}`, { walStatus: 'lost', phase: 'streaming', invalidationReason: slot.invalidation_reason ?? undefined } ); } @@ -478,8 +477,7 @@ export class WalStream { // Slot row gone from pg_replication_slots — dropped externally. // Possible causes: pg_drop_replication_slot() call (operator, management tool, cleanup cron) throw new MissingReplicationSlotError( - `[PSYNC_S1146] Replication slot ${this.slot_name} disappeared during snapshot. ` + - `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, + `[PSYNC_S1146] Replication slot ${this.slot_name} disappeared during snapshot.`, { walStatus: 'missing', phase: 'snapshot' } ); } @@ -494,38 +492,19 @@ export class WalStream { throw new MissingReplicationSlotError( `[PSYNC_S1146] Replication slot ${this.slot_name} was invalidated during snapshot` + `${this.formatWalBudgetContext()}. ` + - `Increase max_slot_wal_keep_size on the source database. ` + - `https://docs.powersync.com/self-hosting/troubleshooting/replication-slot-invalidated`, + `Increase max_slot_wal_keep_size on the source database.`, { walStatus: 'lost', phase: 'snapshot', invalidationReason: rows[0].invalidation_reason ?? undefined } ); } - // WAL budget reporting (time-throttled) - const now = performance.now(); - if (now - this.lastWalBudgetLogTime >= this.walBudgetLogIntervalMs) { - this.lastWalBudgetLogTime = now; - await this.logWalBudget(rows[0], now); - } + await this.logWalBudget(rows[0], performance.now()); } private formatWalBudgetContext(): string { if (this.maxSlotWalKeepSize == null) { return ''; } - const parts: string[] = []; - parts.push(` (limit: ${formatBytes(this.maxSlotWalKeepSize)})`); - - if (this.prevWalBudgetSample != null) { - const elapsed = performance.now() - this.prevWalBudgetSample.timestamp; - if (elapsed > 0 && this.prevWalBudgetSample.safeWalSize > 0) { - const elapsedHours = elapsed / 3_600_000; - if (elapsedHours >= 0.1) { - parts.push(`, exhausted in ~${formatDuration(elapsedHours)}`); - } - } - } - - return parts.join(''); + return ` (limit: ${formatBytes(this.maxSlotWalKeepSize)})`; } async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { @@ -781,7 +760,11 @@ WHERE oid = $1::regclass`, if (this.onSnapshotChunkFlushed) { await this.onSnapshotChunkFlushed(); } - await this.checkSlotHealth(); + const now = performance.now(); + if (now - this.lastSlotHealthCheckTime >= this.slotHealthCheckIntervalMs) { + this.lastSlotHealthCheckTime = now; + await this.checkSlotHealth(); + } if (limited == null) { let lastKey: Uint8Array | undefined; if (q instanceof ChunkedSnapshotQuery) { diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 438d1069b..ff4a3b140 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -485,6 +485,7 @@ bucket_definitions: await using context = await openContext({ walStreamOptions: { snapshotChunkLength: 100, + slotHealthCheckIntervalMs: 0, onSnapshotChunkFlushed: async () => { // Generate ~16MB WAL per call. Slot lost after ~7 chunks. await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); @@ -531,6 +532,7 @@ bucket_definitions: await using context = await openContext({ walStreamOptions: { snapshotChunkLength: 100, + slotHealthCheckIntervalMs: 0, onSnapshotChunkFlushed: async () => { await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); await walPool.query(`SELECT pg_switch_wal()`); @@ -561,7 +563,6 @@ bucket_definitions: expect(caughtError.walStatus).toBe('lost'); expect(caughtError.phase).toBe('snapshot'); expect(caughtError.message).toContain('PSYNC_S1146'); - expect(caughtError.message).toContain('docs.powersync.com'); expect(caughtError.message).toContain('limit:'); }); From aed41f0e667d4bb442ad57e1f2676a55df482650 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Fri, 3 Apr 2026 15:19:17 -0600 Subject: [PATCH 12/12] refactor(module-postgres): replace string literals with WalStatus and ReplicationPhase union types Extract ReplicationPhase type alias with JSDoc explaining retry semantics for each phase. Narrows the phase field from bare string literals to the named type for better discoverability and type safety. --- .../src/replication/MissingReplicationSlotError.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/modules/module-postgres/src/replication/MissingReplicationSlotError.ts b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts index 3dc5b14ba..118b75e99 100644 --- a/modules/module-postgres/src/replication/MissingReplicationSlotError.ts +++ b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts @@ -8,11 +8,18 @@ */ export type WalStatus = 'reserved' | 'extended' | 'unreserved' | 'lost' | 'missing'; +/** + * Replication phase when the error was detected. + * - `snapshot`: during an active full-table scan (snapshotTable) — long-running, retry may be futile + * - `streaming`: during WAL streaming, at startup, or between replication cycles — retry is safe + */ +export type ReplicationPhase = 'snapshot' | 'streaming'; + export class MissingReplicationSlotError extends Error { /** Slot WAL status at the time the error was detected. */ walStatus: WalStatus; /** Replication phase when the error occurred — controls retry behavior. */ - phase: 'snapshot' | 'streaming'; + phase: ReplicationPhase; /** PG 14+ invalidation reason from `pg_replication_slots.invalidation_reason`. */ invalidationReason?: string; @@ -20,7 +27,7 @@ export class MissingReplicationSlotError extends Error { message: string, options: { walStatus: WalStatus; - phase: 'snapshot' | 'streaming'; + phase: ReplicationPhase; invalidationReason?: string; cause?: any; }