diff --git a/packages/boxel-cli/src/commands/realm/index.ts b/packages/boxel-cli/src/commands/realm/index.ts index 088a16b253..0616808544 100644 --- a/packages/boxel-cli/src/commands/realm/index.ts +++ b/packages/boxel-cli/src/commands/realm/index.ts @@ -7,6 +7,7 @@ import { registerPullCommand } from './pull'; import { registerPushCommand } from './push'; import { registerSyncCommand } from './sync'; import { registerWaitForReadyCommand } from './wait-for-ready'; +import { registerWatchCommand } from './watch'; export function registerRealmCommand(program: Command): void { let realm = program @@ -21,4 +22,5 @@ export function registerRealmCommand(program: Command): void { registerPushCommand(realm); registerSyncCommand(realm); registerWaitForReadyCommand(realm); + registerWatchCommand(realm); } diff --git a/packages/boxel-cli/src/commands/realm/watch.ts b/packages/boxel-cli/src/commands/realm/watch.ts new file mode 100644 index 0000000000..c5b7154c2d --- /dev/null +++ b/packages/boxel-cli/src/commands/realm/watch.ts @@ -0,0 +1,626 @@ +import { InvalidArgumentError, type Command } from 'commander'; +import * as fs from 'fs/promises'; +import * as path from 'path'; +import { RealmSyncBase, isProtectedFile } from '../../lib/realm-sync-base'; +import { + CheckpointManager, + type Checkpoint, + type CheckpointChange, +} from '../../lib/checkpoint-manager'; +import { + type SyncManifest, + computeFileHash, + loadManifest, + saveManifest, +} from '../../lib/sync-manifest'; +import type { ProfileManager } from '../../lib/profile-manager'; +import type { RealmAuthenticator } from '../../lib/realm-authenticator'; +import { resolveRealmAuthenticator } from '../../lib/auth-resolver'; +import { resolveRealmSecretSeed } from '../../lib/prompt'; +import { + acquireWatchLock, + releaseWatchLock, + type WatchLockInfo, +} from '../../lib/watch-lock'; +import { + FG_CYAN, + FG_GREEN, + FG_RED, + FG_YELLOW, + DIM, + RESET, +} from '../../lib/colors'; + +export interface WatchRealmSpec { + realmUrl: string; + localDir: string; +} + +interface PendingChange { + status: 'added' | 'modified' | 'deleted'; + mtime: number; +} + +export interface FlushResult { + pulled: string[]; + deleted: string[]; + checkpoint: Checkpoint | null; +} + +/** + * Watches a single realm by polling `_mtimes`, accumulating changes between + * ticks, and applying them in a debounced batch (download + delete + write + * a checkpoint). One instance per realm; `watchRealms()` orchestrates many. + */ +export class RealmWatcher extends RealmSyncBase { + readonly name: string; + private readonly debounceMs: number; + private readonly checkpointManager: CheckpointManager; + private lastKnownMtimes = new Map(); + private pendingChanges = new Map(); + private debounceTimer: NodeJS.Timeout | null = null; + private isShutdown = false; + + constructor( + spec: WatchRealmSpec, + authenticator: RealmAuthenticator, + options: { debounceMs: number }, + ) { + super({ realmUrl: spec.realmUrl, localDir: spec.localDir }, authenticator); + this.debounceMs = options.debounceMs; + this.checkpointManager = new CheckpointManager(spec.localDir); + this.name = deriveRealmName(this.normalizedRealmUrl); + } + + /** RealmSyncBase requires `sync()`. For the watcher, run one poll+apply. */ + async sync(): Promise { + await this.poll(); + await this.flushPending(); + } + + // Override: base swallows errors → empty map, which the watcher would + // read as "every file deleted" and wipe the local dir on a network blip. + protected override async getRemoteMtimes(): Promise> { + const url = `${this.normalizedRealmUrl}_mtimes`; + const response = await this.authenticator.authedRealmFetch(url, { + headers: { Accept: 'application/vnd.api+json' }, + }); + if (!response.ok) { + throw new Error( + `_mtimes fetch failed for ${this.normalizedRealmUrl}: ${response.status} ${response.statusText}`, + ); + } + const data = (await response.json()) as { + data?: { attributes?: { mtimes?: Record } }; + }; + const mtimes = new Map(); + for (const [fileUrl, mtime] of Object.entries( + data.data?.attributes?.mtimes ?? {}, + )) { + mtimes.set(fileUrl.replace(this.normalizedRealmUrl, ''), mtime); + } + return mtimes; + } + + get localDir(): string { + return this.options.localDir; + } + + get realmUrl(): string { + return this.normalizedRealmUrl; + } + + get pendingCount(): number { + return this.pendingChanges.size; + } + + /** + * Verify realm access (via the throw-on-error override), ensure the + * checkpoint history is initialized, and seed `lastKnownMtimes` from the + * on-disk manifest if one exists. + */ + async initialize(): Promise { + await this.getRemoteMtimes(); + + if (!(await this.checkpointManager.isInitialized())) { + await this.checkpointManager.init(); + } + + const manifest = await loadManifest(this.options.localDir); + if ( + manifest && + manifest.realmUrl === this.normalizedRealmUrl && + manifest.remoteMtimes + ) { + for (const [file, mtime] of Object.entries(manifest.remoteMtimes)) { + this.lastKnownMtimes.set(file, mtime); + } + } + } + + /** + * Poll the realm once and accumulate changes into `pendingChanges`. Returns + * true if the poll discovered changes that weren't already pending. + */ + async poll(): Promise { + const remoteMtimes = await this.getRemoteMtimes(); + let hasNewChanges = false; + + for (const [file, mtime] of remoteMtimes) { + if (isProtectedFile(file)) continue; + const last = this.lastKnownMtimes.get(file); + if (last === undefined) { + if (this.recordPending(file, { status: 'added', mtime })) { + hasNewChanges = true; + } + } else if (mtime > last) { + if (this.recordPending(file, { status: 'modified', mtime })) { + hasNewChanges = true; + } + } + } + + for (const file of this.lastKnownMtimes.keys()) { + if (isProtectedFile(file)) continue; + if (!remoteMtimes.has(file)) { + const pending = this.pendingChanges.get(file); + if (pending?.status !== 'deleted') { + this.pendingChanges.set(file, { status: 'deleted', mtime: 0 }); + hasNewChanges = true; + } + } + } + + return hasNewChanges; + } + + /** Apply all currently pending changes immediately, bypassing the debounce. */ + async flushPending(): Promise { + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + + if (this.pendingChanges.size === 0) { + return { pulled: [], deleted: [], checkpoint: null }; + } + + // Snapshot then clear before any await — anything an interleaved poll() + // records during this flush rolls into the next one instead of being + // dropped by a trailing clear(). + const drained = new Map(this.pendingChanges); + this.pendingChanges.clear(); + + const pulled: string[] = []; + const deleted: string[] = []; + const changes: CheckpointChange[] = []; + + for (const [file, info] of drained) { + if (info.status === 'deleted') { + const localPath = path.join(this.options.localDir, file); + try { + await fs.unlink(localPath); + } catch (err: any) { + if (err.code !== 'ENOENT') throw err; + } + deleted.push(file); + changes.push({ file, status: 'deleted' }); + } else { + const localPath = path.join(this.options.localDir, file); + await this.downloadFile(file, localPath); + pulled.push(file); + changes.push({ file, status: info.status }); + } + } + + for (const [file, info] of drained) { + if (info.status === 'deleted') { + this.lastKnownMtimes.delete(file); + } else { + this.lastKnownMtimes.set(file, info.mtime); + } + } + + await this.persistManifest(pulled, deleted); + + const checkpoint = await this.checkpointManager.createCheckpoint( + 'remote', + changes, + ); + + return { pulled, deleted, checkpoint }; + } + + /** + * Schedule a debounced flush. Subsequent calls reset the timer so a burst + * of changes lands in a single checkpoint. + */ + scheduleFlush(onFlush?: (result: FlushResult) => void): void { + // Closes the race where a poll() in flight at cleanup() resolves AFTER + // shutdown() and would otherwise arm a new debounceTimer that nothing + // clears — i.e. work scheduled past the watcher's lifetime. + if (this.isShutdown) return; + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + } + this.debounceTimer = setTimeout(async () => { + this.debounceTimer = null; + try { + const result = await this.flushPending(); + onFlush?.(result); + } catch (err) { + console.error( + `${FG_RED}[${this.name}] Error applying changes:${RESET}`, + err, + ); + } + }, this.debounceMs); + } + + shutdown(): void { + // Set the flag before clearing the timer so a concurrent scheduleFlush() + // racing the in-flight poll path observes the shutdown state. + this.isShutdown = true; + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + } + + private recordPending(file: string, change: PendingChange): boolean { + const existing = this.pendingChanges.get(file); + if (existing && existing.mtime === change.mtime) { + return false; + } + this.pendingChanges.set(file, change); + return true; + } + + // Mutate just the entries that changed in this flush instead of + // rehashing everything in lastKnownMtimes — keeps each apply O(changed). + private async persistManifest( + pulled: string[], + deleted: string[], + ): Promise { + const prior = await loadManifest(this.options.localDir); + const files: Record = prior?.files + ? { ...prior.files } + : {}; + + for (const file of deleted) { + delete files[file]; + } + for (const file of pulled) { + const localPath = path.join(this.options.localDir, file); + try { + files[file] = await computeFileHash(localPath); + } catch (err: any) { + if (err.code !== 'ENOENT') throw err; + } + } + + const remoteMtimes: Record = {}; + for (const [file, mtime] of this.lastKnownMtimes) { + if (mtime !== 0) { + remoteMtimes[file] = mtime; + } + } + + const manifest: SyncManifest = { + realmUrl: this.normalizedRealmUrl, + files, + }; + if (Object.keys(remoteMtimes).length > 0) { + manifest.remoteMtimes = remoteMtimes; + } + await saveManifest(this.options.localDir, manifest); + } +} + +export interface WatchRealmsOptions { + intervalMs?: number; + debounceMs?: number; + quiet?: boolean; + profileManager?: ProfileManager; + /** Pre-resolved realm secret seed (resolve via `resolveRealmSecretSeed` first). */ + realmSecretSeed?: string; + /** @internal Test hook: supply an already-constructed authenticator. */ + authenticator?: RealmAuthenticator; + /** Stops the watch loop when aborted. SIGINT/SIGTERM are wired up when omitted. */ + signal?: AbortSignal; +} + +export interface WatchRealmsResult { + watchers: RealmWatcher[]; + error?: string; +} + +/** + * Programmatic entry point. Returns when the abort signal fires (or the + * process receives SIGINT/SIGTERM when no signal is supplied). The CLI + * passes a single spec; the array shape exists for programmatic / test + * use. The authenticator is resolved once (from `specs[0].realmUrl`) and + * shared across all specs — multi-realm callers must use realms that + * share a profile / secret seed. + */ +export async function watchRealms( + specs: WatchRealmSpec[], + options: WatchRealmsOptions = {}, +): Promise { + if (specs.length === 0) { + return { watchers: [], error: 'No realms provided to watch.' }; + } + + const intervalMs = options.intervalMs ?? 30_000; + const debounceMs = options.debounceMs ?? 5_000; + const quiet = options.quiet ?? false; + + if (!Number.isFinite(intervalMs) || intervalMs <= 0) { + return { watchers: [], error: '`intervalMs` must be a positive number.' }; + } + if (!Number.isFinite(debounceMs) || debounceMs < 0) { + return { + watchers: [], + error: '`debounceMs` must be a non-negative number.', + }; + } + + let authenticator: RealmAuthenticator; + if (options.authenticator) { + authenticator = options.authenticator; + } else { + const resolution = resolveRealmAuthenticator({ + realmUrl: specs[0].realmUrl, + realmSecretSeed: options.realmSecretSeed, + profileManager: options.profileManager, + }); + if (!resolution.ok) { + return { watchers: [], error: resolution.error }; + } + authenticator = resolution.authenticator; + } + + // Acquire one lock per spec.localDir before initializing any watcher, so a + // failure rolls back all earlier locks rather than leaving them dangling. + const lockedDirs: string[] = []; + for (const spec of specs) { + const result = await acquireWatchLock(spec.localDir, spec.realmUrl); + if (!result.ok) { + for (const dir of lockedDirs) await releaseWatchLock(dir); + return { + watchers: [], + error: formatLockedError(spec.localDir, result.existing), + }; + } + if (result.staleOverwrote && !quiet) { + console.log( + `${DIM}[${timestamp()}] overwrote stale lock at ${spec.localDir}${RESET}`, + ); + } + lockedDirs.push(spec.localDir); + } + + const watchers: RealmWatcher[] = []; + for (const spec of specs) { + const watcher = new RealmWatcher(spec, authenticator, { + debounceMs, + }); + try { + await watcher.initialize(); + } catch (err) { + for (const w of watchers) w.shutdown(); + for (const dir of lockedDirs) await releaseWatchLock(dir); + return { + watchers: [], + error: `Failed to initialize watch on ${spec.realmUrl}: ${ + err instanceof Error ? err.message : String(err) + }`, + }; + } + watchers.push(watcher); + } + + if (!quiet) { + console.log( + `${FG_CYAN}\u21c5 Watching ${watchers.length} realm${watchers.length > 1 ? 's' : ''}:${RESET}`, + ); + for (const w of watchers) { + console.log(` ${w.name} ${DIM}\u2192${RESET} ${w.localDir}`); + } + console.log( + ` ${DIM}Interval: ${intervalMs / 1000}s, Debounce: ${debounceMs / 1000}s${RESET}`, + ); + console.log(` ${DIM}Press Ctrl+C to stop${RESET}\n`); + } + + const tickAll = async () => { + await Promise.all( + watchers.map(async (w) => { + try { + const hasNew = await w.poll(); + if (hasNew) { + if (!quiet) { + console.log( + `${DIM}[${timestamp()}]${RESET} [${w.name}] ${FG_YELLOW}\u26a1 ${w.pendingCount} change(s) detected${RESET}`, + ); + } + w.scheduleFlush((result) => { + if (!quiet) logFlush(w.name, result); + }); + } + } catch (err) { + console.error( + `${FG_RED}[${w.name}] poll error:${RESET}`, + err instanceof Error ? err.message : err, + ); + } + }), + ); + }; + + // Self-scheduling tick: the next setTimeout is only armed after the + // current tickAll resolves, so two polls can never overlap. + let stopped = false; + let timeoutId: NodeJS.Timeout | null = null; + const scheduleNextTick = () => { + if (stopped) return; + timeoutId = setTimeout(async () => { + timeoutId = null; + if (stopped) return; + await tickAll(); + scheduleNextTick(); + }, intervalMs); + }; + + await tickAll(); + scheduleNextTick(); + + await new Promise((resolve) => { + let sigintHandler: (() => void) | null = null; + let sigtermHandler: (() => void) | null = null; + + const cleanup = async () => { + if (stopped) return; + stopped = true; + if (timeoutId !== null) { + clearTimeout(timeoutId); + timeoutId = null; + } + for (const w of watchers) w.shutdown(); + if (sigintHandler) process.off('SIGINT', sigintHandler); + if (sigtermHandler) process.off('SIGTERM', sigtermHandler); + for (const dir of lockedDirs) { + try { + await releaseWatchLock(dir); + } catch { + // Best effort \u2014 a leftover lock will be detected as stale next run. + } + } + resolve(); + }; + + if (options.signal) { + if (options.signal.aborted) { + void cleanup(); + return; + } + options.signal.addEventListener('abort', () => void cleanup(), { + once: true, + }); + } else { + sigintHandler = () => { + if (!quiet) console.log(`\n${FG_CYAN}\u21c5 Watch stopped${RESET}`); + void cleanup(); + }; + sigtermHandler = sigintHandler; + process.on('SIGINT', sigintHandler); + process.on('SIGTERM', sigtermHandler); + } + }); + + return { watchers }; +} + +function formatLockedError(localDir: string, info: WatchLockInfo): string { + return ( + `A boxel realm watch process is already active for ${localDir} ` + + `(pid ${info.pid}, started ${info.startedAt}). Stop it before starting ` + + `a new one, or remove ${path.join(localDir, '.boxel-watch.lock')} if it's stale.` + ); +} + +function logFlush(name: string, result: FlushResult): void { + const total = result.pulled.length + result.deleted.length; + if (total === 0) return; + console.log( + `${DIM}[${timestamp()}]${RESET} [${name}] ${FG_GREEN}applied ${total} change(s)${RESET} (${result.pulled.length} pulled, ${result.deleted.length} deleted)`, + ); + if (result.checkpoint) { + const tag = result.checkpoint.isMajor ? '[MAJOR]' : '[minor]'; + console.log( + ` ${DIM}Checkpoint:${RESET} ${result.checkpoint.shortHash} ${tag} ${result.checkpoint.message}`, + ); + } +} + +function deriveRealmName(normalizedUrl: string): string { + const parts = normalizedUrl.replace(/\/$/, '').split('/'); + return parts.slice(-2).join('/'); +} + +function timestamp(): string { + return new Date().toLocaleTimeString(); +} + +function parsePositiveSeconds(name: string): (value: string) => number { + return (value: string) => { + const n = Number.parseFloat(value); + if (!Number.isFinite(n) || n <= 0) { + throw new InvalidArgumentError(`${name} must be a positive number.`); + } + return n; + }; +} + +function parseNonNegativeSeconds(name: string): (value: string) => number { + return (value: string) => { + const n = Number.parseFloat(value); + if (!Number.isFinite(n) || n < 0) { + throw new InvalidArgumentError(`${name} must be a non-negative number.`); + } + return n; + }; +} + +export function registerWatchCommand(realm: Command): void { + realm + .command('watch') + .description( + 'Watch a Boxel realm for server-side changes and pull them into a local directory', + ) + .argument( + '', + 'The URL of the realm to watch (e.g., https://app.boxel.ai/demo/)', + ) + .argument('', 'The local directory to write changes into') + .option( + '-i, --interval ', + 'Polling interval in seconds', + parsePositiveSeconds('--interval'), + 30, + ) + .option( + '-d, --debounce ', + 'Seconds to wait after a burst of changes before applying them', + parseNonNegativeSeconds('--debounce'), + 5, + ) + .option( + '--realm-secret-seed', + 'Administrative auth: prompt for a realm secret seed and mint a JWT locally instead of using a Matrix profile (env: BOXEL_REALM_SECRET_SEED)', + ) + .action( + async ( + realmUrl: string, + localDir: string, + options: { + interval: number; + debounce: number; + realmSecretSeed?: boolean; + }, + ) => { + const realmSecretSeed = await resolveRealmSecretSeed( + options.realmSecretSeed === true, + ); + const result = await watchRealms([{ realmUrl, localDir }], { + intervalMs: options.interval * 1000, + debounceMs: options.debounce * 1000, + realmSecretSeed, + }); + if (result.error) { + console.error(`${FG_RED}Error:${RESET} ${result.error}`); + process.exit(1); + } + }, + ); +} diff --git a/packages/boxel-cli/src/lib/colors.ts b/packages/boxel-cli/src/lib/colors.ts index c775d12227..1f2ff3e901 100644 --- a/packages/boxel-cli/src/lib/colors.ts +++ b/packages/boxel-cli/src/lib/colors.ts @@ -1,9 +1,14 @@ -// ANSI color codes -export const FG_GREEN = '\x1b[32m'; -export const FG_YELLOW = '\x1b[33m'; -export const FG_CYAN = '\x1b[36m'; -export const FG_MAGENTA = '\x1b[35m'; -export const FG_RED = '\x1b[31m'; -export const DIM = '\x1b[2m'; -export const BOLD = '\x1b[1m'; -export const RESET = '\x1b[0m'; +// Disable ANSI escapes when stdout isn't a TTY (piped to file/pager) or +// NO_COLOR is set — see https://no-color.org. Evaluated at module load. +const COLOR_ENABLED = process.stdout.isTTY === true && !process.env.NO_COLOR; + +const c = (code: string): string => (COLOR_ENABLED ? code : ''); + +export const FG_GREEN = c('\x1b[32m'); +export const FG_YELLOW = c('\x1b[33m'); +export const FG_CYAN = c('\x1b[36m'); +export const FG_MAGENTA = c('\x1b[35m'); +export const FG_RED = c('\x1b[31m'); +export const DIM = c('\x1b[2m'); +export const BOLD = c('\x1b[1m'); +export const RESET = c('\x1b[0m'); diff --git a/packages/boxel-cli/src/lib/watch-lock.ts b/packages/boxel-cli/src/lib/watch-lock.ts new file mode 100644 index 0000000000..d29b6ade17 --- /dev/null +++ b/packages/boxel-cli/src/lib/watch-lock.ts @@ -0,0 +1,81 @@ +import * as fs from 'fs/promises'; +import * as path from 'path'; + +const LOCK_FILE = '.boxel-watch.lock'; + +export interface WatchLockInfo { + pid: number; + startedAt: string; + realmUrl: string; +} + +export type WatchLockResult = + | { ok: true; staleOverwrote: boolean } + | { ok: false; existing: WatchLockInfo }; + +function lockPath(localDir: string): string { + return path.join(localDir, LOCK_FILE); +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch (err: any) { + // EPERM means the process exists but we can't signal it — still alive. + return err?.code === 'EPERM'; + } +} + +async function readLock(localDir: string): Promise { + try { + const raw = await fs.readFile(lockPath(localDir), 'utf8'); + const parsed = JSON.parse(raw) as Partial; + if ( + typeof parsed.pid !== 'number' || + typeof parsed.startedAt !== 'string' || + typeof parsed.realmUrl !== 'string' + ) { + return null; + } + return parsed as WatchLockInfo; + } catch { + return null; + } +} + +export async function acquireWatchLock( + localDir: string, + realmUrl: string, +): Promise { + await fs.mkdir(localDir, { recursive: true }); + const existing = await readLock(localDir); + let staleOverwrote = false; + if (existing && isProcessAlive(existing.pid)) { + return { ok: false, existing }; + } + if (existing) { + staleOverwrote = true; + } + const info: WatchLockInfo = { + pid: process.pid, + startedAt: new Date().toISOString(), + realmUrl, + }; + await fs.writeFile(lockPath(localDir), JSON.stringify(info, null, 2) + '\n'); + return { ok: true, staleOverwrote }; +} + +export async function releaseWatchLock(localDir: string): Promise { + try { + await fs.unlink(lockPath(localDir)); + } catch (err: any) { + if (err?.code !== 'ENOENT') throw err; + } +} + +export async function readWatchLock( + localDir: string, +): Promise { + return readLock(localDir); +} diff --git a/packages/boxel-cli/tests/integration/realm-watch.test.ts b/packages/boxel-cli/tests/integration/realm-watch.test.ts new file mode 100644 index 0000000000..50d2c0fb95 --- /dev/null +++ b/packages/boxel-cli/tests/integration/realm-watch.test.ts @@ -0,0 +1,543 @@ +import '../helpers/setup-realm-server'; +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { RealmWatcher, watchRealms } from '../../src/commands/realm/watch'; +import { CheckpointManager } from '../../src/lib/checkpoint-manager'; +import { ProfileManager } from '../../src/lib/profile-manager'; +import type { RealmAuthenticator } from '../../src/lib/realm-authenticator'; +import { + startTestRealmServer, + stopTestRealmServer, + createTestProfileDir, + setupTestProfile, + TEST_REALM_SERVER_URL, +} from '../helpers/integration'; + +let profileManager: ProfileManager; +let cleanupProfile: () => void; +let realmUrl: string; +const localDirs: string[] = []; + +function makeLocalDir(): string { + let dir = fs.mkdtempSync(path.join(os.tmpdir(), 'boxel-watch-int-')); + localDirs.push(dir); + return dir; +} + +function buildFileUrl(realm: string, relPath: string): string { + let base = realm.endsWith('/') ? realm : `${realm}/`; + return `${base}${relPath.replace(/^\/+/, '')}`; +} + +async function writeRemoteFile( + realm: string, + relPath: string, + content: string, +): Promise { + let response = await profileManager.authedRealmFetch( + buildFileUrl(realm, relPath), + { + method: 'POST', + headers: { + 'Content-Type': 'text/plain;charset=UTF-8', + Accept: 'application/vnd.card+source', + }, + body: content, + }, + ); + if (!response.ok) { + throw new Error( + `writeRemoteFile ${relPath} failed: ${response.status} ${response.statusText}`, + ); + } +} + +async function deleteRemoteFile(realm: string, relPath: string): Promise { + let response = await profileManager.authedRealmFetch( + buildFileUrl(realm, relPath), + { + method: 'DELETE', + headers: { Accept: 'application/vnd.card+source' }, + }, + ); + if (!response.ok && response.status !== 404) { + throw new Error( + `deleteRemoteFile ${relPath} failed: ${response.status} ${response.statusText}`, + ); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +beforeAll(async () => { + // Realm starts empty; tests seed remote files via authedRealmFetch so they + // produce realistic mtimes that change between writes. + await startTestRealmServer(); + realmUrl = `${TEST_REALM_SERVER_URL}/test/`; + + let testProfile = createTestProfileDir(); + profileManager = testProfile.profileManager; + cleanupProfile = testProfile.cleanup; + await setupTestProfile(profileManager); +}); + +afterAll(async () => { + for (let dir of localDirs) { + fs.rmSync(dir, { recursive: true, force: true }); + } + cleanupProfile?.(); + await stopTestRealmServer(); +}); + +describe('realm watch (integration)', () => { + it('treats remote files as added on the first poll and pulls them', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'first-poll.gts', 'export const a = 1;\n'); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await watcher.initialize(); + + let hasChanges = await watcher.poll(); + expect(hasChanges).toBe(true); + expect(watcher.pendingCount).toBeGreaterThanOrEqual(1); + + let result = await watcher.flushPending(); + expect(result.pulled).toContain('first-poll.gts'); + expect( + fs.readFileSync(path.join(localDir, 'first-poll.gts'), 'utf8'), + ).toContain('a = 1'); + + expect(result.checkpoint).not.toBeNull(); + expect(result.checkpoint!.source).toBe('remote'); + + let checkpoints = await new CheckpointManager(localDir).getCheckpoints(); + expect(checkpoints.length).toBe(1); + + watcher.shutdown(); + await deleteRemoteFile(realmUrl, 'first-poll.gts'); + }); + + it('detects remote modifications across ticks and pulls them', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'mod.gts', 'export const v = 1;\n'); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await watcher.initialize(); + await watcher.poll(); + await watcher.flushPending(); + + expect(fs.readFileSync(path.join(localDir, 'mod.gts'), 'utf8')).toContain( + 'v = 1', + ); + + // Realm mtimes are second-precision — wait so the next write bumps it. + await sleep(1100); + await writeRemoteFile(realmUrl, 'mod.gts', 'export const v = 2;\n'); + + let hasChanges = await watcher.poll(); + expect(hasChanges).toBe(true); + let result = await watcher.flushPending(); + expect(result.pulled).toContain('mod.gts'); + expect(fs.readFileSync(path.join(localDir, 'mod.gts'), 'utf8')).toContain( + 'v = 2', + ); + + let checkpoints = await new CheckpointManager(localDir).getCheckpoints(); + // One per applied poll. + expect(checkpoints.length).toBe(2); + + watcher.shutdown(); + await deleteRemoteFile(realmUrl, 'mod.gts'); + }); + + it('detects remote deletions and removes the local copy', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'doomed.gts', 'export const x = 1;\n'); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await watcher.initialize(); + await watcher.poll(); + await watcher.flushPending(); + expect(fs.existsSync(path.join(localDir, 'doomed.gts'))).toBe(true); + + await deleteRemoteFile(realmUrl, 'doomed.gts'); + + let hasChanges = await watcher.poll(); + expect(hasChanges).toBe(true); + let result = await watcher.flushPending(); + expect(result.deleted).toContain('doomed.gts'); + expect(fs.existsSync(path.join(localDir, 'doomed.gts'))).toBe(false); + + watcher.shutdown(); + }); + + it('groups bursts of remote changes into a single debounced flush', async () => { + let localDir = makeLocalDir(); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 75, + quiet: true, + }); + await watcher.initialize(); + + let flushes: Array<{ pulled: string[]; deleted: string[] }> = []; + let flushSettled = new Promise((resolve) => { + // Trigger two polls in quick succession; debounce should coalesce. + let onFlush = (result: { pulled: string[]; deleted: string[] }) => { + flushes.push(result); + resolve(); + }; + + (async () => { + await writeRemoteFile(realmUrl, 'burst-a.gts', 'export const a = 1;\n'); + await watcher.poll(); + watcher.scheduleFlush(onFlush); + + await writeRemoteFile(realmUrl, 'burst-b.gts', 'export const b = 2;\n'); + await watcher.poll(); + // Reset the timer — second call within debounceMs. + watcher.scheduleFlush(onFlush); + })(); + }); + + await flushSettled; + // Allow a brief grace period in case a stray timer slipped through. + await sleep(40); + + expect(flushes.length).toBe(1); + expect(flushes[0].pulled.sort()).toEqual(['burst-a.gts', 'burst-b.gts']); + + watcher.shutdown(); + await deleteRemoteFile(realmUrl, 'burst-a.gts'); + await deleteRemoteFile(realmUrl, 'burst-b.gts'); + }); + + it('runs the watchRealms loop end-to-end and stops on AbortSignal', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'loop.gts', 'export const loop = 1;\n'); + + let controller = new AbortController(); + let runPromise = watchRealms([{ realmUrl, localDir }], { + profileManager, + intervalMs: 50, + debounceMs: 25, + quiet: true, + signal: controller.signal, + }); + + // Wait long enough for the initial tick + debounced flush + at least one + // re-poll, then trigger shutdown. + await sleep(400); + controller.abort(); + let result = await runPromise; + + expect(result.error).toBeUndefined(); + expect(result.watchers.length).toBe(1); + expect(fs.readFileSync(path.join(localDir, 'loop.gts'), 'utf8')).toContain( + 'loop = 1', + ); + + let checkpoints = await new CheckpointManager(localDir).getCheckpoints(); + expect(checkpoints.length).toBeGreaterThanOrEqual(1); + expect(checkpoints[0].source).toBe('remote'); + + await deleteRemoteFile(realmUrl, 'loop.gts'); + }); + + it('returns an error when the realm URL is unreachable', async () => { + let localDir = makeLocalDir(); + let result = await watchRealms( + [{ realmUrl: 'http://127.0.0.1:1/nope/', localDir }], + { profileManager, intervalMs: 50, debounceMs: 0, quiet: true }, + ); + expect(result.error).toBeDefined(); + expect(result.watchers).toEqual([]); + }); + + it('returns an error when no active profile is configured', async () => { + let localDir = makeLocalDir(); + let emptyDir = fs.mkdtempSync(path.join(os.tmpdir(), 'boxel-watch-empty-')); + let emptyManager = new ProfileManager(emptyDir); + try { + let result = await watchRealms([{ realmUrl, localDir }], { + profileManager: emptyManager, + intervalMs: 50, + debounceMs: 0, + quiet: true, + }); + expect(result.error).toContain('No active profile'); + expect(result.watchers).toEqual([]); + } finally { + fs.rmSync(emptyDir, { recursive: true, force: true }); + } + }); + + it('rejects an empty list of realms', async () => { + let result = await watchRealms([], { + profileManager, + quiet: true, + }); + expect(result.error).toContain('No realms'); + }); + + it('does not delete local files when a poll fails', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'survives.gts', 'export const s = 1;\n'); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await watcher.initialize(); + await watcher.poll(); + await watcher.flushPending(); + expect(fs.existsSync(path.join(localDir, 'survives.gts'))).toBe(true); + + // Force the next poll to fail (simulating a transient fetch error). The + // file must remain on disk and `lastKnownMtimes` must be untouched, so a + // subsequent successful poll observes no change. + (watcher as any).getRemoteMtimes = async () => { + throw new Error('simulated network failure'); + }; + await expect(watcher.poll()).rejects.toThrow('simulated network failure'); + expect(watcher.pendingCount).toBe(0); + expect(fs.existsSync(path.join(localDir, 'survives.gts'))).toBe(true); + + watcher.shutdown(); + await deleteRemoteFile(realmUrl, 'survives.gts'); + }); + + it('blocks a second concurrent watch against the same localDir', async () => { + let localDir = makeLocalDir(); + + let firstController = new AbortController(); + let firstRun = watchRealms([{ realmUrl, localDir }], { + profileManager, + intervalMs: 1000, + debounceMs: 25, + quiet: true, + signal: firstController.signal, + }); + + // Wait for the first run to acquire the lock. + await sleep(150); + + let lockPath = path.join(localDir, '.boxel-watch.lock'); + expect(fs.existsSync(lockPath)).toBe(true); + + let secondResult = await watchRealms([{ realmUrl, localDir }], { + profileManager, + intervalMs: 1000, + debounceMs: 25, + quiet: true, + }); + expect(secondResult.error).toBeDefined(); + expect(secondResult.error).toContain(`pid ${process.pid}`); + expect(secondResult.watchers).toEqual([]); + + firstController.abort(); + await firstRun; + expect(fs.existsSync(lockPath)).toBe(false); + }); + + it('overwrites a stale lock left by a process that no longer exists', async () => { + let localDir = makeLocalDir(); + let lockPath = path.join(localDir, '.boxel-watch.lock'); + fs.writeFileSync( + lockPath, + JSON.stringify({ + pid: 999_999_999, + startedAt: '2020-01-01T00:00:00.000Z', + realmUrl, + }), + ); + + let controller = new AbortController(); + let run = watchRealms([{ realmUrl, localDir }], { + profileManager, + intervalMs: 1000, + debounceMs: 25, + quiet: true, + signal: controller.signal, + }); + + await sleep(150); + let parsed = JSON.parse(fs.readFileSync(lockPath, 'utf8')); + expect(parsed.pid).toBe(process.pid); + + controller.abort(); + let result = await run; + expect(result.error).toBeUndefined(); + expect(fs.existsSync(lockPath)).toBe(false); + }); + + it('does not arm a debounceTimer after shutdown when a poll resolves post-cleanup', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'race.gts', 'export const r = 1;\n'); + + // Pre-sync so the watch starts in a steady state — initialize() and the + // initial tickAll find no new changes and no flush is scheduled. + let primer = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await primer.initialize(); + await primer.poll(); + await primer.flushPending(); + primer.shutdown(); + + let baseline = await new CheckpointManager(localDir).getCheckpoints(); + expect(baseline.length).toBe(1); + + // Bump the remote so the next-tick poll WOULD detect a change and call + // scheduleFlush() — which is the path the fix gates. + await sleep(1100); // mtime is second-precision + await writeRemoteFile(realmUrl, 'race.gts', 'export const r = 2;\n'); + + // Gate the 3rd _mtimes call (1: initialize, 2: initial tickAll's poll, + // 3: the next scheduled tick — the one we want in flight at abort time). + let mtimesCallCount = 0; + let releaseGate: () => void = () => {}; + let gateOpened: () => void = () => {}; + let gate = new Promise((resolve) => { + releaseGate = resolve; + }); + let entered = new Promise((resolve) => { + gateOpened = resolve; + }); + let gatedAuth: RealmAuthenticator = { + authedRealmFetch: async (input, init) => { + let url = typeof input === 'string' ? input : input.toString(); + if (url.endsWith('_mtimes')) { + mtimesCallCount++; + if (mtimesCallCount === 3) { + gateOpened(); + await gate; + } + } + return profileManager.authedRealmFetch(input, init); + }, + }; + + let controller = new AbortController(); + let runPromise = watchRealms([{ realmUrl, localDir }], { + authenticator: gatedAuth, + intervalMs: 50, + debounceMs: 100, + quiet: true, + signal: controller.signal, + }); + + // Wait for the next tick to enter the gated _mtimes call. + await entered; + + // Abort while the in-flight poll is blocked. cleanup() runs synchronously + // through stopped=true → w.shutdown() before awaiting releaseWatchLock. + controller.abort(); + let result = await runPromise; + expect(result.error).toBeUndefined(); + + // Now release the poll. Without the fix, its continuation calls + // scheduleFlush() on the (already shut down) watcher and arms a fresh + // debounceTimer that fires post-cleanup, writing files and a new + // checkpoint after watchRealms() has returned. + releaseGate(); + + // Wait long enough for any (buggy) debounceTimer to fire (debounceMs=100). + await sleep(300); + + let checkpoints = await new CheckpointManager(localDir).getCheckpoints(); + expect(checkpoints.length).toBe(baseline.length); + // The bumped remote content was NOT pulled — the gated scheduleFlush is a + // no-op after shutdown, so the local file stays at r = 1. + expect(fs.readFileSync(path.join(localDir, 'race.gts'), 'utf8')).toContain( + 'r = 1', + ); + expect(fs.existsSync(path.join(localDir, '.boxel-watch.lock'))).toBe(false); + + await deleteRemoteFile(realmUrl, 'race.gts'); + }); + + it('removes SIGINT/SIGTERM handlers when the watch is stopped via signal', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'sigint.gts', 'export const x = 1;\n'); + + // Snapshot pre-existing listeners so we don't conflate with vitest's own + // signal handling — just check that watchRealms registers exactly one + // and removes it on cleanup. + let originalSigint = [...process.listeners('SIGINT')]; + let originalSigterm = [...process.listeners('SIGTERM')]; + + // No `signal` supplied → the SIGINT/SIGTERM branch in watchRealms runs. + let runPromise = watchRealms([{ realmUrl, localDir }], { + profileManager, + intervalMs: 1000, + debounceMs: 25, + quiet: true, + }); + + await sleep(150); + + let addedSigint = process + .listeners('SIGINT') + .filter((l) => !originalSigint.includes(l)); + let addedSigterm = process + .listeners('SIGTERM') + .filter((l) => !originalSigterm.includes(l)); + expect(addedSigint).toHaveLength(1); + expect(addedSigterm).toHaveLength(1); + + // Invoke the registered handler directly instead of process.emit('SIGINT'), + // which would also trigger any unrelated SIGINT listeners on the runner. + (addedSigint[0] as () => void)(); + + let result = await runPromise; + expect(result.error).toBeUndefined(); + expect(process.listeners('SIGINT')).toEqual(originalSigint); + expect(process.listeners('SIGTERM')).toEqual(originalSigterm); + expect(fs.existsSync(path.join(localDir, '.boxel-watch.lock'))).toBe(false); + + await deleteRemoteFile(realmUrl, 'sigint.gts'); + }); + + it('downgrades a pending modify to a delete when the remote file disappears', async () => { + let localDir = makeLocalDir(); + await writeRemoteFile(realmUrl, 'flip.gts', 'export const x = 1;\n'); + + let watcher = new RealmWatcher({ realmUrl, localDir }, profileManager, { + debounceMs: 0, + quiet: true, + }); + await watcher.initialize(); + await watcher.poll(); + await watcher.flushPending(); + + await sleep(1100); + await writeRemoteFile(realmUrl, 'flip.gts', 'export const x = 2;\n'); + await watcher.poll(); + expect(watcher.pendingCount).toBe(1); + + await deleteRemoteFile(realmUrl, 'flip.gts'); + await watcher.poll(); + + let result = await watcher.flushPending(); + expect(result.deleted).toContain('flip.gts'); + expect(result.pulled).not.toContain('flip.gts'); + expect(fs.existsSync(path.join(localDir, 'flip.gts'))).toBe(false); + + watcher.shutdown(); + }); +});