diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..9d8d780ba --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,18 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Attach to serve (tsx)", + "type": "node", + "request": "attach", + "port": 9229, + "restart": true, + "skipFiles": [ + "/**" + ] + } + ] +} \ No newline at end of file diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 09fa9f988..fdb134e33 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -397,6 +397,7 @@ export interface components { data: { [key: string]: unknown; }; + recordDeleted?: boolean; /** * Format: date-time * @description ISO 8601 timestamp when the record was emitted by the source. diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index fc749bb69..187abed5a 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1220,6 +1220,9 @@ "additionalProperties": {}, "description": "The record payload as a key-value map." 
}, + "recordDeleted": { + "type": "boolean" + }, "emitted_at": { "type": "string", "format": "date-time", diff --git a/apps/engine/src/lib/pipeline.test.ts b/apps/engine/src/lib/pipeline.test.ts index f0e6953d0..451d3ce2a 100644 --- a/apps/engine/src/lib/pipeline.test.ts +++ b/apps/engine/src/lib/pipeline.test.ts @@ -156,6 +156,52 @@ describe('enforceCatalog()', () => { }) }) + it('strips the `deleted` field from data even when no json_schema is configured', async () => { + const msgs: Message[] = [ + { + type: 'record', + record: { + stream: 'customers', + recordDeleted: true, + data: { id: 'cus_1', name: 'Alice', deleted: true }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + }, + ] + const result = await drain(enforceCatalog(catalog([{ name: 'customers' }]))(toAsync(msgs))) + expect(result).toHaveLength(1) + expect((result[0] as any).record.data).toEqual({ id: 'cus_1', name: 'Alice' }) + expect((result[0] as any).record.recordDeleted).toBe(true) + }) + + it('strips the `deleted` field from data even when json_schema declares it', async () => { + const msgs: Message[] = [ + { + type: 'record', + record: { + stream: 'customers', + data: { id: 'cus_1', deleted: false }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + }, + ] + const result = await drain( + enforceCatalog( + catalog([ + { + name: 'customers', + json_schema: { + type: 'object', + properties: { id: { type: 'string' }, deleted: { type: 'boolean' } }, + }, + }, + ]) + )(toAsync(msgs)) + ) + expect(result).toHaveLength(1) + expect((result[0] as any).record.data).toEqual({ id: 'cus_1' }) + }) + it('drops record with unknown stream and logs error', async () => { const msgs: Message[] = [ { diff --git a/apps/engine/src/lib/pipeline.ts b/apps/engine/src/lib/pipeline.ts index 26a6c1330..de0ca7efb 100644 --- a/apps/engine/src/lib/pipeline.ts +++ b/apps/engine/src/lib/pipeline.ts @@ -9,6 +9,9 @@ import { log } from '../logger.js' // MARK: - enforceCatalog +/** Fields that destinations never persist; 
tombstoning is signalled via `recordDeleted`. */ +const STRIPPED_RECORD_FIELDS = new Set(['deleted']) + /** * Drop messages for streams not in the catalog and apply per-stream field filtering. * Passes non-data messages (log, trace, catalog) through unchanged. @@ -26,19 +29,13 @@ export function enforceCatalog( continue } const props = cs.stream.json_schema?.properties as Record | undefined - if (props) { - yield { - ...msg, - record: { - ...msg.record, - data: Object.fromEntries( - Object.entries(msg.record.data).filter(([key]) => key in props) - ), - }, - } - } else { - yield msg + const filtered: Record = {} + for (const [key, value] of Object.entries(msg.record.data)) { + if (STRIPPED_RECORD_FIELDS.has(key)) continue + if (props && !(key in props)) continue + filtered[key] = value } + yield { ...msg, record: { ...msg.record, data: filtered } } } else if (msg.type === 'source_state') { if (msg.source_state.state_type === 'global') { yield msg // global state needs no catalog validation diff --git a/e2e/stripe-delete.test.ts b/e2e/stripe-delete.test.ts new file mode 100644 index 000000000..274774089 --- /dev/null +++ b/e2e/stripe-delete.test.ts @@ -0,0 +1,335 @@ +/** + * Verifies that a `customer.deleted` Stripe event tombstones the row in both + * the Postgres and Google Sheets destinations. + * + * Each suite creates a Stripe customer, lets the engine sync it via WebSocket, + * deletes it via the Stripe API, and waits for the destination to reflect the + * deletion. Sheets is skipped when GOOGLE_* env vars are missing. 
+ */ +import pg from 'pg' +import Stripe from 'stripe' +import { google } from 'googleapis' +import { afterAll, beforeAll, expect, it } from 'vitest' +import source from '@stripe/sync-source-stripe' +import destinationPostgres from '@stripe/sync-destination-postgres' +import destinationSheets, { readSheet } from '@stripe/sync-destination-google-sheets' +import { createEngine } from '@stripe/sync-engine' +import type { ConnectorResolver } from '@stripe/sync-engine' +import type { DestinationOutput } from '@stripe/sync-protocol' +import { drain } from '@stripe/sync-protocol' +import { describeWithEnv } from './test-helpers.js' + +const POSTGRES_URL = + process.env.POSTGRES_URL ?? 'postgresql://postgres:postgres@localhost:5432/postgres' +const ts = new Date() + .toISOString() + .replace(/[-:T.Z]/g, '') + .slice(0, 15) +const STREAM = 'customers' +const BACKFILL_LIMIT = 5 + +// MARK: - Helpers + +/** Drain a pipeline iterator in the background until it finishes or `stop()` is called. */ +function backgroundDrain(iter: AsyncIterator): { + done: Promise + stop: () => Promise +} { + let stopped = false + const done = (async () => { + while (!stopped) { + const { done: iterDone } = await iter.next() + if (iterDone) return + } + })() + const stop = async () => { + stopped = true + await Promise.race([ + iter.return?.(undefined as never) ?? Promise.resolve(), + new Promise((r) => setTimeout(r, 5_000)), + ]) + await Promise.race([done, new Promise((r) => setTimeout(r, 5_000))]) + } + return { done, stop } +} + +async function pollUntil( + fn: () => Promise, + timeoutMs: number, + intervalMs: number +): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + if (await fn()) return true + await new Promise((r) => setTimeout(r, intervalMs)) + } + return false +} + +/** + * Mimic the service's `simulate_webhook_sync` endpoint: fetch events from + * Stripe and pipe them as push-mode input to `pipeline_sync`. 
The source + * skips backfill/websocket entirely when `$stdin` is provided. + */ +async function replayStripeEvents( + engine: Awaited>, + pipelineFactory: () => Parameters[0], + stripe: Stripe, + createdAfter: number +): Promise { + const events: unknown[] = [] + let startingAfter: string | undefined + for (;;) { + const page = await stripe.events.list({ + created: { gt: createdAfter }, + limit: 100, + ...(startingAfter ? { starting_after: startingAfter } : {}), + }) + events.push(...page.data) + if (!page.has_more) break + startingAfter = page.data.at(-1)!.id + } + events.reverse() + + const input = (async function* () { + for (const e of events) yield e + })() + + // drain — finite input, finite output + for await (const msg of engine.pipeline_sync(pipelineFactory(), {}, input)) { + void msg + } +} + +// MARK: - Postgres + +describeWithEnv('stripe customer.deleted → postgres', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY }) => { + const SCHEMA = `e2e_del_pg_${ts}` + let pool: pg.Pool + let stripe: Stripe + + const resolver: ConnectorResolver = { + resolveSource: async (name) => { + if (name !== 'stripe') throw new Error(`Unknown source: ${name}`) + return source + }, + resolveDestination: async (name) => { + if (name !== 'postgres') throw new Error(`Unknown destination: ${name}`) + return destinationPostgres + }, + sources: () => new Map(), + destinations: () => new Map(), + } + + function makePipeline() { + return { + source: { + type: 'stripe', + stripe: { + api_key: STRIPE_API_KEY, + backfill_limit: BACKFILL_LIMIT, + websocket: true, + }, + }, + destination: { + type: 'postgres', + postgres: { url: POSTGRES_URL, schema: SCHEMA }, + }, + streams: [{ name: STREAM }], + } + } + + beforeAll(async () => { + pool = new pg.Pool({ connectionString: POSTGRES_URL }) + await pool.query('SELECT 1') + await pool.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + stripe = new Stripe(STRIPE_API_KEY) + console.log(`\n Postgres: ${POSTGRES_URL} (schema: ${SCHEMA})`) + }) + + 
afterAll(async () => { + if (!pool) return + if (!process.env.KEEP_TEST_DATA) { + await pool.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + } + await pool.end() + }) + + it('hard-deletes the row when customer.deleted arrives', async () => { + const engine = await createEngine(resolver) + const pipeline = makePipeline() + await drain(engine.pipeline_setup(pipeline)) + const iter = engine.pipeline_sync(pipeline)[Symbol.asyncIterator]() + const drainer = backgroundDrain(iter) + + let customerId: string | undefined + try { + const customer = await stripe.customers.create({ + name: `e2e-del-pg-${Date.now()}`, + email: `e2e-del-pg-${Date.now()}@test.local`, + }) + customerId = customer.id + + console.log(`Waiting for customer ${customerId} to appear in Postgres...`) + const appeared = await pollUntil( + async () => { + const { rows } = await pool.query(`SELECT 1 FROM "${SCHEMA}"."customers" WHERE id = $1`, [ + customerId, + ]) + return rows.length > 0 + }, + 60_000, + 1_000 + ) + expect(appeared, `customer ${customerId} never appeared in postgres`).toBe(true) + console.log(`Customer ${customerId} appeared in Postgres. 
Deleting via Stripe API...`) + await stripe.customers.del(customerId) + console.log(`Customer ${customerId} deleted in Stripe.`) + customerId = undefined + + const removed = await pollUntil( + async () => { + const { rows } = await pool.query(`SELECT 1 FROM "${SCHEMA}"."customers" WHERE id = $1`, [ + customer.id, + ]) + return rows.length === 0 + }, + 60_000, + 1_000 + ) + expect(removed, `customer ${customer.id} was never tombstoned in postgres`).toBe(true) + console.log(` Postgres delete verified: ${customer.id}`) + } finally { + await drainer.stop() + if (customerId) { + try { + await stripe.customers.del(customerId) + } catch {} + } + } + }, 180_000) +}) + +// MARK: - Google Sheets + +describeWithEnv( + 'stripe customer.deleted → google sheets', + ['STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN'], + ({ STRIPE_API_KEY, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN }) => { + let stripe: Stripe + let sheetsClient: ReturnType + let driveClient: ReturnType + let spreadsheetId = process.env.GOOGLE_SPREADSHEET_ID ?? '' + let createdSpreadsheetHere = false + + const resolver: ConnectorResolver = { + resolveSource: async (name) => { + if (name !== 'stripe') throw new Error(`Unknown source: ${name}`) + return source + }, + resolveDestination: async (name) => { + if (name !== 'google_sheets') throw new Error(`Unknown destination: ${name}`) + return destinationSheets + }, + sources: () => new Map(), + destinations: () => new Map(), + } + + function makePipeline() { + return { + source: { + type: 'stripe', + stripe: { api_key: STRIPE_API_KEY }, + }, + destination: { + type: 'google_sheets', + google_sheets: { + client_id: GOOGLE_CLIENT_ID, + client_secret: GOOGLE_CLIENT_SECRET, + refresh_token: GOOGLE_REFRESH_TOKEN, + ...(spreadsheetId ? 
{ spreadsheet_id: spreadsheetId } : {}), + spreadsheet_title: `e2e-del-sheets-${ts}`, + batch_size: 50, + }, + }, + streams: [{ name: STREAM }], + } + } + + beforeAll(async () => { + stripe = new Stripe(STRIPE_API_KEY) + const auth = new google.auth.OAuth2(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET) + auth.setCredentials({ refresh_token: GOOGLE_REFRESH_TOKEN }) + sheetsClient = google.sheets({ version: 'v4', auth }) + driveClient = google.drive({ version: 'v3', auth }) + }) + + afterAll(async () => { + if (createdSpreadsheetHere && spreadsheetId && !process.env.KEEP_TEST_DATA) { + try { + await driveClient.files.delete({ fileId: spreadsheetId }) + } catch {} + } + }) + + it('removes the row when customer.deleted arrives', async () => { + const engine = await createEngine(resolver) + const replayFrom = Math.floor(Date.now() / 1000) - 5 + + // If GOOGLE_SPREADSHEET_ID is provided, reuse it; otherwise the destination + // creates one and emits the new id via destination_config. + for await (const m of engine.pipeline_setup(makePipeline())) { + if ( + m.type === 'control' && + m.control.control_type === 'destination_config' && + typeof m.control.destination_config.spreadsheet_id === 'string' && + m.control.destination_config.spreadsheet_id !== spreadsheetId + ) { + spreadsheetId = m.control.destination_config.spreadsheet_id + createdSpreadsheetHere = true + } + } + expect(spreadsheetId, 'no spreadsheet_id available (env or destination)').toBeTruthy() + console.log(`\n Sheets: https://docs.google.com/spreadsheets/d/${spreadsheetId}/`) + + let customerId: string | undefined + try { + // create customer; replay events through push-mode sync until the row appears + const customer = await stripe.customers.create({ + name: `e2e-del-sheets-${Date.now()}`, + email: `e2e-del-sheets-${Date.now()}@test.local`, + }) + customerId = customer.id + await new Promise((r) => setTimeout(r, 1500)) + await replayStripeEvents(engine, makePipeline, stripe, replayFrom) + + const rowsAfterCreate = 
await readSheet(sheetsClient, spreadsheetId, STREAM) + const idIdx = (rowsAfterCreate[0] ?? []).indexOf('id') + expect(idIdx, 'id column missing in sheet header').toBeGreaterThanOrEqual(0) + expect( + rowsAfterCreate.slice(1).some((row) => row[idIdx] === customer.id), + `customer ${customer.id} never appeared in sheet` + ).toBe(true) + + // delete customer; replay again, this time the customer.deleted tombstone removes the row + await stripe.customers.del(customerId) + customerId = undefined + await new Promise((r) => setTimeout(r, 1500)) + await replayStripeEvents(engine, makePipeline, stripe, replayFrom) + + const rowsAfterDelete = await readSheet(sheetsClient, spreadsheetId, STREAM) + expect( + rowsAfterDelete.slice(1).some((row) => row[idIdx] === customer.id), + `customer ${customer.id} was never removed from sheet` + ).toBe(false) + console.log(` Sheets delete verified: ${customer.id}`) + } finally { + if (customerId) { + try { + await stripe.customers.del(customerId) + } catch {} + } + } + }, 120_000) + } +) diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.test.ts b/packages/destination-google-sheets/__tests__/memory-sheets.test.ts index 3bd9ce642..752ca30a5 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.test.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.test.ts @@ -177,6 +177,35 @@ describe('createMemorySheets', () => { ]) }) + it('pasteData — drops one trailing empty cell like Google Sheets', async () => { + const { sheets, getData } = createMemorySheets() + + const { data } = await sheets.spreadsheets.create({ + requestBody: { properties: { title: 'T' } }, + }) + const id = data.spreadsheetId! + const meta = await sheets.spreadsheets.get({ spreadsheetId: id }) + const sheetId = meta.data.sheets![0].properties!.sheetId! 
+ + await sheets.spreadsheets.batchUpdate({ + spreadsheetId: id, + requestBody: { + requests: [ + { + pasteData: { + coordinate: { sheetId, rowIndex: 0, columnIndex: 0 }, + data: '\x1f\x1f\x1f', + delimiter: '\x1f', + type: 'PASTE_VALUES', + }, + }, + ], + }, + }) + + expect(getData(id, 'Sheet1')).toEqual([['', '', '']]) + }) + it('get — throws on non-existent spreadsheet', async () => { const { sheets } = createMemorySheets() diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index f02451016..9fb3c4de2 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -273,6 +273,7 @@ export function createMemorySheets() { const rowLines = raw.length === 0 ? [] : raw.split('\n') for (let i = 0; i < rowLines.length; i++) { const cells = rowLines[i].split(delimiter) + if (cells[cells.length - 1] === '') cells.pop() const target: unknown[] = (tab.values[rowIndex + i] ?? 
[]).slice() for (let j = 0; j < cells.length; j++) { target[columnIndex + j] = cells[j] diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index 060b398ab..a1d1d6aff 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -54,7 +54,12 @@ let nextRecordTs = Math.floor(Date.now() / 1000) function record(stream: string, data: Record): DestinationInput { return { type: 'record', - record: { stream, data: { _updated_at: nextRecordTs++, ...data }, emitted_at: now }, + record: { + stream, + data: { _updated_at: nextRecordTs++, ...data }, + emitted_at: now, + recordDeleted: data.deleted === true, + }, } } diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index 298bbf0ef..ff0a41b9b 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -614,7 +614,9 @@ export function createDestination( } if (deleteRowNumbers.size > 0) { - const blankRow = new Array(headers.length).fill('') + // Google Sheets API omits trailing blank cells, so we add + // an extra empty cell. + const blankRow = new Array(headers.length + 1).fill('') const deleteList = [...deleteRowNumbers].sort((a, b) => a - b) // Phase 1 — donate pending appends into deleted slots. If @@ -758,7 +760,7 @@ export function createDestination( for await (const msg of $stdin) { if (msg.type === 'record') { recordCount++ - const { stream, data } = msg.record + const { stream, data, recordDeleted } = msg.record const cleanData: Record = stripSystemFields(data) const newerThanField = streamNewerThanField.get(stream) if ( @@ -803,7 +805,7 @@ export function createDestination( ? 
serializeRowKey(primaryKey, cleanData) : undefined - if (cleanData['deleted'] === true) { + if (recordDeleted === true) { deleteBuffers.get(stream)!.push({ rowKey, rowNumber }) } else if (rowNumber !== undefined) { // 1. Explicit _row_number (backwards compat with service layer) diff --git a/packages/destination-postgres/src/index.test.ts b/packages/destination-postgres/src/index.test.ts index e09e857b4..d85ce6d05 100644 --- a/packages/destination-postgres/src/index.test.ts +++ b/packages/destination-postgres/src/index.test.ts @@ -1,7 +1,7 @@ import { execSync } from 'child_process' import pg from 'pg' import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' -import destination, { upsertMany, type Config } from './index.js' +import destination, { deleteMany, upsertMany, writeMany, type Config } from './index.js' import type { ConfiguredCatalog, DestinationInput, @@ -536,6 +536,161 @@ describe('upsertMany standalone', () => { }) }) +describe('deleteMany / writeMany', () => { + beforeEach(async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + }) + + it('hard-deletes existing rows by primary key', async () => { + const testPool = new pg.Pool({ connectionString }) + try { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + testPool, + SCHEMA, + 'customers', + [ + { id: 'cus_keep', name: 'Keep', _updated_at: ts }, + { id: 'cus_drop', name: 'Drop', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const result = await deleteMany(testPool, SCHEMA, 'customers', [{ id: 'cus_drop' }], ['id']) + expect(result.deleted_count).toBe(1) + + const { rows } = await pool.query(`SELECT id FROM "${SCHEMA}".customers ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_keep' }]) + } finally { + await testPool.end() + } + }) + + it('deletes are terminal regardless of timestamp ordering', async () => { + const testPool = new pg.Pool({ connectionString }) + try { + const ts = Math.floor(Date.now() / 1000) + await 
upsertMany( + testPool, + SCHEMA, + 'customers', + [{ id: 'cus_fresh', name: 'Fresh', _updated_at: ts + 10 }], + ['id'], + '_updated_at' + ) + + const result = await deleteMany(testPool, SCHEMA, 'customers', [{ id: 'cus_fresh' }], ['id']) + expect(result.deleted_count).toBe(1) + + const { rows } = await pool.query(`SELECT count(*)::int AS n FROM "${SCHEMA}".customers`) + expect(rows[0].n).toBe(0) + } finally { + await testPool.end() + } + }) + + it('writeMany routes a mixed batch to upsert and delete paths', async () => { + const testPool = new pg.Pool({ connectionString }) + try { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + testPool, + SCHEMA, + 'customers', + [{ id: 'cus_old', name: 'Old', _updated_at: ts }], + ['id'], + '_updated_at' + ) + + const result = await writeMany( + testPool, + SCHEMA, + 'customers', + [ + { data: { id: 'cus_new', name: 'New', _updated_at: ts + 1 } }, + { recordDeleted: true, data: { id: 'cus_old', _updated_at: ts + 1 } }, + ], + ['id'], + '_updated_at' + ) + expect(result.created_count).toBe(1) + expect(result.deleted_count).toBe(1) + + const { rows } = await pool.query(`SELECT id FROM "${SCHEMA}".customers ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_new' }]) + } finally { + await testPool.end() + } + }) + + it('deleteMany no-ops on empty array', async () => { + const testPool = new pg.Pool({ connectionString }) + try { + const result = await deleteMany(testPool, SCHEMA, 'customers', [], ['id']) + expect(result).toEqual({ deleted_count: 0 }) + } finally { + await testPool.end() + } + }) + + it('deletes only the matching tenant row for composite (id, _account_id) PK', async () => { + const testPool = new pg.Pool({ connectionString }) + try { + const compositeCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id'], ['_account_id']], + newer_than_field: '_updated_at', + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + _account_id: { type: 
'string' }, + }, + }, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], + } + await pool.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await drain(destination.setup!({ config: makeConfig(), catalog: compositeCatalog })) + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + testPool, + SCHEMA, + 'customers', + [ + { id: 'cus_1', name: 'Alice (A)', _account_id: 'acct_AAA', _updated_at: ts }, + { id: 'cus_1', name: 'Alice (B)', _account_id: 'acct_BBB', _updated_at: ts }, + ], + ['id', '_account_id'], + '_updated_at' + ) + + const result = await deleteMany( + testPool, + SCHEMA, + 'customers', + [{ id: 'cus_1', _account_id: 'acct_AAA' }], + ['id', '_account_id'] + ) + expect(result.deleted_count).toBe(1) + + const { rows } = await pool.query( + `SELECT _account_id FROM "${SCHEMA}".customers ORDER BY _account_id` + ) + expect(rows).toEqual([{ _account_id: 'acct_BBB' }]) + } finally { + await testPool.end() + } + }) +}) + describe('schema-driven CHECK constraints', () => { function catalogWith(enumValues: string[], column = '_account_id'): ConfiguredCatalog { return { diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 08f7cd610..411f986b4 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -2,6 +2,9 @@ import pg from 'pg' import type { PoolConfig } from 'pg' import type { Destination } from '@stripe/sync-protocol' import { + ident, + identList, + qualifiedTable, sql, sslConfigFromConnectionString, stripSslParams, @@ -54,15 +57,46 @@ export async function buildPoolConfig(config: Config): Promise { throw new Error('Either url/connection_string or aws config is required') } -// MARK: - upsertMany +// MARK: - writeMany / upsertMany / deleteMany export interface UpsertManyResult { created_count: number updated_count: number - deleted_count: number skipped_count: number } +export interface DeleteManyResult { + 
deleted_count: number +} + +export interface WriteManyResult extends UpsertManyResult, DeleteManyResult {} + +/** + * Apply a mixed batch of live records and tombstones to a Postgres table. + * Records with `recordDeleted: true` are routed to {@link deleteMany} (hard delete); + * everything else goes through {@link upsertMany}, which runs before the deletes. + * + * Existing soft-deleted rows from prior deployments are intentionally not + * cleaned up — no production user is on the soft-delete code path. + */ +export async function writeMany( + pool: pg.Pool, + schema: string, + table: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + entries: Record[], + primaryKeyColumns: string[] = ['id'], + newerThanField: string +): Promise { + const tombstones = entries.filter((e) => e.recordDeleted === true).map((r) => r.data) + const liveRecords = entries.filter((e) => e.recordDeleted !== true).map((r) => r.data) + + const u = await upsertMany(pool, schema, table, liveRecords, primaryKeyColumns, newerThanField) + const d = await deleteMany(pool, schema, table, tombstones, primaryKeyColumns) + + return { ...u, deleted_count: d.deleted_count } +} + /** * Upsert records into a Postgres table; lifts `[newer_than_field]` from the * source-stamped record into the legacy `_updated_at` timestamptz column (DDR-009). 
@@ -77,7 +111,11 @@ export async function upsertMany( newerThanField: string ): Promise { if (!entries.length) - return { created_count: 0, updated_count: 0, deleted_count: 0, skipped_count: 0 } + return { + created_count: 0, + updated_count: 0, + skipped_count: 0, + } const records = entries.map((e) => { const ts = e[newerThanField] as unknown @@ -89,12 +127,45 @@ export async function upsertMany( return { _raw_data: e, _updated_at: new Date(ts * 1000).toISOString() } }) - return await upsertWithStats( - pool, - records, - { schema, table, primaryKeyColumns, newerThanColumn: newerThanField }, - `"_raw_data"->>'deleted' = 'true'` - ) + return await upsertWithStats(pool, records, { + schema, + table, + primaryKeyColumns, + newerThanColumn: newerThanField, + }) +} + +/** + * Hard-delete rows by primary key. No `newer_than_field` guard: deletion is + * terminal — once an object is deleted it cannot be undeleted. + */ +export async function deleteMany( + pool: pg.Pool, + schema: string, + table: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + entries: Record[], + primaryKeyColumns: string[] = ['id'] +): Promise { + if (!entries.length) return { deleted_count: 0 } + + const params: unknown[] = [] + const valueRows = entries.map((e) => { + const cells = primaryKeyColumns.map((pk) => { + params.push(String(e[pk])) + return `$${params.length}::text` + }) + return `(${cells.join(', ')})` + }) + + const tbl = qualifiedTable(schema, table) + const pkJoin = primaryKeyColumns.map((c) => `t.${ident(c)} = d.${ident(c)}`).join(' AND ') + const stmt = `DELETE FROM ${tbl} t +USING (VALUES ${valueRows.join(', ')}) AS d(${identList(primaryKeyColumns)}) +WHERE ${pkJoin}` + + const result = await pool.query(stmt, params) + return { deleted_count: result.rowCount ?? 
0 } } // MARK: - Named exports @@ -354,7 +425,7 @@ const destination = { 'dest write: flush start' ) try { - const stats = await upsertMany(pool, config.schema, streamName, buffer, pk, newerThan) + const stats = await writeMany(pool, config.schema, streamName, buffer, pk, newerThan) log.debug( { stream: streamName, @@ -406,7 +477,7 @@ const destination = { await connectAndRelease(pool, 'write') for await (const msg of $stdin) { if (msg.type === 'record') { - const { stream, data } = msg.record + const { stream } = msg.record if (failedStreams.has(stream)) { log.debug({ stream }, 'dest write: skipping record for failed stream') @@ -418,7 +489,7 @@ const destination = { } const buffer = streamBuffers.get(stream)! - buffer.push(data as Record) + buffer.push(msg.record as Record) if (buffer.length >= batchSize) { const err = await flushStream(stream) diff --git a/packages/destination-postgres/src/schemaProjection.test.ts b/packages/destination-postgres/src/schemaProjection.test.ts index 1c6f539a1..dd421dbe6 100644 --- a/packages/destination-postgres/src/schemaProjection.test.ts +++ b/packages/destination-postgres/src/schemaProjection.test.ts @@ -10,7 +10,6 @@ const SAMPLE_JSON_SCHEMA: Record = { properties: { id: { type: 'string' }, created: { type: 'integer' }, - deleted: { type: 'boolean' }, metadata: { type: 'object' }, expires_at: { type: 'string', format: 'date-time' }, }, @@ -33,9 +32,10 @@ describe('jsonSchemaToColumns', () => { const byName = Object.fromEntries(columns.map((c) => [c.name, c])) expect(byName.created.pgType).toBe('bigint') - expect(byName.deleted.pgType).toBe('boolean') expect(byName.metadata.pgType).toBe('jsonb') expect(byName.expires_at.pgType).toBe('text') // date-time → text for safety + // `deleted` is intentionally filtered out — tombstones are consumed by deleteMany + expect(byName.deleted).toBeUndefined() }) it('skips the id column (generated separately)', () => { @@ -78,9 +78,9 @@ describe('buildCreateTableWithSchema', () => { const 
alterStmts = stmts.filter((s) => s.includes('ADD COLUMN IF NOT EXISTS')) expect(alterStmts.length).toBe(1) expect(alterStmts[0]).toContain('ADD COLUMN IF NOT EXISTS "created"') - expect(alterStmts[0]).toContain('ADD COLUMN IF NOT EXISTS "deleted"') expect(alterStmts[0]).toContain('ADD COLUMN IF NOT EXISTS "metadata"') expect(alterStmts[0]).toContain('ADD COLUMN IF NOT EXISTS "expires_at"') + expect(alterStmts[0]).not.toContain('"deleted"') // No FK constraint expect(stmts.some((s) => s.includes('FOREIGN KEY'))).toBe(false) @@ -240,9 +240,9 @@ describe('buildCreateTableDDL', () => { expect(ddl).toContain('"created" bigint GENERATED ALWAYS AS') expect(ddl).toContain('ADD COLUMN IF NOT EXISTS "created"') - expect(ddl).toContain('ADD COLUMN IF NOT EXISTS "deleted"') expect(ddl).toContain('ADD COLUMN IF NOT EXISTS "metadata"') expect(ddl).toContain('ADD COLUMN IF NOT EXISTS "expires_at"') + expect(ddl).not.toContain('"deleted"') expect(ddl).toContain('"_updated_at" timestamptz NOT NULL DEFAULT now()') expect(ddl).toContain('DROP TRIGGER IF EXISTS handle_updated_at') diff --git a/packages/openapi/__tests__/specParser.test.ts b/packages/openapi/__tests__/specParser.test.ts index 986466900..850f90dde 100644 --- a/packages/openapi/__tests__/specParser.test.ts +++ b/packages/openapi/__tests__/specParser.test.ts @@ -19,9 +19,9 @@ describe('SpecParser', () => { const customers = parsed.tables.find((table) => table.tableName === 'customers') expect(customers?.columns).toEqual([ { name: 'created', type: 'bigint', nullable: false }, - { name: 'deleted', type: 'boolean', nullable: false }, { name: 'object', type: 'text', nullable: false }, ]) + expect(customers?.columns).not.toContainEqual(expect.objectContaining({ name: 'deleted' })) const checkoutSessions = parsed.tables.find((table) => table.tableName === 'checkout_sessions') expect(checkoutSessions?.columns).toContainEqual({ @@ -53,16 +53,14 @@ describe('SpecParser', () => { const subscriptionItems = parsed.tables.find( 
(table) => table.tableName === 'subscription_items' ) - expect(subscriptionItems?.columns).toContainEqual({ - name: 'deleted', - type: 'boolean', - nullable: true, - }) expect(subscriptionItems?.columns).toContainEqual({ name: 'subscription', type: 'text', nullable: true, }) + expect(subscriptionItems?.columns).not.toContainEqual( + expect.objectContaining({ name: 'deleted' }) + ) }) it('is deterministic regardless of schema key order', () => { diff --git a/packages/openapi/runtimeMappings.ts b/packages/openapi/runtimeMappings.ts index 423986662..517988eef 100644 --- a/packages/openapi/runtimeMappings.ts +++ b/packages/openapi/runtimeMappings.ts @@ -29,7 +29,6 @@ export const OPENAPI_COMPATIBILITY_COLUMNS: Record = { { name: 'amount_discount', type: 'bigint', nullable: true }, { name: 'amount_tax', type: 'bigint', nullable: true }, ], - customers: [{ name: 'deleted', type: 'boolean', nullable: true }], early_fraud_warnings: [{ name: 'payment_intent', type: 'text', nullable: true }], features: [ { name: 'object', type: 'text', nullable: true }, @@ -39,8 +38,5 @@ export const OPENAPI_COMPATIBILITY_COLUMNS: Record = { { name: 'livemode', type: 'boolean', nullable: true }, { name: 'metadata', type: 'json', nullable: true }, ], - subscription_items: [ - { name: 'deleted', type: 'boolean', nullable: true }, - { name: 'subscription', type: 'text', nullable: true }, - ], + subscription_items: [{ name: 'subscription', type: 'text', nullable: true }], } diff --git a/packages/openapi/specParser.ts b/packages/openapi/specParser.ts index 449e49a9c..9e7fc44fe 100644 --- a/packages/openapi/specParser.ts +++ b/packages/openapi/specParser.ts @@ -18,6 +18,7 @@ const RESERVED_COLUMNS = new Set([ '_last_synced_at', '_updated_at', '_account_id', + 'deleted', ]) export { OPENAPI_RESOURCE_TABLE_ALIASES } diff --git a/packages/protocol/src/helpers.ts b/packages/protocol/src/helpers.ts index 6d35fbd80..974d9bcca 100644 --- a/packages/protocol/src/helpers.ts +++ 
b/packages/protocol/src/helpers.ts @@ -182,7 +182,7 @@ export function createSourceMessageFactory< TRecordData extends Record, >() { return { - record(payload: { stream: string; data: TRecordData; emitted_at: string }): RecordMessage { + record(payload: { stream: string; data: TRecordData; emitted_at: string; recordDeleted?: boolean }): RecordMessage { return { type: 'record', record: payload } }, diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index b31501cee..f66846ddc 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -184,6 +184,7 @@ export const RecordPayload = z .object({ stream: z.string().describe('Stream (table) name this record belongs to.'), data: z.record(z.string(), z.unknown()).describe('The record payload as a key-value map.'), + recordDeleted: z.boolean().optional(), emitted_at: z .string() .datetime() diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index cf0b4b5a5..cf418f530 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -1459,8 +1459,13 @@ describe('StripeSource', () => { expect(messages).toHaveLength(2) expect(messages[0]).toMatchObject({ type: 'record', - record: { stream: 'customers', data: { id: 'cus_1', object: 'customer', deleted: true } }, + record: { + stream: 'customers', + recordDeleted: true, + data: { id: 'cus_1', object: 'customer' }, + }, }) + expect(messages[1]).toMatchObject({ type: 'source_state', source_state: { @@ -1476,7 +1481,7 @@ describe('StripeSource', () => { } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) - // product.deleted event — the object may not have deleted: true in its body + // product.deleted event: the object may not have deleted: true in its body. 
const event = makeEvent({ id: 'evt_del_2', type: 'product.deleted', @@ -1491,7 +1496,11 @@ describe('StripeSource', () => { expect(messages).toHaveLength(2) expect(messages[0]).toMatchObject({ type: 'record', - record: { stream: 'products', data: { id: 'prod_1', object: 'product', deleted: true } }, + record: { + stream: 'products', + recordDeleted: true, + data: { id: 'prod_1', object: 'product' }, + }, }) }) diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index 0aef1659a..3ba59be3c 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -171,9 +171,9 @@ export async function* processStripeEvent( yield msg.record({ stream: resourceConfig.tableName, emitted_at: new Date().toISOString(), + recordDeleted: true, data: { ...dataObject, - deleted: true, [newerThanField(resourceConfig.tableName)]: _updated_at, ...(accountId ? { _account_id: accountId } : {}), }, diff --git a/packages/util-postgres/src/upsert.test.ts b/packages/util-postgres/src/upsert.test.ts index 5f7d5122b..2bad1da16 100644 --- a/packages/util-postgres/src/upsert.test.ts +++ b/packages/util-postgres/src/upsert.test.ts @@ -653,7 +653,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 3, updated_count: 0, - deleted_count: 0, skipped_count: 0, }) }) @@ -680,7 +679,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 0, updated_count: 2, - deleted_count: 0, skipped_count: 0, }) }) @@ -707,7 +705,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 0, updated_count: 0, - deleted_count: 0, skipped_count: 2, }) }) @@ -731,7 +728,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 2, updated_count: 1, - deleted_count: 0, skipped_count: 0, }) }) @@ -759,7 +755,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 1, updated_count: 1, - deleted_count: 0, 
skipped_count: 1, }) }) @@ -769,61 +764,6 @@ describe('upsertWithStats', () => { expect(result).toEqual({ created_count: 0, updated_count: 0, - deleted_count: 0, - skipped_count: 0, - }) - }) - }) - - describe('soft delete', () => { - beforeEach(async () => { - table = nextTable() - await pool.query(` - CREATE TABLE "${table}" ( - _raw_data jsonb NOT NULL, - id text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED, - PRIMARY KEY (id) - ) - `) - }) - - it('classifies soft-deleted inserts as deleted', async () => { - const result = await upsertWithStats( - pool, - [ - { _raw_data: { id: '1', name: 'Alice' } }, - { _raw_data: { id: '2', name: 'Bob' } }, - { _raw_data: { id: '3', name: 'Gone', deleted: true } }, - ], - { table, primaryKeyColumns: ['id'] }, - "_raw_data->>'deleted'" - ) - - expect(result).toEqual({ - created_count: 2, - updated_count: 0, - deleted_count: 1, - skipped_count: 0, - }) - }) - - it('classifies soft-deleted updates as deleted', async () => { - await upsert(pool, [{ _raw_data: { id: '1', name: 'Alice' } }], { - table, - primaryKeyColumns: ['id'], - }) - - const result = await upsertWithStats( - pool, - [{ _raw_data: { id: '1', name: 'Alice', deleted: true } }], - { table, primaryKeyColumns: ['id'] }, - "_raw_data->>'deleted'" - ) - - expect(result).toEqual({ - created_count: 0, - updated_count: 0, - deleted_count: 1, skipped_count: 0, }) }) diff --git a/packages/util-postgres/src/upsert.ts b/packages/util-postgres/src/upsert.ts index 8737f6162..d3939ddae 100644 --- a/packages/util-postgres/src/upsert.ts +++ b/packages/util-postgres/src/upsert.ts @@ -109,14 +109,11 @@ export type UpsertOptions = { type BuildUpsertSqlOptions = UpsertOptions & { /** Append RETURNING (xmax = 0) AS _sync_created instead of RETURNING *. */ returningWriteStats?: boolean - /** SQL expression for soft-delete detection, added to RETURNING when returningWriteStats is true. 
*/ - softDeleteExpression?: string } export type UpsertResult = { created_count: number updated_count: number - deleted_count: number skipped_count: number } @@ -156,7 +153,6 @@ export function buildUpsertSql( skipNoopUpdates = true, returning = false, returningWriteStats = false, - softDeleteExpression, } = options // Derive column list from the first record — all records must have the same shape. @@ -228,9 +224,6 @@ export function buildUpsertSql( if (returningWriteStats) { const parts = returning ? ['*'] : [] parts.push('(xmax = 0) AS _sync_created') - if (softDeleteExpression) { - parts.push(`(${softDeleteExpression})::boolean AS _sync_deleted`) - } sql += `\nRETURNING ${parts.join(', ')}` } else if (returning) { sql += '\nRETURNING *' @@ -270,29 +263,23 @@ export async function upsert( } /** - * Upsert with created/updated/deleted/skipped breakdown. - * - * Uses Postgres `xmax = 0` to distinguish inserts from updates, and an - * optional `softDeleteExpression` to classify soft-deleted records. + * Upsert with created/updated/skipped breakdown. * - * @param softDeleteExpression - SQL expression that evaluates to a boolean - * indicating a soft-deleted record, e.g. `"_raw_data->>'deleted'"`. + * Uses Postgres `xmax = 0` to distinguish inserts from updates. 
*/ export async function upsertWithStats( client: { query(text: string, values?: unknown[]): Promise }, records: Record[], - options: UpsertOptions, - softDeleteExpression?: string + options: UpsertOptions ): Promise { if (records.length === 0) { - return { created_count: 0, updated_count: 0, deleted_count: 0, skipped_count: 0 } + return { created_count: 0, updated_count: 0, skipped_count: 0 } } const { sql, params } = buildUpsertSql(records, { ...options, returningWriteStats: true, returning: false, - softDeleteExpression, }) let result: pg.QueryResult @@ -315,13 +302,9 @@ export async function upsertWithStats( let created_count = 0 let updated_count = 0 - let deleted_count = 0 for (const row of result.rows) { - const isDeleted = softDeleteExpression ? Boolean(row._sync_deleted) : false - if (isDeleted) { - deleted_count++ - } else if (row._sync_created) { + if (row._sync_created) { created_count++ } else { updated_count++ @@ -330,5 +313,5 @@ export async function upsertWithStats( const skipped_count = records.length - result.rows.length - return { created_count, updated_count, deleted_count, skipped_count } + return { created_count, updated_count, skipped_count } } diff --git a/scripts/test-google-sheet-pipeline-sync.ts b/scripts/test-google-sheet-pipeline-sync.ts new file mode 100644 index 000000000..1c5de3f59 --- /dev/null +++ b/scripts/test-google-sheet-pipeline-sync.ts @@ -0,0 +1,83 @@ +const STREAM_NAME = ""; // e.g. products +const OBJECT_NAME = ""; // e.g. product +const OBJECT_ID = ""; // e.g. 
prod_1234 + +const res = await fetch("http://localhost:4010/pipeline_sync", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + pipeline: { + source: { + type: "stripe", + stripe: { + api_key: process.env.STRIPE_API_KEY, + account_id: process.env.STRIPE_ACCOUNT_ID, + api_version: process.env.STRIPE_API_VERSION, + base_url: process.env.STRIPE_API_BASE_URL, + webhook_url: process.env.STRIPE_WEBHOOK_URL, + }, + }, + destination: { + type: "google_sheets", + google_sheets: { + client_id: process.env.GOOGLE_CLIENT_ID, + client_secret: process.env.GOOGLE_CLIENT_SECRET, + access_token: "", + refresh_token: process.env.GOOGLE_OAUTH_REFRESH_TOKEN, + spreadsheet_id: process.env.GOOGLE_SHEET_ID, + spreadsheet_title: STREAM_NAME, + }, + }, + streams: [{ name: STREAM_NAME }], + }, + stdin: [ + { + type: "source_input", + source_input: { + id: "evt_1TRJWRBoxBhC7kEnoXEMDaQR", // Random + object: "event", + api_version: process.env.STRIPE_API_VERSION, + created: 1777413026, + data: { + object: { + id: OBJECT_ID, + object: OBJECT_NAME, + active: true, + attributes: [], + created: 1777413026, + default_price: "price_1234", + description: null, + features: [], + images: [], + livemode: true, + marketing_features: [], + metadata: {}, + name: "test", + package_dimensions: null, + shippable: null, + statement_descriptor: null, + tax_code: "txcd_1234", + tax_details: { performance_location: null, tax_code: "txcd_1234" }, + type: "service", + unit_label: null, + updated: 1777415340, + url: null, + }, + previous_attributes: { default_price: null }, + }, + livemode: true, + pending_webhooks: 3, + request: { + id: "req_1234", + idempotency_key: "1234", + }, + type: `${OBJECT_NAME}.deleted`, + }, + }, + ], + time_limit: 1790.0, + }), +}); + +console.log(res.status, res.statusText); +console.log(await res.text());