diff --git a/.changeset/bright-walls-dance.md b/.changeset/bright-walls-dance.md new file mode 100644 index 000000000..89cd33122 --- /dev/null +++ b/.changeset/bright-walls-dance.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-types': patch +'@powersync/service-core': patch +'@powersync/service-module-postgres': patch +'@powersync/service-errors': patch +--- + +Detect WAL slot invalidation mid-snapshot, warn on WAL budget depletion, block futile retries, and surface WAL budget in the diagnostics API. diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 353a8ec74..a1b76bcbb 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -184,6 +184,56 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, }); } + async getSlotWalBudget(options: api.SlotWalBudgetOptions): Promise { + const { slotName } = options; + + // Query slot status + const slotResult = await lib_postgres.retriedQuery(this.pool, { + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: slotName }] + }); + const [slot] = pgwire.pgwireRows(slotResult); + if (!slot) { + return undefined; + } + + const walStatus = slot.wal_status; + if (walStatus == null) { + // PG < 13 — wal_status column doesn't exist + return undefined; + } + + const result: api.SlotWalBudgetInfo = { + wal_status: String(walStatus) + }; + + // Query max_slot_wal_keep_size + try { + const settingsResult = await lib_postgres.retriedQuery(this.pool, { + statement: `SELECT setting, unit FROM pg_settings WHERE name = 'max_slot_wal_keep_size'` + }); + const [row] = pgwire.pgwireRows(settingsResult); + if (row) { + const setting = Number(row.setting); + if (setting >= 0) { + const unit = String(row.unit ?? 'MB'); + const multiplier = unit === 'kB' ? 1024 : unit === '8kB' ? 8192 : 1024 * 1024; // default MB + result.max_slot_wal_keep_size = setting * multiplier; + } + // setting < 0 means unlimited — leave undefined + } + } catch (e) { + // Best-effort — budget info is informational + } + + // safe_wal_size (PG 13+, only populated when max_slot_wal_keep_size is set) + if (slot.safe_wal_size != null) { + result.safe_wal_size = Number(slot.safe_wal_size); + } + + return result; + } + async getReplicationHead(): Promise { // On most Postgres versions, pg_logical_emit_message() returns the correct LSN. // However, on Aurora (Postgres compatible), it can return an entirely different LSN, diff --git a/modules/module-postgres/src/replication/MissingReplicationSlotError.ts b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts new file mode 100644 index 000000000..118b75e99 --- /dev/null +++ b/modules/module-postgres/src/replication/MissingReplicationSlotError.ts @@ -0,0 +1,59 @@ +/** + * Replication slot WAL status from `pg_replication_slots.wal_status` (PG 13+). + * - `reserved`: WAL needed by the slot is within `max_wal_size` + * - `extended`: WAL needed exceeds `max_wal_size` but is protected by `max_slot_wal_keep_size` + * - `unreserved`: WAL may be removed at next checkpoint — slot is at risk + * - `lost`: WAL has been removed — slot is invalidated and unusable + * - `missing`: synthetic value — the slot row is absent from `pg_replication_slots` + */ +export type WalStatus = 'reserved' | 'extended' | 'unreserved' | 'lost' | 'missing'; + +/** + * Replication phase when the error was detected. + * - `snapshot`: during an active full-table scan (snapshotTable) — long-running, retry may be futile + * - `streaming`: during WAL streaming, at startup, or between replication cycles — retry is safe + */ +export type ReplicationPhase = 'snapshot' | 'streaming'; + +export class MissingReplicationSlotError extends Error { + /** Slot WAL status at the time the error was detected. */ + walStatus: WalStatus; + /** Replication phase when the error occurred — controls retry behavior. */ + phase: ReplicationPhase; + /** PG 14+ invalidation reason from `pg_replication_slots.invalidation_reason`. */ + invalidationReason?: string; + + constructor( + message: string, + options: { + walStatus: WalStatus; + phase: ReplicationPhase; + invalidationReason?: string; + cause?: any; + } + ) { + super(message); + this.cause = options.cause; + this.walStatus = options.walStatus; + this.phase = options.phase; + this.invalidationReason = options.invalidationReason; + } +} + +/** + * Determines whether replication should be retried after a slot invalidation. + * + * Returns false when retry would be futile (e.g. slot lost during snapshot + * due to WAL budget exhaustion — retrying would repeat the same long snapshot + * and likely fail again). + * + * Blocks retry when walStatus is 'lost' during snapshot phase (unless the + * invalidation reason is 'rows_removed', which is not a WAL budget issue). + * Allows retry in all other cases. + */ +export function shouldRetryReplication(error: MissingReplicationSlotError): boolean { + if (error.walStatus === 'lost' && error.phase === 'snapshot' && error.invalidationReason !== 'rows_removed') { + return false; + } + return true; +} diff --git a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts index 60173a3a9..962b26528 100644 --- a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts +++ b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts @@ -1,6 +1,6 @@ import { ErrorRateLimiter } from '@powersync/service-core'; import { setTimeout } from 'timers/promises'; -import { MissingReplicationSlotError } from './WalStream.js'; +import { MissingReplicationSlotError } from './MissingReplicationSlotError.js'; export class PostgresErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 094d37eab..2a0172092 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -34,6 +34,7 @@ import { import { ReplicationMetric } from '@powersync/service-types'; import { PostgresTypeResolver } from '../types/resolver.js'; +import { MissingReplicationSlotError } from './MissingReplicationSlotError.js'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js'; import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js'; @@ -45,6 +46,7 @@ import { SimpleSnapshotQuery, SnapshotQuery } from './SnapshotQuery.js'; +import { computeWalBudgetReport, formatBytes, formatWalBudgetLine } from './wal-budget-utils.js'; export interface WalStreamOptions { logger?: Logger; @@ -61,6 +63,20 @@ export interface WalStreamOptions { * Note that queries are streamed, so we don't actually keep that much data in memory. */ snapshotChunkLength?: number; + + /** + * Called after each snapshot chunk is flushed, for testing. + * This allows tests to perform actions (like generating WAL) synchronously + * with the snapshot's chunk processing. + */ + onSnapshotChunkFlushed?: () => Promise; + + /** + * Interval for slot health checks during snapshot, in milliseconds. + * Set to 0 for every-chunk checking (useful for testing). + * Defaults to 120_000 (2 minutes). + */ + slotHealthCheckIntervalMs?: number; } interface InitResult { @@ -99,14 +115,6 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => { await lib_postgres.retriedQuery(db, KEEPALIVE_STATEMENT); }; -export class MissingReplicationSlotError extends Error { - constructor(message: string, cause?: any) { - super(message); - - this.cause = cause; - } -} - export class WalStream { sync_rules: HydratedSyncRules; group_id: number; @@ -133,11 +141,25 @@ export class WalStream { private startedStreaming = false; private snapshotChunkLength: number; + private onSnapshotChunkFlushed?: () => Promise; private replicationLag = new ReplicationLagTracker(); private initialSnapshotPromise: Promise | null = null; + private slotHealthCheckIntervalMs: number; + + /** Cached max_slot_wal_keep_size in bytes. null = not yet queried. */ + private maxSlotWalKeepSize: number | null = null; + /** Whether max_slot_wal_keep_size has been queried (distinguishes "not + * queried" from "queried and found unlimited"). */ + private maxSlotWalKeepSizeQueried = false; + + /** Previous safe_wal_size sample for rate calculation. */ + private prevWalBudgetSample: { safeWalSize: number; timestamp: number } | null = null; + /** Timestamp of last slot health check. */ + private lastSlotHealthCheckTime = 0; + constructor(options: WalStreamOptions) { this.logger = options.logger ?? defaultLogger; this.storage = options.storage; @@ -147,6 +169,8 @@ export class WalStream { this.slot_name = options.storage.slot_name; this.connections = options.connections; this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; + this.onSnapshotChunkFlushed = options.onSnapshotChunkFlushed; + this.slotHealthCheckIntervalMs = options.slotHealthCheckIntervalMs ?? 120_000; this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -326,8 +350,15 @@ export class WalStream { const lost = slot.wal_status == 'lost'; if (lost) { // Case 1 / 4 + const fixGuidance = + slot.invalidation_reason === 'idle_timeout' + ? `Increase idle_replication_slot_timeout on the source database.` + : `Increase max_slot_wal_keep_size on the source database.`; throw new MissingReplicationSlotError( - `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}` + `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + + `${fixGuidance}`, + { walStatus: 'lost', phase: 'streaming', invalidationReason: slot.invalidation_reason ?? undefined } ); } // Case 3 / 6 @@ -339,7 +370,10 @@ export class WalStream { if (snapshotDone) { // Case 5 // This will create a new slot, while keeping the current sync rules active - throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`); + throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`, { + walStatus: 'missing', + phase: 'streaming' + }); } // Case 2 // This will clear data (if any) and re-create the same slot @@ -347,6 +381,132 @@ export class WalStream { } } + private async queryMaxSlotWalKeepSize(): Promise { + if (this.maxSlotWalKeepSizeQueried) { + return this.maxSlotWalKeepSize; + } + this.maxSlotWalKeepSizeQueried = true; + + try { + const rows = pgwire.pgwireRows( + await this.connections.pool.query({ + statement: `SELECT setting, unit FROM pg_settings WHERE name = 'max_slot_wal_keep_size'` + }) + ); + if (rows.length === 0) { + // PG < 13 or setting doesn't exist + return null; + } + const setting = Number(rows[0].setting); + if (setting < 0) { + // -1 = unlimited + this.maxSlotWalKeepSize = null; + return null; + } + // setting is in MB, convert to bytes + const unit = rows[0].unit; + const multiplier = unit === 'kB' ? 1024 : unit === '8kB' ? 8192 : 1024 * 1024; // default MB + this.maxSlotWalKeepSize = setting * multiplier; + return this.maxSlotWalKeepSize; + } catch (e) { + // Non-fatal — budget reporting is best-effort + this.logger.warn(`Could not query max_slot_wal_keep_size`, e); + return null; + } + } + + private async logWalBudget(slot: { safe_wal_size?: any; wal_status?: string }, now: number): Promise { + const maxSize = await this.queryMaxSlotWalKeepSize(); + + // No limit configured + if (maxSize == null) { + this.logger.info(`WAL budget: no limit configured (max_slot_wal_keep_size is unlimited).`); + return; + } + + const safeWalSize = slot.safe_wal_size != null ? Number(slot.safe_wal_size) : null; + if (safeWalSize == null) { + // safe_wal_size is null on PG < 13, or when no limit is set + return; + } + + const report = computeWalBudgetReport({ + safeWalSize, + maxSize, + walStatus: slot.wal_status ?? 'unknown', + prevSample: this.prevWalBudgetSample, + now + }); + + // Update sample for next rate calculation + this.prevWalBudgetSample = { safeWalSize, timestamp: now }; + + const budgetLine = formatWalBudgetLine(report); + + if (report.isWarning) { + this.logger.warn(budgetLine); + this.logger.warn( + `Replication slot may be invalidated before snapshot completes. ` + + `Increase max_slot_wal_keep_size on the source database.` + ); + } else { + this.logger.info(budgetLine); + } + } + + /** + * Check if the replication slot is still valid. Called after each chunk + * flush during snapshot to detect slot invalidation early. + * + * The query hits pg_replication_slots (shared memory, not a table scan) + * and costs ~1-2ms per round-trip — negligible next to the per-chunk + * storage flush. + */ + private async checkSlotHealth(): Promise { + // Ensure maxSlotWalKeepSize is populated for diagnostic context in error messages + await this.queryMaxSlotWalKeepSize(); + + const rows = pgwire.pgwireRows( + await this.connections.pool.query({ + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: this.slot_name }] + }) + ); + + if (rows.length === 0) { + // Slot row gone from pg_replication_slots — dropped externally. + // Possible causes: pg_drop_replication_slot() call (operator, management tool, cleanup cron) + throw new MissingReplicationSlotError( + `[PSYNC_S1146] Replication slot ${this.slot_name} disappeared during snapshot.`, + { walStatus: 'missing', phase: 'snapshot' } + ); + } + + const walStatus = rows[0].wal_status; + if (walStatus === 'lost') { + // Postgres marked the slot invalid. Possible invalidation_reason values (PG 14+): + // - wal_removed: WAL growth exceeded max_slot_wal_keep_size (the primary case) + // - wal_level_insufficient: wal_level changed away from 'logical' + // - idle_timeout (PG 18+): idle_replication_slot_timeout expired + // - rows_removed: catalog rows needed by the slot were vacuumed (safe to retry — fresh slot won't need them) + throw new MissingReplicationSlotError( + `[PSYNC_S1146] Replication slot ${this.slot_name} was invalidated during snapshot` + + `${this.formatWalBudgetContext()}. ` + + `Increase max_slot_wal_keep_size on the source database.`, + { walStatus: 'lost', phase: 'snapshot', invalidationReason: rows[0].invalidation_reason ?? undefined } + ); + } + + await this.logWalBudget(rows[0], performance.now()); + } + + private formatWalBudgetContext(): string { + if (this.maxSlotWalKeepSize == null) { + return ''; + } + return ` (limit: ${formatBytes(this.maxSlotWalKeepSize)})`; + } + async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { const results = await db.query({ statement: `SELECT reltuples::bigint AS estimate @@ -597,6 +757,14 @@ WHERE oid = $1::regclass`, // Important: flush before marking progress await batch.flush(); + if (this.onSnapshotChunkFlushed) { + await this.onSnapshotChunkFlushed(); + } + const now = performance.now(); + if (now - this.lastSlotHealthCheckTime >= this.slotHealthCheckIntervalMs) { + this.lastSlotHealthCheckTime = now; + await this.checkSlotHealth(); + } if (limited == null) { let lastKey: Uint8Array | undefined; if (q instanceof ChunkedSnapshotQuery) { @@ -852,7 +1020,7 @@ WHERE oid = $1::regclass`, await this.streamChangesInternal(replicationConnection); } catch (e) { if (isReplicationSlotInvalidError(e)) { - throw new MissingReplicationSlotError(e.message, e); + throw new MissingReplicationSlotError(e.message, { walStatus: 'lost', phase: 'streaming', cause: e }); } throw e; } diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index d715b50c6..d865697c7 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -1,6 +1,7 @@ import { container, logger } from '@powersync/lib-services-framework'; +import { MissingReplicationSlotError, shouldRetryReplication } from './MissingReplicationSlotError.js'; import { PgManager } from './PgManager.js'; -import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js'; +import { sendKeepAlive, WalStream } from './WalStream.js'; import { replication } from '@powersync/service-core'; import { getApplicationName } from '../utils/application-name.js'; @@ -91,8 +92,10 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob } if (e instanceof MissingReplicationSlotError) { - // This stops replication on this slot and restarts with a new slot - await this.options.storage.factory.restartReplication(this.storage.group_id); + if (shouldRetryReplication(e)) { + // This stops replication on this slot and restarts with a new slot + await this.options.storage.factory.restartReplication(this.storage.group_id); + } } // No need to rethrow - the error is already logged, and retry behavior is the same on error diff --git a/modules/module-postgres/src/replication/replication-index.ts b/modules/module-postgres/src/replication/replication-index.ts index 1bf207453..4cc41b63d 100644 --- a/modules/module-postgres/src/replication/replication-index.ts +++ b/modules/module-postgres/src/replication/replication-index.ts @@ -1,5 +1,7 @@ +export * from './MissingReplicationSlotError.js'; export * from './PgRelation.js'; export * from './replication-utils.js'; +export * from './wal-budget-utils.js'; export * from './WalStream.js'; export * from './WalStreamReplicationJob.js'; export * from './WalStreamReplicator.js'; diff --git a/modules/module-postgres/src/replication/wal-budget-utils.ts b/modules/module-postgres/src/replication/wal-budget-utils.ts new file mode 100644 index 000000000..3637824f7 --- /dev/null +++ b/modules/module-postgres/src/replication/wal-budget-utils.ts @@ -0,0 +1,83 @@ +export function formatBytes(bytes: number): string { + if (bytes >= 1024 * 1024 * 1024) { + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)}GB`; + } else if (bytes >= 1024 * 1024) { + return `${(bytes / (1024 * 1024)).toFixed(1)}MB`; + } else if (bytes >= 1024) { + return `${(bytes / 1024).toFixed(1)}KB`; + } + return `${bytes}B`; +} + +export function formatDuration(hours: number): string { + if (hours >= 24) { + return `${(hours / 24).toFixed(1)} days`; + } else if (hours >= 1) { + return `${hours.toFixed(1)} hours`; + } + return `${Math.round(hours * 60)} minutes`; +} + +export interface WalBudgetSample { + safeWalSize: number; + timestamp: number; +} + +export interface WalBudgetReport { + budgetRemainingPct: number; + safeWalSize: number; + maxSize: number; + walStatus: string; + ratePerHour: number | null; + etaHours: number | null; + isWarning: boolean; +} + +export function computeWalBudgetReport(opts: { + safeWalSize: number; + maxSize: number; + walStatus: string; + prevSample: WalBudgetSample | null; + now: number; +}): WalBudgetReport { + const budgetRemainingPct = Math.round((opts.safeWalSize / opts.maxSize) * 100); + + let ratePerHour: number | null = null; + let etaHours: number | null = null; + + if (opts.prevSample != null) { + const elapsedMs = opts.now - opts.prevSample.timestamp; + const consumed = opts.prevSample.safeWalSize - opts.safeWalSize; + if (elapsedMs > 0 && consumed > 0) { + ratePerHour = (consumed / elapsedMs) * 3_600_000; + const eta = opts.safeWalSize / ratePerHour; + etaHours = eta < 48 ? eta : null; + } + } + + return { + budgetRemainingPct, + safeWalSize: opts.safeWalSize, + maxSize: opts.maxSize, + walStatus: opts.walStatus, + ratePerHour, + etaHours, + isWarning: budgetRemainingPct <= 50 + }; +} + +export function formatWalBudgetLine(report: WalBudgetReport): string { + let line = + `WAL budget: ${formatBytes(report.safeWalSize)} remaining of ` + + `${formatBytes(report.maxSize)} limit (${report.budgetRemainingPct}% remaining).`; + + if (report.ratePerHour != null) { + line += ` WAL consumption: ~${formatBytes(report.ratePerHour)}/hr.`; + if (report.etaHours != null) { + line += ` ETA to exhaustion: ~${formatDuration(report.etaHours)}.`; + } + } + + line += ` Slot status: ${report.walStatus}.`; + return line; +} diff --git a/modules/module-postgres/test/src/replication_retry.test.ts b/modules/module-postgres/test/src/replication_retry.test.ts new file mode 100644 index 000000000..a1d191eb0 --- /dev/null +++ b/modules/module-postgres/test/src/replication_retry.test.ts @@ -0,0 +1,40 @@ +import { + MissingReplicationSlotError, + shouldRetryReplication +} from '@module/replication/MissingReplicationSlotError.js'; +import { describe, expect, test } from 'vitest'; + +describe('shouldRetryReplication', () => { + test('blocks retry when slot lost during snapshot', () => { + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'snapshot' + }); + expect(shouldRetryReplication(error)).toBe(false); + }); + + test('allows retry when slot lost during streaming', () => { + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'streaming' + }); + expect(shouldRetryReplication(error)).toBe(true); + }); + + test('allows retry when slot is missing', () => { + const error = new MissingReplicationSlotError('test', { + walStatus: 'missing', + phase: 'snapshot' + }); + expect(shouldRetryReplication(error)).toBe(true); + }); + + test('allows retry when invalidation reason is rows_removed', () => { + const error = new MissingReplicationSlotError('test', { + walStatus: 'lost', + phase: 'snapshot', + invalidationReason: 'rows_removed' + }); + expect(shouldRetryReplication(error)).toBe(true); + }); +}); diff --git a/modules/module-postgres/test/src/wal_budget.test.ts b/modules/module-postgres/test/src/wal_budget.test.ts new file mode 100644 index 000000000..b4c52235a --- /dev/null +++ b/modules/module-postgres/test/src/wal_budget.test.ts @@ -0,0 +1,183 @@ +import { + computeWalBudgetReport, + formatBytes, + formatDuration, + formatWalBudgetLine +} from '@module/replication/wal-budget-utils.js'; +import { describe, expect, test } from 'vitest'; + +describe('formatBytes', () => { + test('formats bytes', () => { + expect(formatBytes(500)).toBe('500B'); + }); + + test('formats kilobytes', () => { + expect(formatBytes(1024)).toBe('1.0KB'); + expect(formatBytes(1536)).toBe('1.5KB'); + }); + + test('formats megabytes', () => { + expect(formatBytes(1024 * 1024)).toBe('1.0MB'); + expect(formatBytes(1.5 * 1024 * 1024)).toBe('1.5MB'); + }); + + test('formats gigabytes', () => { + expect(formatBytes(1024 * 1024 * 1024)).toBe('1.0GB'); + expect(formatBytes(10.5 * 1024 * 1024 * 1024)).toBe('10.5GB'); + }); +}); + +describe('formatDuration', () => { + test('formats minutes', () => { + expect(formatDuration(0.5)).toBe('30 minutes'); + }); + + test('formats hours', () => { + expect(formatDuration(1.0)).toBe('1.0 hours'); + expect(formatDuration(5.5)).toBe('5.5 hours'); + }); + + test('formats days', () => { + expect(formatDuration(24)).toBe('1.0 days'); + expect(formatDuration(48)).toBe('2.0 days'); + }); +}); + +describe('computeWalBudgetReport', () => { + const GB = 1024 * 1024 * 1024; + + test('computes budget percentage', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(80); + expect(report.isWarning).toBe(false); + }); + + test('warns at 50% threshold', () => { + const report = computeWalBudgetReport({ + safeWalSize: 5 * GB, + maxSize: 10 * GB, + walStatus: 'extended', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(50); + expect(report.isWarning).toBe(true); + }); + + test('warns below 50%', () => { + const report = computeWalBudgetReport({ + safeWalSize: 4.2 * GB, + maxSize: 10 * GB, + walStatus: 'extended', + prevSample: null, + now: 1000 + }); + expect(report.budgetRemainingPct).toBe(42); + expect(report.isWarning).toBe(true); + }); + + test('no rate without previous sample', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: null, + now: 1000 + }); + expect(report.ratePerHour).toBeNull(); + expect(report.etaHours).toBeNull(); + }); + + test('computes rate from two samples', () => { + const twoMinutesMs = 2 * 60 * 1000; + const report = computeWalBudgetReport({ + safeWalSize: 7 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 8 * GB, timestamp: 0 }, + now: twoMinutesMs + }); + // 1 GB consumed in 2 minutes = 30 GB/hr + expect(report.ratePerHour).toBeCloseTo(30 * GB, -5); + // 7 GB remaining at 30 GB/hr ≈ 0.233 hours + expect(report.etaHours).toBeCloseTo(7 / 30, 3); + }); + + test('suppresses ETA when > 48 hours', () => { + const oneHourMs = 3_600_000; + const report = computeWalBudgetReport({ + safeWalSize: 9.99 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 10 * GB, timestamp: 0 }, + now: oneHourMs + }); + // 0.01 GB/hr → ETA = 999 hours → suppressed + expect(report.ratePerHour).toBeCloseTo(0.01 * GB, -5); + expect(report.etaHours).toBeNull(); + }); + + test('null rate when no WAL consumed', () => { + const report = computeWalBudgetReport({ + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + prevSample: { safeWalSize: 8 * GB, timestamp: 0 }, + now: 60_000 + }); + // safe_wal_size unchanged → consumed = 0 → no rate + expect(report.ratePerHour).toBeNull(); + expect(report.etaHours).toBeNull(); + }); +}); + +describe('formatWalBudgetLine', () => { + const GB = 1024 * 1024 * 1024; + + test('formats basic budget line without rate', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 80, + safeWalSize: 8 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: null, + etaHours: null, + isWarning: false + }); + expect(line).toBe('WAL budget: 8.0GB remaining of 10.0GB limit (80% remaining). Slot status: reserved.'); + }); + + test('formats budget line with rate and ETA', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 65, + safeWalSize: 6.5 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: 1.2 * GB, + etaHours: 5.4, + isWarning: false + }); + expect(line).toContain('WAL consumption: ~1.2GB/hr.'); + expect(line).toContain('ETA to exhaustion: ~5.4 hours.'); + }); + + test('formats budget line with rate but no ETA (suppressed)', () => { + const line = formatWalBudgetLine({ + budgetRemainingPct: 99, + safeWalSize: 9.9 * GB, + maxSize: 10 * GB, + walStatus: 'reserved', + ratePerHour: 0.01 * GB, + etaHours: null, + isWarning: false + }); + expect(line).toContain('WAL consumption:'); + expect(line).not.toContain('ETA to exhaustion'); + }); +}); diff --git a/modules/module-postgres/test/src/wal_budget_api.test.ts b/modules/module-postgres/test/src/wal_budget_api.test.ts new file mode 100644 index 000000000..dcd6a1072 --- /dev/null +++ b/modules/module-postgres/test/src/wal_budget_api.test.ts @@ -0,0 +1,96 @@ +import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js'; +import { describe, expect, test } from 'vitest'; +import { describeWithStorage, StorageVersionTestContext } from './util.js'; +import { WalStreamTestContext, withMaxWalSize } from './wal_stream_utils.js'; + +describe('getSlotWalBudget', () => { + describeWithStorage({ timeout: 20_000 }, defineWalBudgetTests); +}); + +function defineWalBudgetTests({ factory, storageVersion }: StorageVersionTestContext) { + const openContext = (options?: Parameters[1]) => { + return WalStreamTestContext.open(factory, { ...options, storageVersion }); + }; + + test('returns WAL budget with configured limit', async () => { + await using context = await openContext(); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + + // Start replication to create the slot + await context.replicateSnapshot(); + + const serverVersion = await context.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`wal_status not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(pool, '1GB'); + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: context.storage!.slot_name + }); + + expect(budget).toBeDefined(); + expect(budget!.wal_status).toBeTypeOf('string'); + expect(budget!.safe_wal_size).toBeTypeOf('number'); + expect(budget!.max_slot_wal_keep_size).toBeTypeOf('number'); + expect(budget!.max_slot_wal_keep_size).toBeGreaterThan(0); + }); + + test('returns undefined max when unlimited', async () => { + await using context = await openContext(); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + + await context.replicateSnapshot(); + + const serverVersion = await context.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`wal_status not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + // Default is -1 (unlimited) — ensure it's set + await using _walSize = await withMaxWalSize(pool, '-1'); + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: context.storage!.slot_name + }); + + expect(budget).toBeDefined(); + expect(budget!.wal_status).toBeTypeOf('string'); + expect(budget!.max_slot_wal_keep_size).toBeUndefined(); + // safe_wal_size is null when no limit is configured + expect(budget!.safe_wal_size).toBeUndefined(); + }); + + test('returns undefined when slot is missing', async () => { + await using context = await openContext(); + const { pool } = context; + + const adapter = new PostgresRouteAPIAdapter(pool); + const budget = await adapter.getSlotWalBudget({ + slotName: 'nonexistent_slot' + }); + + expect(budget).toBeUndefined(); + }); +} diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 5dd6aee65..ff4a3b140 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -1,4 +1,4 @@ -import { MissingReplicationSlotError } from '@module/replication/WalStream.js'; +import { MissingReplicationSlotError } from '@module/replication/MissingReplicationSlotError.js'; import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests'; import { pgwireRows } from '@powersync/service-jpgwire'; import { JSONBig } from '@powersync/service-jsonbig'; @@ -470,6 +470,102 @@ bucket_definitions: } }); + test('slot lost during snapshot aborts early', { timeout: 120_000 }, async () => { + await using baseContext = await openContext({ doNotClear: true }); + + const serverVersion = await baseContext.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); + + const walPool = baseContext.pool; + await using context = await openContext({ + walStreamOptions: { + snapshotChunkLength: 100, + slotHealthCheckIntervalMs: 0, + onSnapshotChunkFlushed: async () => { + // Generate ~16MB WAL per call. Slot lost after ~7 chunks. + await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); + await walPool.query(`SELECT pg_switch_wal()`); + await walPool.query(`CHECKPOINT`); + } + } + }); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); + + // The hook generates WAL synchronously after each chunk flush. + // Post-implementation, the WAL check (at the same injection point) + // detects slot invalidation and aborts the snapshot. + await expect(async () => { + await context.replicateSnapshot(); + }).rejects.toThrowError(MissingReplicationSlotError); + + // Snapshot should NOT be marked as complete + const status = await context.storage!.getStatus(); + expect(status.snapshot_done).toBe(false); + }); + + test('slot invalidation error carries diagnostic context', { timeout: 120_000 }, async () => { + await using baseContext = await openContext({ doNotClear: true }); + + const serverVersion = await baseContext.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + await using _walSize = await withMaxWalSize(baseContext.pool, '100MB'); + + const walPool = baseContext.pool; + await using context = await openContext({ + walStreamOptions: { + snapshotChunkLength: 100, + slotHealthCheckIntervalMs: 0, + onSnapshotChunkFlushed: async () => { + await walPool.query(`SELECT pg_logical_emit_message(true, 'test', 'x')`); + await walPool.query(`SELECT pg_switch_wal()`); + await walPool.query(`CHECKPOINT`); + } + } + }); + const { pool } = context; + + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); + await pool.query(`INSERT INTO test_data(description) SELECT 'row ' || g FROM generate_series(1, 1000) g`); + + let caughtError: any; + try { + await context.replicateSnapshot(); + } catch (e) { + caughtError = e; + } + + // Error should be a MissingReplicationSlotError with diagnostic context + expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); + expect(caughtError.walStatus).toBe('lost'); + expect(caughtError.phase).toBe('snapshot'); + expect(caughtError.message).toContain('PSYNC_S1146'); + expect(caughtError.message).toContain('limit:'); + }); + test('old date format', async () => { await using context = await openContext(); await context.updateSyncRules(BASIC_SYNC_RULES); diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 94f1d3f35..22429269a 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -265,8 +265,10 @@ export async function withMaxWalSize(db: pgwire.PgClient, size: string) { try { const r1 = await db.query(`SHOW max_slot_wal_keep_size`); - await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '100MB'`); + await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${size}'`); await db.query(`SELECT pg_reload_conf()`); + // Wait for the config reload to propagate to all backends + await new Promise((resolve) => setTimeout(resolve, 100)); const oldSize = r1.results[0].rows[0].decodeWithoutCustomTypes(0); @@ -274,6 +276,8 @@ export async function withMaxWalSize(db: pgwire.PgClient, size: string) { [Symbol.asyncDispose]: async () => { await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${oldSize}'`); await db.query(`SELECT pg_reload_conf()`); + // Wait for the config reload to propagate to all backends + await new Promise((resolve) => setTimeout(resolve, 100)); } }; } catch (e) { diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index d98b7747c..f40a67a44 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -14,6 +14,16 @@ export interface ReplicationLagOptions { bucketStorage: SyncRulesBucketStorage; } +export interface SlotWalBudgetInfo { + wal_status: string; + safe_wal_size?: number; + max_slot_wal_keep_size?: number; +} + +export interface SlotWalBudgetOptions { + slotName: string; +} + /** * Describes all the methods currently required to service the sync API endpoints. */ @@ -49,6 +59,13 @@ export interface RouteAPI { */ getReplicationLagBytes(options: ReplicationLagOptions): Promise; + /** + * @returns WAL budget information for the replication slot, or undefined + * if the slot doesn't exist or the source doesn't support this. + * Only implemented by the Postgres adapter. + */ + getSlotWalBudget?(options: SlotWalBudgetOptions): Promise; + /** * Get the current LSN or equivalent replication HEAD position identifier. * diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 4100ec018..dbf09bfa3 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -4,7 +4,7 @@ import { ReplicationError, SyncRulesStatus, TableInfo } from '@powersync/service import * as storage from '../storage/storage-index.js'; import { syncConfigYamlErrorToReplicationError } from '../util/errors.js'; -import { RouteAPI } from './RouteAPI.js'; +import { RouteAPI, SlotWalBudgetInfo } from './RouteAPI.js'; export interface DiagnosticsOptions { /** @@ -63,6 +63,7 @@ export async function getSyncRulesStatus( const systemStorage = live_status ? bucketStorage.getInstance(sync_rules) : undefined; const status = await systemStorage?.getStatus(); let replication_lag_bytes: number | undefined = undefined; + let slot_wal_budget: SlotWalBudgetInfo | undefined = undefined; let tables_flat: TableInfo[] = []; @@ -88,6 +89,16 @@ export async function getSyncRulesStatus( // Ignore logger.warn(`Unable to get replication lag`, e); } + + if (apiHandler.getSlotWalBudget) { + try { + slot_wal_budget = await apiHandler.getSlotWalBudget({ + slotName: sync_rules.slot_name + }); + } catch (e) { + logger.warn(`Unable to get WAL budget`, e); + } + } } } else { const source_table_patterns = rules.getSourceTables(); @@ -182,6 +193,9 @@ export async function getSyncRulesStatus( last_checkpoint_ts: sync_rules.last_checkpoint_ts?.toISOString(), last_keepalive_ts: sync_rules.last_keepalive_ts?.toISOString(), replication_lag_bytes: replication_lag_bytes, + wal_status: slot_wal_budget?.wal_status, + safe_wal_size: slot_wal_budget?.safe_wal_size, + max_slot_wal_keep_size: slot_wal_budget?.max_slot_wal_keep_size, tables: tables_flat } ], diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index 1b17476c8..92bca2b22 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -186,6 +186,19 @@ export enum ErrorCode { */ PSYNC_S1145 = 'PSYNC_S1145', + /** + * Replication slot invalidated. + * + * The replication slot was invalidated by PostgreSQL, typically because + * WAL retention exceeded max_slot_wal_keep_size during a long-running + * snapshot. Increase max_slot_wal_keep_size on the source database and + * redeploy sync rules. + * + * Other causes: rows_removed (catalog rows needed by the slot were + * removed), wal_level_insufficient, idle_timeout (PG 18+). + */ + PSYNC_S1146 = 'PSYNC_S1146', + // ## PSYNC_S12xx: MySQL replication issues // ## PSYNC_S13xx: MongoDB replication issues diff --git a/packages/types/src/definitions.ts b/packages/types/src/definitions.ts index f02eae960..f65a977bd 100644 --- a/packages/types/src/definitions.ts +++ b/packages/types/src/definitions.ts @@ -73,6 +73,25 @@ export const SyncRulesStatus = t.object({ /** Replication lag in bytes. undefined if we cannot calculate this. */ replication_lag_bytes: t.number.optional(), + /** + * Replication slot WAL status (PG 13+). + * Values: 'reserved', 'extended', 'unreserved', 'lost'. + * Undefined for non-Postgres sources or PG < 13. + */ + wal_status: t.string.optional(), + + /** + * WAL budget remaining in bytes before potential slot invalidation + * (PG 13+, only when max_slot_wal_keep_size is set). + */ + safe_wal_size: t.number.optional(), + + /** + * Configured max_slot_wal_keep_size in bytes (PG 13+). + * Undefined when unlimited (-1) or not available. + */ + max_slot_wal_keep_size: t.number.optional(), + tables: t.array(TableInfo) }) ),