Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/bright-walls-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-types': patch
'@powersync/service-core': patch
'@powersync/service-module-postgres': patch
'@powersync/service-errors': patch
---

Detect WAL slot invalidation mid-snapshot, warn on WAL budget depletion, block futile retries, and surface WAL budget in the diagnostics API.
50 changes: 50 additions & 0 deletions modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,56 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
});
}

async getSlotWalBudget(options: api.SlotWalBudgetOptions): Promise<api.SlotWalBudgetInfo | undefined> {
const { slotName } = options;

// Query slot status
const slotResult = await lib_postgres.retriedQuery(this.pool, {
statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1',
params: [{ type: 'varchar', value: slotName }]
});
const [slot] = pgwire.pgwireRows(slotResult);
if (!slot) {
return undefined;
}

const walStatus = slot.wal_status;
if (walStatus == null) {
// PG < 13 — wal_status column doesn't exist
return undefined;
}

const result: api.SlotWalBudgetInfo = {
wal_status: String(walStatus)
};

// Query max_slot_wal_keep_size
try {
const settingsResult = await lib_postgres.retriedQuery(this.pool, {
statement: `SELECT setting, unit FROM pg_settings WHERE name = 'max_slot_wal_keep_size'`
});
const [row] = pgwire.pgwireRows(settingsResult);
if (row) {
const setting = Number(row.setting);
if (setting >= 0) {
const unit = String(row.unit ?? 'MB');
const multiplier = unit === 'kB' ? 1024 : unit === '8kB' ? 8192 : 1024 * 1024; // default MB
result.max_slot_wal_keep_size = setting * multiplier;
}
// setting < 0 means unlimited — leave undefined
}
} catch (e) {
// Best-effort — budget info is informational
}

// safe_wal_size (PG 13+, only populated when max_slot_wal_keep_size is set)
if (slot.safe_wal_size != null) {
result.safe_wal_size = Number(slot.safe_wal_size);
}

return result;
}

async getReplicationHead(): Promise<string> {
// On most Postgres versions, pg_logical_emit_message() returns the correct LSN.
// However, on Aurora (Postgres compatible), it can return an entirely different LSN,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Replication slot WAL status from `pg_replication_slots.wal_status` (PG 13+).
* - `reserved`: WAL needed by the slot is within `max_wal_size`
* - `extended`: WAL needed exceeds `max_wal_size` but is protected by `max_slot_wal_keep_size`
* - `unreserved`: WAL may be removed at next checkpoint — slot is at risk
* - `lost`: WAL has been removed — slot is invalidated and unusable
* - `missing`: synthetic value — the slot row is absent from `pg_replication_slots`
*/
export type WalStatus = 'reserved' | 'extended' | 'unreserved' | 'lost' | 'missing';

/**
* Replication phase when the error was detected.
* - `snapshot`: during an active full-table scan (snapshotTable) — long-running, retry may be futile
* - `streaming`: during WAL streaming, at startup, or between replication cycles — retry is safe
*/
export type ReplicationPhase = 'snapshot' | 'streaming';

export class MissingReplicationSlotError extends Error {
Comment thread
Sleepful marked this conversation as resolved.
/** Slot WAL status at the time the error was detected. */
walStatus: WalStatus;
/** Replication phase when the error occurred — controls retry behavior. */
phase: ReplicationPhase;
/** PG 14+ invalidation reason from `pg_replication_slots.invalidation_reason`. */
invalidationReason?: string;

constructor(
message: string,
options: {
walStatus: WalStatus;
phase: ReplicationPhase;
invalidationReason?: string;
cause?: any;
}
) {
super(message);
this.cause = options.cause;
this.walStatus = options.walStatus;
this.phase = options.phase;
this.invalidationReason = options.invalidationReason;
}
}

/**
* Determines whether replication should be retried after a slot invalidation.
*
* Returns false when retry would be futile (e.g. slot lost during snapshot
* due to WAL budget exhaustion — retrying would repeat the same long snapshot
* and likely fail again).
*
* Blocks retry when walStatus is 'lost' during snapshot phase (unless the
* invalidation reason is 'rows_removed', which is not a WAL budget issue).
* Allows retry in all other cases.
*/
export function shouldRetryReplication(error: MissingReplicationSlotError): boolean {
if (error.walStatus === 'lost' && error.phase === 'snapshot' && error.invalidationReason !== 'rows_removed') {
return false;
}
return true;
}
Comment thread
Sleepful marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ErrorRateLimiter } from '@powersync/service-core';
import { setTimeout } from 'timers/promises';
import { MissingReplicationSlotError } from './WalStream.js';
import { MissingReplicationSlotError } from './MissingReplicationSlotError.js';

export class PostgresErrorRateLimiter implements ErrorRateLimiter {
nextAllowed: number = Date.now();
Expand Down
Loading
Loading