diff --git a/.gitignore b/.gitignore index 733217c..0ab6dac 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ Desktop.ini node_modules/ dist/ *.tgz +.metabase/ +upload.log diff --git a/README.md b/README.md index 9e76dac..bd1874d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ This repository contains the specification, examples, and a CLI that converts th ## Specification -The format is defined in **[core-spec/v1/spec.md](core-spec/v1/spec.md)** (v1.1.0). It covers entity keys, field types, folder structure, sampled field values, and the shape of each entity. +The format is defined in **[core-spec/v1/spec.md](core-spec/v1/spec.md)** (v1.0.3). It covers entity keys, field types, folder structure, sampled field values, and the shape of each entity. Reference output for the Sample Database lives in **[examples/v1/](examples/v1/)** — both the raw `metadata.json` returned by the endpoint and the extracted YAML tree. @@ -22,17 +22,39 @@ Reference output for the Sample Database lives in **[examples/v1/](examples/v1/) Metadata is fetched on demand from a running Metabase instance via `GET /api/database/metadata`. The response is a flat JSON document with three arrays — `databases`, `tables`, and `fields` — streamed so that even warehouses with very large schemas can be exported without exhausting server memory. -Authenticate with either a session token (`X-Metabase-Session`) or an API key (`X-API-Key`): +Authenticate with an API key (`X-API-Key`) or session token (`X-Metabase-Session`). + +### Downloading metadata + +The CLI can fetch `metadata.json`, `field-values.json`, and extract the YAML tree in one streaming pass: ```sh -curl "$METABASE_URL/api/database/metadata" \ - -H "X-API-Key: $METABASE_API_KEY" \ - -o metadata.json +export METABASE_API_KEY=... +bunx @metabase/database-metadata download-metadata "$METABASE_URL" ``` +With no flags, the command writes: + +- `.metabase/metadata.json` +- `.metabase/field-values.json` +- `.metabase/databases/` — extracted YAML tree + +Flags override any default or opt out of individual steps: + +| Flag | Default | Purpose | +|------|---------|---------| +| `--metadata ` | `.metabase/metadata.json` | Where to write the raw metadata JSON | +| `--field-values ` | `.metabase/field-values.json` | Where to write the raw field-values JSON | +| `--extract ` | `.metabase/databases` | Where to extract the YAML tree | +| `--no-field-values` | — | Skip downloading field values | +| `--no-extract` | — | Skip YAML extraction | +| `--api-key ` | `METABASE_API_KEY` env var | API key | + +Files are streamed to disk directly — responses are never fully buffered in memory, so multi-GB exports stay lean. + ### Extracting metadata to YAML -The CLI turns that JSON into the human- and agent-friendly YAML tree described in the spec: +If you already have a `metadata.json` on disk (e.g. downloaded via `curl`), you can skip the download and extract directly: ```sh bunx @metabase/database-metadata extract-metadata @@ -43,13 +65,9 @@ bunx @metabase/database-metadata extract-metadata ### Extracting field values -Metabase keeps a sampled list of distinct values for each field that's low-cardinality enough to enumerate (the same list that powers filter dropdowns in the UI). Fetch it and extract it alongside the metadata: +Metabase keeps a sampled list of distinct values for each field that's low-cardinality enough to enumerate (the same list that powers filter dropdowns in the UI). ```sh -curl "$METABASE_URL/api/database/field-values" \ - -H "X-API-Key: $METABASE_API_KEY" \ - -o field-values.json - bunx @metabase/database-metadata extract-field-values ``` @@ -59,6 +77,28 @@ bunx @metabase/database-metadata extract-field-values ` | `.metabase/metadata.json` | Path to the metadata JSON to upload | +| `--field-values ` | `.metabase/field-values.json` | Path to the field-values JSON | +| `--no-field-values` | — | Skip uploading field values | +| `--api-key ` | `METABASE_API_KEY` env var | API key | + +The source JSON files are streamed through `@streamparser/json-node` — they are never fully loaded into memory, so 100 GB+ exports upload fine. Rows are sent in batches of 2000 per HTTP POST (matching the server's per-transaction batch size) with HTTP keep-alive, so each request is one clean server-side transaction. + +Exits non-zero if any step reports row-level errors, or if the server acknowledges fewer rows than were sent in a batch (so CI can catch partial imports). + ### Extracting the spec The bundled spec can be extracted to any file — convenient for agents that need to read it locally: @@ -112,25 +152,17 @@ cp .env.template .env ### 4. Fetch and extract on demand -With `.env` populated, the end-to-end flow is: +With `.env` populated, the end-to-end flow is a single command: ```sh set -a; source .env; set +a -mkdir -p .metabase -curl -sf "$METABASE_URL/api/database/metadata" \ - -H "X-API-Key: $METABASE_API_KEY" \ - -o .metabase/metadata.json - -curl -sf "$METABASE_URL/api/database/field-values" \ - -H "X-API-Key: $METABASE_API_KEY" \ - -o .metabase/field-values.json - rm -rf .metabase/databases -bunx @metabase/database-metadata extract-metadata .metabase/metadata.json .metabase/databases -bunx @metabase/database-metadata extract-field-values .metabase/metadata.json .metabase/field-values.json .metabase/databases +bunx @metabase/database-metadata download-metadata "$METABASE_URL" ``` +That downloads `.metabase/metadata.json`, `.metabase/field-values.json`, and extracts the YAML tree into `.metabase/databases/`. Use `--no-field-values` or `--no-extract` to skip parts of the pipeline. + After this, tools and agents should read the YAML tree under `.metabase/databases/` — not `metadata.json` or `field-values.json`, which exist only as input to the extractors. ## Publishing to NPM diff --git a/bin/cli.test.ts b/bin/cli.test.ts index f3af7c2..aaa6f0d 100644 --- a/bin/cli.test.ts +++ b/bin/cli.test.ts @@ -14,6 +14,8 @@ type RunResult = { exitCode: number; }; +type UploadLine = { id: number }; + function runCli(args: string[]): RunResult { const proc = Bun.spawnSync({ cmd: ["bun", "run", CLI, ...args], @@ -121,6 +123,179 @@ describe("cli", () => { }); }); + describe("upload-metadata", () => { + it("errors when is missing", () => { + const { stderr, exitCode } = runCli(["upload-metadata"]); + expect(exitCode).toBe(1); + expect(stderr).toContain(""); + }); + + it("errors when no api key is set", () => { + const proc = Bun.spawnSync({ + cmd: ["bun", "run", CLI, "upload-metadata", "http://127.0.0.1:1"], + cwd: REPO_ROOT, + env: { ...process.env, METABASE_API_KEY: "" }, + }); + expect(proc.exitCode).toBe(1); + expect(proc.stderr.toString()).toContain("API key is required"); + }); + + it("uploads against a mock server end-to-end", async () => { + const server = Bun.serve({ + port: 0, + async fetch(request) { + const url = new URL(request.url); + const body = await request.text(); + const inLines = body + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0); + + let response = ""; + switch (url.pathname) { + case "/api/database/metadata/databases": + case "/api/database/metadata/tables": + case "/api/database/metadata/fields": + for (const line of inLines) { + const { id } = JSON.parse(line) as UploadLine; + response += JSON.stringify({ old_id: id, new_id: id }) + "\n"; + } + break; + case "/api/database/metadata/fields/finalize": + for (const line of inLines) { + const { id } = JSON.parse(line) as UploadLine; + response += JSON.stringify({ id, ok: true }) + "\n"; + } + break; + default: + return new Response("not found", { status: 404 }); + } + return new Response(response, { + headers: { "Content-Type": "application/x-ndjson" }, + }); + }, + }); + try { + // NB: must use async Bun.spawn — spawnSync would block the parent + // event loop and deadlock with the in-process mock server. + const proc = Bun.spawn({ + cmd: [ + "bun", + "run", + CLI, + "upload-metadata", + `http://127.0.0.1:${server.port}`, + "--metadata", + EXAMPLE_INPUT, + "--no-field-values", + ], + cwd: REPO_ROOT, + env: { ...process.env, METABASE_API_KEY: "ci-key" }, + stdout: "pipe", + stderr: "pipe", + }); + const [stdoutText, stderrText, exitCode] = await Promise.all([ + new Response(proc.stdout).text(), + new Response(proc.stderr).text(), + proc.exited, + ]); + expect(exitCode).toBe(0); + expect(stdoutText).toContain("Databases:"); + expect(stdoutText).toContain("Finalized:"); + expect(stdoutText).not.toContain("Values:"); + expect(stderrText).toBe(""); + } finally { + await server.stop(); + } + }); + }); + + describe("download-metadata", () => { + let workdir: string; + + beforeEach(() => { + workdir = mkdtempSync(join(tmpdir(), "download-metadata-cli-")); + }); + + afterEach(() => { + rmSync(workdir, { recursive: true, force: true }); + }); + + it("errors when is missing", () => { + const { stderr, exitCode } = runCli(["download-metadata"]); + expect(exitCode).toBe(1); + expect(stderr).toContain(""); + }); + + // End-to-end streaming + path-override via a spawned CLI against a mock + // server. Defaults-in-cwd behaviour is covered by library tests in + // src/download-metadata.test.ts — attempting the same with cwd=tmpdir + // inside bun:test reliably hangs Bun.spawn (unrelated to CLI logic). + it("overrides output paths via flags and writes each file", async () => { + const EXAMPLE_METADATA_PATH = join(REPO_ROOT, EXAMPLE_INPUT); + const EXAMPLE_VALUES_PATH = join(REPO_ROOT, EXAMPLE_FIELD_VALUES); + const server = Bun.serve({ + port: 0, + fetch(request) { + const url = new URL(request.url); + if (url.pathname === "/api/database/metadata") { + return new Response(Bun.file(EXAMPLE_METADATA_PATH)); + } + if (url.pathname === "/api/database/field-values") { + return new Response(Bun.file(EXAMPLE_VALUES_PATH)); + } + return new Response("not found", { status: 404 }); + }, + }); + try { + const metadataFile = join(workdir, "custom-metadata.json"); + const fieldValuesFile = join(workdir, "custom-values.json"); + const extractFolder = join(workdir, "custom-databases"); + const proc = Bun.spawn({ + cmd: [ + "bun", + "run", + CLI, + "download-metadata", + `http://127.0.0.1:${server.port}`, + "--metadata", + metadataFile, + "--field-values", + fieldValuesFile, + "--extract", + extractFolder, + ], + cwd: REPO_ROOT, + env: { ...process.env, METABASE_API_KEY: "ci-key" }, + stdout: "pipe", + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([ + new Response(proc.stdout).text(), + new Response(proc.stderr).text(), + proc.exited, + ]); + expect(stderr).toBe(""); + expect(exitCode).toBe(0); + expect(stdout).toContain("Metadata:"); + expect(stdout).toContain("Field values:"); + expect(stdout).toContain("Extracted to:"); + expect(existsSync(metadataFile)).toBe(true); + expect(existsSync(fieldValuesFile)).toBe(true); + expect( + existsSync( + join( + extractFolder, + "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml", + ), + ), + ).toBe(true); + } finally { + await server.stop(); + } + }); + }); + describe("extract-spec", () => { let workdir: string; diff --git a/bin/cli.ts b/bin/cli.ts index 9145591..b74361c 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -2,15 +2,33 @@ import { parseArgs } from "node:util"; +import { downloadMetadata } from "../src/download-metadata.js"; import { extractFieldValues } from "../src/extract-field-values.js"; import { extractMetadata } from "../src/extract-metadata.js"; import { extractSpec } from "../src/extract-spec.js"; +import { + uploadMetadata, + type UploadMetadataResult, + type UploadStepStats, +} from "../src/upload-metadata.js"; type ParsedValues = { file?: string; help?: boolean; + metadata?: string; + "field-values"?: string; + extract?: string; + "no-field-values"?: boolean; + "no-extract"?: boolean; + "api-key"?: string; }; +const DEFAULT_PATHS = { + metadata: ".metabase/metadata.json", + fieldValues: ".metabase/field-values.json", + extract: ".metabase/databases", +} as const; + const HELP = `Usage: database-metadata [arguments] [options] Commands: @@ -26,6 +44,23 @@ Commands: extract-spec Copy the bundled spec.md into a target file --file Destination file (default: ./spec.md) + upload-metadata Stream metadata + field values to a target + Metabase instance via NDJSON. + --metadata Override metadata.json path (default: .metabase/metadata.json) + --field-values Override field-values.json path (default: .metabase/field-values.json) + --no-field-values Skip uploading field values + --api-key API key. Defaults to METABASE_API_KEY env var. + + download-metadata Stream metadata + field values from a + Metabase instance into .metabase/ and + extract the YAML tree by default. + --metadata Override metadata.json path (default: .metabase/metadata.json) + --field-values Override field-values.json path (default: .metabase/field-values.json) + --extract Override YAML extract folder (default: .metabase/databases) + --no-field-values Skip downloading field values + --no-extract Skip YAML extraction + --api-key API key. Defaults to METABASE_API_KEY env var. + Options: -h, --help Show this help message`; @@ -35,6 +70,12 @@ function parseArguments() { options: { file: { type: "string" }, help: { type: "boolean", short: "h", default: false }, + metadata: { type: "string" }, + "field-values": { type: "string" }, + extract: { type: "string" }, + "no-field-values": { type: "boolean", default: false }, + "no-extract": { type: "boolean", default: false }, + "api-key": { type: "string" }, }, }); } @@ -80,13 +121,133 @@ function handleExtractFieldValues(positionals: string[]): void { process.exit(0); } +async function handleUploadMetadata( + positionals: string[], + values: ParsedValues, +): Promise { + const instanceUrl = positionals[1]; + + if (!instanceUrl) { + console.error("Error: argument is required"); + process.exit(1); + } + + const apiKey = values["api-key"] ?? process.env.METABASE_API_KEY; + if (!apiKey) { + console.error( + "Error: API key is required (pass --api-key or set METABASE_API_KEY)", + ); + process.exit(1); + } + + const metadataFile = values.metadata ?? DEFAULT_PATHS.metadata; + const fieldValuesFile = values["no-field-values"] + ? undefined + : (values["field-values"] ?? DEFAULT_PATHS.fieldValues); + + const stats = await uploadMetadata({ + metadataFile, + fieldValuesFile, + instanceUrl, + apiKey, + }); + console.log(formatUploadReport(stats, Boolean(fieldValuesFile))); + process.exit(hasAnyErrors(stats) ? 1 : 0); +} + +function formatStepLine(label: string, step: UploadStepStats): string { + const total = step.mapped + step.errors; + return `${label} ${step.mapped}/${total} mapped (${step.errors} errors)`; +} + +function formatFieldsLine(stats: UploadMetadataResult["fieldsInsert"]): string { + const total = stats.mapped + stats.errors; + return `Fields: ${stats.mapped}/${total} mapped (${stats.inserted} inserted, ${stats.matched} matched, ${stats.errors} errors)`; +} + +function formatFinalizeLine( + finalize: UploadStepStats, + insertedCount: number, +): string { + const base = formatStepLine("Finalized: ", finalize); + if (insertedCount === 0 && finalize.errors === 0) { + return `${base} — no newly-inserted fields to finalize`; + } + return base; +} + +function formatUploadReport( + stats: UploadMetadataResult, + fieldValuesRan: boolean, +): string { + const lines = [ + formatStepLine("Databases: ", stats.databases), + formatStepLine("Tables: ", stats.tables), + formatFieldsLine(stats.fieldsInsert), + formatFinalizeLine(stats.fieldsFinalize, stats.fieldsInsert.inserted), + ]; + if (fieldValuesRan) { + lines.push(formatStepLine("Values: ", stats.fieldValues)); + } + return lines.join("\n"); +} + +function hasAnyErrors(stats: UploadMetadataResult): boolean { + return Object.values(stats).some((step) => step.errors > 0); +} + +async function handleDownloadMetadata( + positionals: string[], + values: ParsedValues, +): Promise { + const instanceUrl = positionals[1]; + + if (!instanceUrl) { + console.error("Error: argument is required"); + process.exit(1); + } + + const apiKey = values["api-key"] ?? process.env.METABASE_API_KEY; + if (!apiKey) { + console.error( + "Error: API key is required (pass --api-key or set METABASE_API_KEY)", + ); + process.exit(1); + } + + const metadataFile = values.metadata ?? DEFAULT_PATHS.metadata; + const fieldValuesFile = values["no-field-values"] + ? undefined + : (values["field-values"] ?? DEFAULT_PATHS.fieldValues); + const extractFolder = values["no-extract"] + ? undefined + : (values.extract ?? DEFAULT_PATHS.extract); + + const result = await downloadMetadata({ + instanceUrl, + apiKey, + metadataFile, + fieldValuesFile, + extractFolder, + }); + const lines = [`Metadata: ${result.metadataFile}`]; + if (result.fieldValuesFile) { + lines.push(`Field values: ${result.fieldValuesFile}`); + } + if (result.extractFolder) { + lines.push(`Extracted to: ${result.extractFolder}`); + } + console.log(lines.join("\n")); + process.exit(0); +} + function handleExtractSpec(values: ParsedValues): void { const { target } = extractSpec({ file: values.file ?? "spec.md" }); console.log(`Spec extracted to ${target}`); process.exit(0); } -function main(): void { +async function main(): Promise { const { values, positionals } = parseArguments(); const command = positionals[0]; @@ -102,10 +263,17 @@ function main(): void { return handleExtractFieldValues(positionals); case "extract-spec": return handleExtractSpec(values); + case "upload-metadata": + return handleUploadMetadata(positionals, values); + case "download-metadata": + return handleDownloadMetadata(positionals, values); default: console.error(`Unknown command: ${command}`); process.exit(1); } } -main(); +main().catch((error) => { + console.error(error instanceof Error ? error.message : String(error)); + process.exit(1); +}); diff --git a/bun.lock b/bun.lock index de5ae96..9d047e0 100644 --- a/bun.lock +++ b/bun.lock @@ -5,6 +5,7 @@ "": { "name": "@metabase/database-metadata", "dependencies": { + "@streamparser/json-node": "^0.0.22", "js-yaml": "^4.1.0", }, "devDependencies": { @@ -14,12 +15,65 @@ "@types/node": "^25.6.0", "eslint": "^10.2.1", "oxfmt": "^0.45.0", + "tsx": "^4.21.0", "typescript": "^6.0.3", "typescript-eslint": "^8.58.2", }, }, }, "packages": { + "@esbuild/aix-ppc64": ["@esbuild/aix-ppc64@0.27.7", "", { "os": "aix", "cpu": "ppc64" }, "sha512-EKX3Qwmhz1eMdEJokhALr0YiD0lhQNwDqkPYyPhiSwKrh7/4KRjQc04sZ8db+5DVVnZ1LmbNDI1uAMPEUBnQPg=="], + + "@esbuild/android-arm": ["@esbuild/android-arm@0.27.7", "", { "os": "android", "cpu": "arm" }, "sha512-jbPXvB4Yj2yBV7HUfE2KHe4GJX51QplCN1pGbYjvsyCZbQmies29EoJbkEc+vYuU5o45AfQn37vZlyXy4YJ8RQ=="], + + "@esbuild/android-arm64": ["@esbuild/android-arm64@0.27.7", "", { "os": "android", "cpu": "arm64" }, "sha512-62dPZHpIXzvChfvfLJow3q5dDtiNMkwiRzPylSCfriLvZeq0a1bWChrGx/BbUbPwOrsWKMn8idSllklzBy+dgQ=="], + + "@esbuild/android-x64": ["@esbuild/android-x64@0.27.7", "", { "os": "android", "cpu": "x64" }, "sha512-x5VpMODneVDb70PYV2VQOmIUUiBtY3D3mPBG8NxVk5CogneYhkR7MmM3yR/uMdITLrC1ml/NV1rj4bMJuy9MCg=="], + + "@esbuild/darwin-arm64": ["@esbuild/darwin-arm64@0.27.7", "", { "os": "darwin", "cpu": "arm64" }, "sha512-5lckdqeuBPlKUwvoCXIgI2D9/ABmPq3Rdp7IfL70393YgaASt7tbju3Ac+ePVi3KDH6N2RqePfHnXkaDtY9fkw=="], + + "@esbuild/darwin-x64": ["@esbuild/darwin-x64@0.27.7", "", { "os": "darwin", "cpu": "x64" }, "sha512-rYnXrKcXuT7Z+WL5K980jVFdvVKhCHhUwid+dDYQpH+qu+TefcomiMAJpIiC2EM3Rjtq0sO3StMV/+3w3MyyqQ=="], + + "@esbuild/freebsd-arm64": ["@esbuild/freebsd-arm64@0.27.7", "", { "os": "freebsd", "cpu": "arm64" }, "sha512-B48PqeCsEgOtzME2GbNM2roU29AMTuOIN91dsMO30t+Ydis3z/3Ngoj5hhnsOSSwNzS+6JppqWsuhTp6E82l2w=="], + + "@esbuild/freebsd-x64": ["@esbuild/freebsd-x64@0.27.7", "", { "os": "freebsd", "cpu": "x64" }, "sha512-jOBDK5XEjA4m5IJK3bpAQF9/Lelu/Z9ZcdhTRLf4cajlB+8VEhFFRjWgfy3M1O4rO2GQ/b2dLwCUGpiF/eATNQ=="], + + "@esbuild/linux-arm": ["@esbuild/linux-arm@0.27.7", "", { "os": "linux", "cpu": "arm" }, "sha512-RkT/YXYBTSULo3+af8Ib0ykH8u2MBh57o7q/DAs3lTJlyVQkgQvlrPTnjIzzRPQyavxtPtfg0EopvDyIt0j1rA=="], + + "@esbuild/linux-arm64": ["@esbuild/linux-arm64@0.27.7", "", { "os": "linux", "cpu": "arm64" }, "sha512-RZPHBoxXuNnPQO9rvjh5jdkRmVizktkT7TCDkDmQ0W2SwHInKCAV95GRuvdSvA7w4VMwfCjUiPwDi0ZO6Nfe9A=="], + + "@esbuild/linux-ia32": ["@esbuild/linux-ia32@0.27.7", "", { "os": "linux", "cpu": "ia32" }, "sha512-GA48aKNkyQDbd3KtkplYWT102C5sn/EZTY4XROkxONgruHPU72l+gW+FfF8tf2cFjeHaRbWpOYa/uRBz/Xq1Pg=="], + + "@esbuild/linux-loong64": ["@esbuild/linux-loong64@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-a4POruNM2oWsD4WKvBSEKGIiWQF8fZOAsycHOt6JBpZ+JN2n2JH9WAv56SOyu9X5IqAjqSIPTaJkqN8F7XOQ5Q=="], + + "@esbuild/linux-mips64el": ["@esbuild/linux-mips64el@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-KabT5I6StirGfIz0FMgl1I+R1H73Gp0ofL9A3nG3i/cYFJzKHhouBV5VWK1CSgKvVaG4q1RNpCTR2LuTVB3fIw=="], + + "@esbuild/linux-ppc64": ["@esbuild/linux-ppc64@0.27.7", "", { "os": "linux", "cpu": "ppc64" }, "sha512-gRsL4x6wsGHGRqhtI+ifpN/vpOFTQtnbsupUF5R5YTAg+y/lKelYR1hXbnBdzDjGbMYjVJLJTd2OFmMewAgwlQ=="], + + "@esbuild/linux-riscv64": ["@esbuild/linux-riscv64@0.27.7", "", { "os": "linux", "cpu": "none" }, "sha512-hL25LbxO1QOngGzu2U5xeXtxXcW+/GvMN3ejANqXkxZ/opySAZMrc+9LY/WyjAan41unrR3YrmtTsUpwT66InQ=="], + + "@esbuild/linux-s390x": ["@esbuild/linux-s390x@0.27.7", "", { "os": "linux", "cpu": "s390x" }, "sha512-2k8go8Ycu1Kb46vEelhu1vqEP+UeRVj2zY1pSuPdgvbd5ykAw82Lrro28vXUrRmzEsUV0NzCf54yARIK8r0fdw=="], + + "@esbuild/linux-x64": ["@esbuild/linux-x64@0.27.7", "", { "os": "linux", "cpu": "x64" }, "sha512-hzznmADPt+OmsYzw1EE33ccA+HPdIqiCRq7cQeL1Jlq2gb1+OyWBkMCrYGBJ+sxVzve2ZJEVeePbLM2iEIZSxA=="], + + "@esbuild/netbsd-arm64": ["@esbuild/netbsd-arm64@0.27.7", "", { "os": "none", "cpu": "arm64" }, "sha512-b6pqtrQdigZBwZxAn1UpazEisvwaIDvdbMbmrly7cDTMFnw/+3lVxxCTGOrkPVnsYIosJJXAsILG9XcQS+Yu6w=="], + + "@esbuild/netbsd-x64": ["@esbuild/netbsd-x64@0.27.7", "", { "os": "none", "cpu": "x64" }, "sha512-OfatkLojr6U+WN5EDYuoQhtM+1xco+/6FSzJJnuWiUw5eVcicbyK3dq5EeV/QHT1uy6GoDhGbFpprUiHUYggrw=="], + + "@esbuild/openbsd-arm64": ["@esbuild/openbsd-arm64@0.27.7", "", { "os": "openbsd", "cpu": "arm64" }, "sha512-AFuojMQTxAz75Fo8idVcqoQWEHIXFRbOc1TrVcFSgCZtQfSdc1RXgB3tjOn/krRHENUB4j00bfGjyl2mJrU37A=="], + + "@esbuild/openbsd-x64": ["@esbuild/openbsd-x64@0.27.7", "", { "os": "openbsd", "cpu": "x64" }, "sha512-+A1NJmfM8WNDv5CLVQYJ5PshuRm/4cI6WMZRg1by1GwPIQPCTs1GLEUHwiiQGT5zDdyLiRM/l1G0Pv54gvtKIg=="], + + "@esbuild/openharmony-arm64": ["@esbuild/openharmony-arm64@0.27.7", "", { "os": "none", "cpu": "arm64" }, "sha512-+KrvYb/C8zA9CU/g0sR6w2RBw7IGc5J2BPnc3dYc5VJxHCSF1yNMxTV5LQ7GuKteQXZtspjFbiuW5/dOj7H4Yw=="], + + "@esbuild/sunos-x64": ["@esbuild/sunos-x64@0.27.7", "", { "os": "sunos", "cpu": "x64" }, "sha512-ikktIhFBzQNt/QDyOL580ti9+5mL/YZeUPKU2ivGtGjdTYoqz6jObj6nOMfhASpS4GU4Q/Clh1QtxWAvcYKamA=="], + + "@esbuild/win32-arm64": ["@esbuild/win32-arm64@0.27.7", "", { "os": "win32", "cpu": "arm64" }, "sha512-7yRhbHvPqSpRUV7Q20VuDwbjW5kIMwTHpptuUzV+AA46kiPze5Z7qgt6CLCK3pWFrHeNfDd1VKgyP4O+ng17CA=="], + + "@esbuild/win32-ia32": ["@esbuild/win32-ia32@0.27.7", "", { "os": "win32", "cpu": "ia32" }, "sha512-SmwKXe6VHIyZYbBLJrhOoCJRB/Z1tckzmgTLfFYOfpMAx63BJEaL9ExI8x7v0oAO3Zh6D/Oi1gVxEYr5oUCFhw=="], + + "@esbuild/win32-x64": ["@esbuild/win32-x64@0.27.7", "", { "os": "win32", "cpu": "x64" }, "sha512-56hiAJPhwQ1R4i+21FVF7V8kSD5zZTdHcVuRFMW0hn753vVfQN8xlx4uOPT4xoGH0Z/oVATuR82AiqSTDIpaHg=="], + "@eslint-community/eslint-utils": ["@eslint-community/eslint-utils@4.9.1", "", { "dependencies": { "eslint-visitor-keys": "^3.4.3" }, "peerDependencies": { "eslint": "^6.0.0 || ^7.0.0 || >=8.0.0" } }, "sha512-phrYmNiYppR7znFEdqgfWHXR6NCkZEK7hwWDHZUjit/2/U0r6XvkDl0SYnoM51Hq7FhCGdLDT6zxCCOY1hexsQ=="], "@eslint-community/regexpp": ["@eslint-community/regexpp@4.12.2", "", {}, "sha512-EriSTlt5OC9/7SXkRSCAhfSxxoSUgBm33OH+IkwbdpgoqsSsUg7y3uh+IICI/Qg4BBWr3U2i39RpmycbxMq4ew=="], @@ -84,6 +138,10 @@ "@oxfmt/binding-win32-x64-msvc": ["@oxfmt/binding-win32-x64-msvc@0.45.0", "", { "os": "win32", "cpu": "x64" }, "sha512-w5MMTRCK1dpQeRA+HHqXQXyN33DlG/N2LOYxJmaT4fJjcmZrbNnqw7SmIk7I2/a2493PPLZ+2E/Ar6t2iKVMug=="], + "@streamparser/json": ["@streamparser/json@0.0.22", "", {}, "sha512-b6gTSBjJ8G8SuO3Gbbj+zXbVx8NSs1EbpbMKpzGLWMdkR+98McH9bEjSz3+0mPJf68c5nxa3CrJHp5EQNXM6zQ=="], + + "@streamparser/json-node": ["@streamparser/json-node@0.0.22", "", { "dependencies": { "@streamparser/json": "^0.0.22" } }, "sha512-sJT2ptNRwqB1lIsQrQlCoWk5rF4tif9wDh+7yluAGijJamAhrHGYpFB/Zg3hJeceoZypi74ftXk8DHzwYpbZSg=="], + "@types/bun": ["@types/bun@1.3.12", "", { "dependencies": { "bun-types": "1.3.12" } }, "sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A=="], "@types/esrecurse": ["@types/esrecurse@4.3.1", "", {}, "sha512-xJBAbDifo5hpffDBuHl0Y8ywswbiAp/Wi7Y/GtAgSlZyIABppyurxVueOPE8LUQOxdlgi6Zqce7uoEpqNTeiUw=="], @@ -136,6 +194,8 @@ "deep-is": ["deep-is@0.1.4", "", {}, "sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ=="], + "esbuild": ["esbuild@0.27.7", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.27.7", "@esbuild/android-arm": "0.27.7", "@esbuild/android-arm64": "0.27.7", "@esbuild/android-x64": "0.27.7", "@esbuild/darwin-arm64": "0.27.7", "@esbuild/darwin-x64": "0.27.7", "@esbuild/freebsd-arm64": "0.27.7", "@esbuild/freebsd-x64": "0.27.7", "@esbuild/linux-arm": "0.27.7", "@esbuild/linux-arm64": "0.27.7", "@esbuild/linux-ia32": "0.27.7", "@esbuild/linux-loong64": "0.27.7", "@esbuild/linux-mips64el": "0.27.7", "@esbuild/linux-ppc64": "0.27.7", "@esbuild/linux-riscv64": "0.27.7", "@esbuild/linux-s390x": "0.27.7", "@esbuild/linux-x64": "0.27.7", "@esbuild/netbsd-arm64": "0.27.7", "@esbuild/netbsd-x64": "0.27.7", "@esbuild/openbsd-arm64": "0.27.7", "@esbuild/openbsd-x64": "0.27.7", "@esbuild/openharmony-arm64": "0.27.7", "@esbuild/sunos-x64": "0.27.7", "@esbuild/win32-arm64": "0.27.7", "@esbuild/win32-ia32": "0.27.7", "@esbuild/win32-x64": "0.27.7" }, "bin": { "esbuild": "bin/esbuild" } }, "sha512-IxpibTjyVnmrIQo5aqNpCgoACA/dTKLTlhMHihVHhdkxKyPO1uBBthumT0rdHmcsk9uMonIWS0m4FljWzILh3w=="], + "escape-string-regexp": ["escape-string-regexp@4.0.0", "", {}, "sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA=="], "eslint": ["eslint@10.2.1", "", { "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.2", "@eslint/config-array": "^0.23.5", "@eslint/config-helpers": "^0.5.5", "@eslint/core": "^1.2.1", "@eslint/plugin-kit": "^0.7.1", "@humanfs/node": "^0.16.6", "@humanwhocodes/module-importer": "^1.0.1", "@humanwhocodes/retry": "^0.4.2", "@types/estree": "^1.0.6", "ajv": "^6.14.0", "cross-spawn": "^7.0.6", "debug": "^4.3.2", "escape-string-regexp": "^4.0.0", "eslint-scope": "^9.1.2", "eslint-visitor-keys": "^5.0.1", "espree": "^11.2.0", "esquery": "^1.7.0", "esutils": "^2.0.2", "fast-deep-equal": "^3.1.3", "file-entry-cache": "^8.0.0", "find-up": "^5.0.0", "glob-parent": "^6.0.2", "ignore": "^5.2.0", "imurmurhash": "^0.1.4", "is-glob": "^4.0.0", "json-stable-stringify-without-jsonify": "^1.0.1", "minimatch": "^10.2.4", "natural-compare": "^1.4.0", "optionator": "^0.9.3" }, "peerDependencies": { "jiti": "*" }, "optionalPeers": ["jiti"], "bin": { "eslint": "bin/eslint.js" } }, "sha512-wiyGaKsDgqXvF40P8mDwiUp/KQjE1FdrIEJsM8PZ3XCiniTMXS3OHWWUe5FI5agoCnr8x4xPrTDZuxsBlNHl+Q=="], @@ -170,6 +230,10 @@ "flatted": ["flatted@3.4.2", "", {}, "sha512-PjDse7RzhcPkIJwy5t7KPWQSZ9cAbzQXcafsetQoD7sOJRQlGikNbx7yZp2OotDnJyrDcbyRq3Ttb18iYOqkxA=="], + "fsevents": ["fsevents@2.3.3", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="], + + "get-tsconfig": ["get-tsconfig@4.14.0", "", { "dependencies": { "resolve-pkg-maps": "^1.0.0" } }, "sha512-yTb+8DXzDREzgvYmh6s9vHsSVCHeC0G3PI5bEXNBHtmshPnO+S5O7qgLEOn0I5QvMy6kpZN8K1NKGyilLb93wA=="], + "glob-parent": ["glob-parent@6.0.2", "", { "dependencies": { "is-glob": "^4.0.3" } }, "sha512-XxwI8EOhVQgWp6iDL+3b0r86f4d6AX6zSU55HfB4ydCEuXLXc5FcYeOu+nnGftS4TEju/11rt4KJPTMgbfmv4A=="], "ignore": ["ignore@5.3.2", "", {}, "sha512-hsBTNUqQTDwkWtcdYI2i06Y/nUBEsNEDJKjWdigLvegy8kDuJAS8uRlpkkcQpyEXL0Z/pjDy5HBmMjRCJ2gq+g=="], @@ -220,6 +284,8 @@ "punycode": ["punycode@2.3.1", "", {}, "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="], + "resolve-pkg-maps": ["resolve-pkg-maps@1.0.0", "", {}, "sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw=="], + "semver": ["semver@7.7.4", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA=="], "shebang-command": ["shebang-command@2.0.0", "", { "dependencies": { "shebang-regex": "^3.0.0" } }, "sha512-kHxr2zZpYtdmrN1qDjrrX/Z1rR1kG8Dx+gkpK1G4eXmvXswmcE1hTWBWYUzlraYw1/yZp6YuDY77YtvbN0dmDA=="], @@ -232,6 +298,8 @@ "ts-api-utils": ["ts-api-utils@2.5.0", "", { "peerDependencies": { "typescript": ">=4.8.4" } }, "sha512-OJ/ibxhPlqrMM0UiNHJ/0CKQkoKF243/AEmplt3qpRgkW8VG7IfOS41h7V8TjITqdByHzrjcS/2si+y4lIh8NA=="], + "tsx": ["tsx@4.21.0", "", { "dependencies": { "esbuild": "~0.27.0", "get-tsconfig": "^4.7.5" }, "optionalDependencies": { "fsevents": "~2.3.3" }, "bin": { "tsx": "dist/cli.mjs" } }, "sha512-5C1sg4USs1lfG0GFb2RLXsdpXqBSEhAaA/0kPL01wxzpMqLILNxIxIOKiILz+cdg/pLnOUxFYOR5yhHU666wbw=="], + "type-check": ["type-check@0.4.0", "", { "dependencies": { "prelude-ls": "^1.2.1" } }, "sha512-XleUoc9uwGXqjWwXaUTZAmzMcFZ5858QA2vvx1Ur5xIcixXIP+8LnFDgRplU30us6teqdlskFfu+ae4K79Ooew=="], "typescript": ["typescript@6.0.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-y2TvuxSZPDyQakkFRPZHKFm+KKVqIisdg9/CZwm9ftvKXLP8NRWj38/ODjNbr43SsoXqNuAisEf1GdCxqWcdBw=="], diff --git a/core-spec/v1/spec.md b/core-spec/v1/spec.md index abaa627..3d3633d 100644 --- a/core-spec/v1/spec.md +++ b/core-spec/v1/spec.md @@ -1,6 +1,6 @@ # Metabase Database Metadata Format -**Version:** 1.1.0 +**Version:** 1.0.3 ## Overview diff --git a/package.json b/package.json index ac9fc58..a63fd5f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@metabase/database-metadata", - "version": "1.0.2", + "version": "1.0.3", "description": "CLI tool to extract Metabase database metadata into YAML files", "license": "SEE LICENSE IN LICENSE.txt", "repository": { @@ -39,6 +39,7 @@ "test": "bun test" }, "dependencies": { + "@streamparser/json-node": "^0.0.22", "js-yaml": "^4.1.0" }, "devDependencies": { diff --git a/src/download-metadata.test.ts b/src/download-metadata.test.ts new file mode 100644 index 0000000..2d8774a --- /dev/null +++ b/src/download-metadata.test.ts @@ -0,0 +1,174 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { existsSync, mkdtempSync, readFileSync, rmSync, statSync } from "fs"; +import { tmpdir } from "os"; +import { join, resolve } from "path"; + +import { downloadMetadata } from "./download-metadata.js"; + +const REPO_ROOT = resolve(import.meta.dirname, ".."); +const EXAMPLE_METADATA = join(REPO_ROOT, "examples/v1/metadata.json"); +const EXAMPLE_FIELD_VALUES = join(REPO_ROOT, "examples/v1/field-values.json"); + +type MockServerControl = { + baseUrl: string; + apiKeysSeen: string[]; + stop: () => Promise; +}; + +type MockServerOptions = { + metadataStatus?: number; + fieldValuesStatus?: number; +}; + +function startMockServer(options: MockServerOptions = {}): MockServerControl { + const apiKeysSeen: string[] = []; + const metadataStatus = options.metadataStatus ?? 200; + const fieldValuesStatus = options.fieldValuesStatus ?? 200; + + const server = Bun.serve({ + port: 0, + fetch(request) { + const url = new URL(request.url); + apiKeysSeen.push(request.headers.get("X-API-Key") ?? ""); + if (url.pathname === "/api/database/metadata") { + if (metadataStatus !== 200) { + return new Response("boom", { status: metadataStatus }); + } + return new Response(Bun.file(EXAMPLE_METADATA)); + } + if (url.pathname === "/api/database/field-values") { + if (fieldValuesStatus !== 200) { + return new Response("boom", { status: fieldValuesStatus }); + } + return new Response(Bun.file(EXAMPLE_FIELD_VALUES)); + } + return new Response("not found", { status: 404 }); + }, + }); + + return { + baseUrl: `http://127.0.0.1:${server.port}`, + apiKeysSeen, + stop: () => server.stop(), + }; +} + +describe("downloadMetadata", () => { + let workdir: string; + let mock: MockServerControl; + + beforeEach(() => { + workdir = mkdtempSync(join(tmpdir(), "download-metadata-")); + mock = startMockServer(); + }); + + afterEach(async () => { + await mock.stop(); + rmSync(workdir, { recursive: true, force: true }); + }); + + it("streams metadata.json to the configured path", async () => { + const metadataFile = join(workdir, "nested", "metadata.json"); + const result = await downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "test-key", + metadataFile, + }); + expect(result.metadataFile).toBe(metadataFile); + expect(existsSync(metadataFile)).toBe(true); + const downloaded = readFileSync(metadataFile, "utf8"); + const expected = readFileSync(EXAMPLE_METADATA, "utf8"); + expect(downloaded).toBe(expected); + expect(mock.apiKeysSeen).toEqual(["test-key"]); + }); + + it("downloads field-values only when a path is given", async () => { + const metadataFile = join(workdir, "metadata.json"); + const fieldValuesFile = join(workdir, "values.json"); + const result = await downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "k", + metadataFile, + fieldValuesFile, + }); + expect(result.fieldValuesFile).toBe(fieldValuesFile); + expect(existsSync(fieldValuesFile)).toBe(true); + expect(statSync(fieldValuesFile).size).toBeGreaterThan(0); + }); + + it("extracts YAML when an extract folder is given", async () => { + const metadataFile = join(workdir, "metadata.json"); + const fieldValuesFile = join(workdir, "values.json"); + const extractFolder = join(workdir, "databases"); + const result = await downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "k", + metadataFile, + fieldValuesFile, + extractFolder, + }); + expect(result.extractFolder).toBe(extractFolder); + + const ordersYaml = join( + extractFolder, + "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml", + ); + expect(existsSync(ordersYaml)).toBe(true); + + const stateValues = join( + extractFolder, + "Sample Database/schemas/PUBLIC/tables/PEOPLE/STATE.yaml", + ); + expect(existsSync(stateValues)).toBe(true); + }); + + it("skips field-values extraction when no field-values path is given", async () => { + const metadataFile = join(workdir, "metadata.json"); + const extractFolder = join(workdir, "databases"); + await downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "k", + metadataFile, + extractFolder, + }); + const ordersYaml = join( + extractFolder, + "Sample Database/schemas/PUBLIC/tables/ORDERS.yaml", + ); + expect(existsSync(ordersYaml)).toBe(true); + const stateValues = join( + extractFolder, + "Sample Database/schemas/PUBLIC/tables/PEOPLE/STATE.yaml", + ); + expect(existsSync(stateValues)).toBe(false); + }); + + it("throws on non-200 metadata response and does not write the file", async () => { + await mock.stop(); + mock = startMockServer({ metadataStatus: 401 }); + const metadataFile = join(workdir, "metadata.json"); + await expect( + downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "k", + metadataFile, + }), + ).rejects.toThrow(/401/); + expect(existsSync(metadataFile)).toBe(false); + }); + + it("throws on non-200 field-values response", async () => { + await mock.stop(); + mock = startMockServer({ fieldValuesStatus: 500 }); + const metadataFile = join(workdir, "metadata.json"); + const fieldValuesFile = join(workdir, "values.json"); + await expect( + downloadMetadata({ + instanceUrl: mock.baseUrl, + apiKey: "k", + metadataFile, + fieldValuesFile, + }), + ).rejects.toThrow(/500/); + }); +}); diff --git a/src/download-metadata.ts b/src/download-metadata.ts new file mode 100644 index 0000000..3d6f34b --- /dev/null +++ b/src/download-metadata.ts @@ -0,0 +1,89 @@ +import { createWriteStream, mkdirSync } from "node:fs"; +import { dirname } from "node:path"; +import { Readable } from "node:stream"; +import { pipeline } from "node:stream/promises"; + +import { extractFieldValues } from "./extract-field-values.js"; +import { extractMetadata } from "./extract-metadata.js"; + +export const DOWNLOAD_PATHS = { + metadata: "/api/database/metadata", + fieldValues: "/api/database/field-values", +} as const; + +export type DownloadMetadataOptions = { + instanceUrl: string; + apiKey: string; + metadataFile: string; + fieldValuesFile?: string; + extractFolder?: string; +}; + +export type DownloadMetadataResult = { + metadataFile: string; + fieldValuesFile?: string; + extractFolder?: string; +}; + +function joinUrl(baseUrl: string, path: string): string { + return `${baseUrl.replace(/\/$/, "")}${path}`; +} + +async function streamDownload( + url: string, + apiKey: string, + destination: string, +): Promise { + const response = await fetch(url, { + headers: { "X-API-Key": apiKey }, + }); + if (!response.ok) { + const text = await response.text().catch(() => ""); + throw new Error( + `GET ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(), + ); + } + if (!response.body) { + throw new Error(`GET ${url} returned an empty body`); + } + mkdirSync(dirname(destination), { recursive: true }); + await pipeline( + Readable.fromWeb(response.body as Parameters[0]), + createWriteStream(destination), + ); +} + +export async function downloadMetadata({ + instanceUrl, + apiKey, + metadataFile, + fieldValuesFile, + extractFolder, +}: DownloadMetadataOptions): Promise { + await streamDownload( + joinUrl(instanceUrl, DOWNLOAD_PATHS.metadata), + apiKey, + metadataFile, + ); + + if (fieldValuesFile) { + await streamDownload( + joinUrl(instanceUrl, DOWNLOAD_PATHS.fieldValues), + apiKey, + fieldValuesFile, + ); + } + + if (extractFolder) { + extractMetadata({ inputFile: metadataFile, outputFolder: extractFolder }); + if (fieldValuesFile) { + extractFieldValues({ + metadataFile, + fieldValuesFile, + outputFolder: extractFolder, + }); + } + } + + return { metadataFile, fieldValuesFile, extractFolder }; +} diff --git a/src/index.ts b/src/index.ts index c904ea4..e0e9b87 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,3 +13,14 @@ export { type ExtractSpecOptions, type ExtractSpecResult, } from "./extract-spec.js"; +export { + uploadMetadata, + type UploadMetadataOptions, + type UploadMetadataResult, + type UploadStepStats, +} from "./upload-metadata.js"; +export { + downloadMetadata, + type DownloadMetadataOptions, + type DownloadMetadataResult, +} from "./download-metadata.js"; diff --git a/src/ndjson.ts b/src/ndjson.ts new file mode 100644 index 0000000..5da7e4c --- /dev/null +++ b/src/ndjson.ts @@ -0,0 +1,137 @@ +export type PostNdjsonOptions = { + url: string; + apiKey: string; + requests: AsyncIterable; + onResponse: (response: Res, index: number) => void | Promise; + /** Max rows per HTTP request. Default 2000. */ + batchSize?: number; +}; + +// Cap rows-per-HTTP-POST so each request stays within one server-side DB +// transaction. The Metabase NDJSON endpoints partition inserts in groups of +// 2000 per transaction; sending more than that per POST forces multiple +// transactions inside a single request, during which the server stops reading +// body bytes long enough for Jetty's idle timeout to drop the tail. Matching +// the server's 2000 keeps every POST to exactly one transaction with minimum +// round-trips. +const DEFAULT_BATCH_SIZE = 2000; + +export async function postNdjson({ + url, + apiKey, + requests, + onResponse, + batchSize = DEFAULT_BATCH_SIZE, +}: PostNdjsonOptions): Promise { + let globalIndex = 0; + + for await (const batch of batchAsyncIterable(requests, batchSize)) { + const batchOffset = globalIndex; + await postNdjsonBatch({ + url, + apiKey, + batch, + onResponse: (response, localIndex) => + onResponse(response, batchOffset + localIndex), + }); + globalIndex += batch.length; + } +} + +async function* batchAsyncIterable( + source: AsyncIterable, + size: number, +): AsyncGenerator { + let current: T[] = []; + for await (const item of source) { + current.push(item); + if (current.length >= size) { + yield current; + current = []; + } + } + if (current.length > 0) { + yield current; + } +} + +type PostBatchOptions = { + url: string; + apiKey: string; + batch: Req[]; + onResponse: (response: Res, index: number) => void | Promise; +}; + +async function postNdjsonBatch({ + url, + apiKey, + batch, + onResponse, +}: PostBatchOptions): Promise { + if (batch.length === 0) { + return; + } + + const body = new TextEncoder().encode( + batch.map((value) => JSON.stringify(value)).join("\n") + "\n", + ); + + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/x-ndjson", + "X-API-Key": apiKey, + }, + body, + }); + + if (!response.ok) { + const text = await response.text().catch(() => ""); + throw new Error( + `POST ${url} failed: ${response.status} ${response.statusText} ${text}`.trim(), + ); + } + if (!response.body) { + throw new Error(`POST ${url} returned an empty body`); + } + + let received = 0; + for await (const parsedLine of parseNdjsonStream(response.body)) { + await onResponse(parsedLine, received); + received += 1; + } + + if (received < batch.length) { + throw new Error( + `POST ${url}: server acknowledged ${received} of ${batch.length} sent rows — ${batch.length - received} rows dropped by the server (likely a per-row error terminated the stream)`, + ); + } +} + +export async function* parseNdjsonStream( + stream: AsyncIterable, +): AsyncGenerator { + const decoder = new TextDecoder(); + let pending = ""; + + for await (const chunk of stream) { + const buffer = pending + decoder.decode(chunk, { stream: true }); + const lastNewline = buffer.lastIndexOf("\n"); + if (lastNewline === -1) { + pending = buffer; + continue; + } + for (const line of buffer.slice(0, lastNewline).split("\n")) { + const trimmed = line.trim(); + if (trimmed.length > 0) { + yield JSON.parse(trimmed) as T; + } + } + pending = buffer.slice(lastNewline + 1); + } + + const trailing = pending.trim(); + if (trailing.length > 0) { + yield JSON.parse(trailing) as T; + } +} diff --git a/src/stream-json.ts b/src/stream-json.ts new file mode 100644 index 0000000..1fe696b --- /dev/null +++ b/src/stream-json.ts @@ -0,0 +1,22 @@ +import { createReadStream } from "node:fs"; +import { JSONParser } from "@streamparser/json-node"; + +type ParsedElement = { value: T }; + +export async function* streamJsonElements( + filePath: string, + jsonPath: string, +): AsyncGenerator { + const parser = new JSONParser({ paths: [jsonPath], keepStack: false }); + const fileStream = createReadStream(filePath); + fileStream.pipe(parser); + try { + for await (const chunk of parser) { + yield (chunk as ParsedElement).value; + } + } finally { + fileStream.unpipe(parser); + fileStream.destroy(); + parser.destroy(); + } +} diff --git a/src/upload-metadata.test.ts b/src/upload-metadata.test.ts new file mode 100644 index 0000000..d9f54ee --- /dev/null +++ b/src/upload-metadata.test.ts @@ -0,0 +1,359 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { join, resolve } from "path"; + +import { parseNdjsonStream } from "./ndjson.js"; +import { uploadMetadata } from "./upload-metadata.js"; + +const REPO_ROOT = resolve(import.meta.dirname, ".."); +const EXAMPLE_METADATA = join(REPO_ROOT, "examples/v1/metadata.json"); +const EXAMPLE_FIELD_VALUES = join(REPO_ROOT, "examples/v1/field-values.json"); + +const DB_OFFSET = 1000; +const TABLE_OFFSET = 2000; +const FIELD_OFFSET = 3000; + +type RecordedCall = { + path: string; + contentType: string; + transferEncoding: string | null; + contentLength: string | null; + apiKey: string | null; + lines: unknown[]; +}; + +type MockServerControl = { + baseUrl: string; + calls: RecordedCall[]; + stop: () => Promise; + setFieldInsertBehavior: (behavior: FieldInsertBehavior) => void; + setFieldFailure: (oldId: number) => void; +}; + +type FieldInsertBehavior = "new" | "existing" | "alternate"; + +type IdLine = { id: number }; +type TableLine = { id: number; db_id: number }; +type FieldInsertLine = Record & { table_id: number }; +type FinalizeLine = { + id: number; + parent_id: number | null; + fk_target_field_id: number | null; +}; +type FieldValuesLine = { field_id: number }; + +async function readNdjsonLines( + stream: ReadableStream, +): Promise { + const lines: unknown[] = []; + for await (const line of parseNdjsonStream(stream)) { + lines.push(line); + } + return lines; +} + +function ndjsonStreamResponse(responses: AsyncIterable): Response { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + async start(controller) { + try { + for await (const response of responses) { + controller.enqueue(encoder.encode(JSON.stringify(response) + "\n")); + } + controller.close(); + } catch (error) { + controller.error(error); + } + }, + }); + return new Response(body, { + headers: { "Content-Type": "application/x-ndjson" }, + }); +} + +function startMockServer(): MockServerControl { + const calls: RecordedCall[] = []; + let fieldInsertBehavior: FieldInsertBehavior = "new"; + const fieldFailures = new Set(); + let fieldInsertCounter = 0; + + const server = Bun.serve({ + port: 0, + async fetch(request) { + const url = new URL(request.url); + const path = url.pathname; + const contentType = request.headers.get("Content-Type") ?? ""; + const transferEncoding = request.headers.get("Transfer-Encoding"); + const contentLength = request.headers.get("Content-Length"); + const apiKey = request.headers.get("X-API-Key"); + const lines = request.body ? await readNdjsonLines(request.body) : []; + calls.push({ + path, + contentType, + transferEncoding, + contentLength, + apiKey, + lines, + }); + + switch (path) { + case "/api/database/metadata/databases": { + async function* responses() { + for (const line of lines as IdLine[]) { + yield { old_id: line.id, new_id: line.id + DB_OFFSET }; + } + } + return ndjsonStreamResponse(responses()); + } + case "/api/database/metadata/tables": { + async function* responses() { + for (const line of lines as IdLine[]) { + yield { old_id: line.id, new_id: line.id + TABLE_OFFSET }; + } + } + return ndjsonStreamResponse(responses()); + } + case "/api/database/metadata/fields": { + async function* responses() { + for (const line of lines as IdLine[]) { + if (fieldFailures.has(line.id)) { + yield { + old_id: line.id, + error: "invalid_table_id", + detail: "test failure", + }; + continue; + } + const newId = line.id + FIELD_OFFSET; + const inserted = + fieldInsertBehavior === "new" || + (fieldInsertBehavior === "alternate" && + fieldInsertCounter++ % 2 === 0); + yield inserted + ? { old_id: line.id, new_id: newId } + : { old_id: line.id, existing_id: newId }; + } + } + return ndjsonStreamResponse(responses()); + } + case "/api/database/metadata/fields/finalize": { + async function* responses() { + for (const line of lines as IdLine[]) { + yield { id: line.id, ok: true }; + } + } + return ndjsonStreamResponse(responses()); + } + case "/api/database/field-values": { + async function* responses() { + for (const line of lines as FieldValuesLine[]) { + yield { field_id: line.field_id, created: true }; + } + } + return ndjsonStreamResponse(responses()); + } + default: + return new Response("not found", { status: 404 }); + } + }, + }); + + return { + baseUrl: `http://127.0.0.1:${server.port}`, + calls, + stop: () => server.stop(), + setFieldInsertBehavior: (behavior) => { + fieldInsertBehavior = behavior; + fieldInsertCounter = 0; + }, + setFieldFailure: (oldId) => { + fieldFailures.add(oldId); + }, + }; +} + +describe("uploadMetadata", () => { + let mock: MockServerControl; + + beforeEach(() => { + mock = startMockServer(); + }); + + afterEach(async () => { + await mock.stop(); + }); + + it("runs the full pipeline and remaps ids across passes", async () => { + const stats = await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + fieldValuesFile: EXAMPLE_FIELD_VALUES, + instanceUrl: mock.baseUrl, + apiKey: "test-key", + onWarning: () => {}, + }); + + expect(stats).toEqual({ + databases: { mapped: 1, errors: 0 }, + tables: { mapped: 8, errors: 0 }, + fieldsInsert: { mapped: 71, errors: 0, inserted: 71, matched: 0 }, + fieldsFinalize: { mapped: 71, errors: 0 }, + fieldValues: { mapped: 4, errors: 0 }, + }); + + const paths = mock.calls.map((call) => call.path); + // The first three steps are strictly sequential (each feeds the next's + // id map); finalize and field-values are kicked off concurrently once + // the field id map is populated. + expect(paths.slice(0, 3)).toEqual([ + "/api/database/metadata/databases", + "/api/database/metadata/tables", + "/api/database/metadata/fields", + ]); + expect(paths.slice(3).sort()).toEqual([ + "/api/database/field-values", + "/api/database/metadata/fields/finalize", + ]); + + for (const call of mock.calls) { + expect(call.contentType).toBe("application/x-ndjson"); + expect(call.apiKey).toBe("test-key"); + } + }); + + it("rewrites db_id on tables using the step-1 mapping", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + const tableCall = mock.calls.find( + (call) => call.path === "/api/database/metadata/tables", + )!; + const sampleDbNewId = 1 + DB_OFFSET; + for (const line of tableCall.lines as TableLine[]) { + expect(line.db_id).toBe(sampleDbNewId); + } + }); + + it("rewrites table_id on fields using the step-3 mapping and strips fk/parent on insert", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + const fieldsCall = mock.calls.find( + (call) => call.path === "/api/database/metadata/fields", + )!; + for (const line of fieldsCall.lines as FieldInsertLine[]) { + expect(line.table_id).toBeGreaterThanOrEqual(TABLE_OFFSET + 1); + expect(line.table_id).toBeLessThanOrEqual(TABLE_OFFSET + 8); + expect(line).not.toHaveProperty("parent_id"); + expect(line).not.toHaveProperty("fk_target_field_id"); + } + }); + + it("sends remapped parent_id and fk_target_field_id in finalize", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + const finalizeCall = mock.calls.find( + (call) => call.path === "/api/database/metadata/fields/finalize", + )!; + const lines = finalizeCall.lines as FinalizeLine[]; + + for (const line of lines) { + expect(line.id).toBeGreaterThanOrEqual(FIELD_OFFSET + 1); + if (line.fk_target_field_id !== null) { + expect(line.fk_target_field_id).toBeGreaterThanOrEqual( + FIELD_OFFSET + 1, + ); + } + } + + const fkCount = lines.filter( + (line) => line.fk_target_field_id !== null, + ).length; + expect(fkCount).toBeGreaterThan(0); + }); + + it("skips non-inserted rows in finalize (existing_id responses)", async () => { + mock.setFieldInsertBehavior("existing"); + const stats = await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + + expect(stats.fieldsInsert.mapped).toBe(71); + expect(stats.fieldsInsert.inserted).toBe(0); + expect(stats.fieldsInsert.matched).toBe(71); + expect(stats.fieldsFinalize.mapped).toBe(0); + expect(stats.fieldsFinalize.errors).toBe(0); + }); + + it("rewrites field_id on field-values using the step-3 mapping", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + fieldValuesFile: EXAMPLE_FIELD_VALUES, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + const valuesCall = mock.calls.find( + (call) => call.path === "/api/database/field-values", + )!; + for (const line of valuesCall.lines as FieldValuesLine[]) { + expect(line.field_id).toBeGreaterThanOrEqual(FIELD_OFFSET + 1); + } + }); + + it("counts per-row errors without aborting the pipeline", async () => { + mock.setFieldFailure(1); + const warnings: string[] = []; + const stats = await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + fieldValuesFile: EXAMPLE_FIELD_VALUES, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: (message) => warnings.push(message), + }); + + expect(stats.fieldsInsert.errors).toBe(1); + expect(stats.fieldsInsert.mapped).toBe(70); + expect(stats.fieldsFinalize.mapped).toBe(70); + expect(warnings.some((w) => w.includes("Field 1"))).toBe(true); + }); + + it("delivers a framed request body to the server", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + // node:http picks Transfer-Encoding: chunked for unknown-length bodies and + // Content-Length for bodies that fit in a single write buffer. Either is + // fine — the point is that the bytes made it to the server intact. + for (const call of mock.calls) { + const hasFraming = + call.transferEncoding === "chunked" || call.contentLength !== null; + expect(hasFraming).toBe(true); + } + }); + + it("skips the field-values step when the file is not provided", async () => { + await uploadMetadata({ + metadataFile: EXAMPLE_METADATA, + instanceUrl: mock.baseUrl, + apiKey: "k", + onWarning: () => {}, + }); + const paths = mock.calls.map((call) => call.path); + expect(paths).not.toContain("/api/database/field-values"); + }); +}); diff --git a/src/upload-metadata.ts b/src/upload-metadata.ts new file mode 100644 index 0000000..acb0542 --- /dev/null +++ b/src/upload-metadata.ts @@ -0,0 +1,478 @@ +import { postNdjson } from "./ndjson.js"; +import { streamJsonElements } from "./stream-json.js"; + +export const API_PATHS = { + databases: "/api/database/metadata/databases", + tables: "/api/database/metadata/tables", + fields: "/api/database/metadata/fields", + fieldsFinalize: "/api/database/metadata/fields/finalize", + fieldValues: "/api/database/field-values", +} as const; + +const JSON_PATHS = { + databases: "$.databases.*", + tables: "$.tables.*", + fields: "$.fields.*", + fieldValues: "$.field_values.*", +} as const; + +export type UploadMetadataOptions = { + metadataFile: string; + fieldValuesFile?: string; + instanceUrl: string; + apiKey: string; + onWarning?: (message: string) => void; +}; + +export type UploadStepStats = { + mapped: number; + errors: number; +}; + +export type UploadFieldInsertStats = UploadStepStats & { + inserted: number; + matched: number; +}; + +export type UploadMetadataResult = { + databases: UploadStepStats; + tables: UploadStepStats; + fieldsInsert: UploadFieldInsertStats; + fieldsFinalize: UploadStepStats; + fieldValues: UploadStepStats; +}; + +type DatabaseEntry = { + id: number; + name: string; + engine: string; +}; + +type TableEntry = { + id: number; + db_id: number; + name: string; + schema: string | null; + description?: string | null; +}; + +type FieldEntry = { + id: number; + table_id: number; + name: string; + base_type?: string; + database_type?: string; + description?: string | null; + semantic_type?: string | null; + effective_type?: string | null; + coercion_strategy?: string | null; + parent_id?: number | null; + fk_target_field_id?: number | null; +}; + +type FieldValuesEntry = { + field_id: number; + values: unknown[]; + has_more_values?: boolean; + human_readable_values?: string[]; +}; + +type DatabaseRequest = { + id: number; + name: string; + engine: string; +}; + +type TableRequest = { + id: number; + db_id: number; + name: string; + schema: string | null; + description?: string | null; +}; + +type FieldInsertRequest = { + id: number; + table_id: number; + name: string; + base_type?: string; + database_type?: string; + description?: string | null; + semantic_type?: string | null; + effective_type?: string | null; + coercion_strategy?: string | null; +}; + +type FieldFinalizeRequest = { + id: number; + parent_id: number | null; + fk_target_field_id: number | null; +}; + +type FieldValuesRequest = { + field_id: number; + values: unknown[]; + has_more_values: boolean; + human_readable_values?: string[]; +}; + +type IdMapResponse = + | { old_id: number; new_id: number } + | { old_id: number; existing_id: number } + | { old_id?: number; line?: number; error: string; detail?: string }; + +type FieldFinalizeResponse = + | { id: number; ok: true } + | { id?: number; line?: number; error: string; detail?: string }; + +type FieldValuesResponse = + | { field_id: number; created: true } + | { field_id: number; updated: true } + | { field_id?: number; line?: number; error: string; detail?: string }; + +type RecordIdMapResponseOptions = { + response: IdMapResponse; + responseIndex: number; + stats: UploadStepStats; + idMap: Map; + label: string; + onInserted?: (oldId: number) => void; + onMatched?: (oldId: number) => void; +}; + +function joinUrl(baseUrl: string, path: string): string { + return `${baseUrl.replace(/\/$/, "")}${path}`; +} + +function emptyStats(): UploadStepStats { + return { mapped: 0, errors: 0 }; +} + +function emptyFieldInsertStats(): UploadFieldInsertStats { + return { mapped: 0, errors: 0, inserted: 0, matched: 0 }; +} + +function formatError( + label: string, + id: number | undefined, + response: { error?: string; detail?: string; line?: number }, + requestIndex?: number, +): string { + const locator = buildErrorLocator(id, response.line, requestIndex); + const detailSuffix = response.detail ? ` — ${response.detail}` : ""; + return `${label}${locator}: ${response.error ?? "unknown error"}${detailSuffix}`; +} + +function buildErrorLocator( + id: number | undefined, + serverLine: number | undefined, + requestIndex: number | undefined, +): string { + if (id !== undefined) { + return ` ${id}`; + } + if (serverLine !== undefined) { + return ` (source line #${serverLine})`; + } + if (requestIndex !== undefined) { + return ` (response #${requestIndex})`; + } + return ""; +} + +function pickDatabaseRequest(db: DatabaseEntry): DatabaseRequest { + return { id: db.id, name: db.name, engine: db.engine }; +} + +function pickTableRequest(table: TableEntry, dbId: number): TableRequest { + const request: TableRequest = { + id: table.id, + db_id: dbId, + name: table.name, + schema: table.schema, + }; + if (table.description !== undefined) { + request.description = table.description; + } + return request; +} + +function pickFieldInsertRequest( + field: FieldEntry, + tableId: number, +): FieldInsertRequest { + const request: FieldInsertRequest = { + id: field.id, + table_id: tableId, + name: field.name, + }; + if (field.base_type !== undefined) { + request.base_type = field.base_type; + } + if (field.database_type !== undefined) { + request.database_type = field.database_type; + } + if (field.description !== undefined) { + request.description = field.description; + } + if (field.semantic_type !== undefined) { + request.semantic_type = field.semantic_type; + } + if (field.effective_type !== undefined) { + request.effective_type = field.effective_type; + } + if (field.coercion_strategy !== undefined) { + request.coercion_strategy = field.coercion_strategy; + } + return request; +} + +function pickFieldValuesRequest( + entry: FieldValuesEntry, + fieldId: number, +): FieldValuesRequest { + const request: FieldValuesRequest = { + field_id: fieldId, + values: entry.values, + has_more_values: entry.has_more_values ?? false, + }; + if (entry.human_readable_values !== undefined) { + request.human_readable_values = entry.human_readable_values; + } + return request; +} + +export async function uploadMetadata({ + metadataFile, + fieldValuesFile, + instanceUrl, + apiKey, + onWarning, +}: UploadMetadataOptions): Promise { + const warn = onWarning ?? ((message: string) => console.warn(message)); + + const databaseIdMap = new Map(); + const tableIdMap = new Map(); + const fieldIdMap = new Map(); + const insertedFieldIds = new Set(); + + const result: UploadMetadataResult = { + databases: emptyStats(), + tables: emptyStats(), + fieldsInsert: emptyFieldInsertStats(), + fieldsFinalize: emptyStats(), + fieldValues: emptyStats(), + }; + + function recordIdMapResponse({ + response, + responseIndex, + stats, + idMap, + label, + onInserted, + onMatched, + }: RecordIdMapResponseOptions): void { + if ("new_id" in response) { + idMap.set(response.old_id, response.new_id); + onInserted?.(response.old_id); + stats.mapped += 1; + return; + } + if ("existing_id" in response) { + idMap.set(response.old_id, response.existing_id); + onMatched?.(response.old_id); + stats.mapped += 1; + return; + } + stats.errors += 1; + warn(formatError(label, response.old_id, response, responseIndex)); + } + + async function* remapForeignKey(opts: { + jsonPath: string; + sourceFile: string; + getKey: (entry: In) => number; + idMap: Map; + transform: (entry: In, newKey: number) => Out; + describeSkip: (entry: In, missingKey: number) => string; + }): AsyncGenerator { + for await (const entry of streamJsonElements( + opts.sourceFile, + opts.jsonPath, + )) { + const oldKey = opts.getKey(entry); + const newKey = opts.idMap.get(oldKey); + if (newKey === undefined) { + warn(opts.describeSkip(entry, oldKey)); + continue; + } + yield opts.transform(entry, newKey); + } + } + + function remapFieldReference( + oldId: number | null | undefined, + ownerFieldId: number, + referenceName: "parent_id" | "fk_target_field_id", + ): number | null { + if (oldId == null) { + return null; + } + const newId = fieldIdMap.get(oldId); + if (newId === undefined) { + warn( + `Field ${ownerFieldId}: dropping ${referenceName} → ${oldId} (referenced field was not mapped)`, + ); + return null; + } + return newId; + } + + async function* fieldFinalizeRequests(): AsyncGenerator { + for await (const field of streamJsonElements( + metadataFile, + JSON_PATHS.fields, + )) { + if (!insertedFieldIds.has(field.id)) { + continue; + } + const newId = fieldIdMap.get(field.id); + if (newId === undefined) { + continue; + } + yield { + id: newId, + parent_id: remapFieldReference(field.parent_id, field.id, "parent_id"), + fk_target_field_id: remapFieldReference( + field.fk_target_field_id, + field.id, + "fk_target_field_id", + ), + }; + } + } + + async function* streamDatabaseRequests(): AsyncGenerator { + for await (const database of streamJsonElements( + metadataFile, + JSON_PATHS.databases, + )) { + yield pickDatabaseRequest(database); + } + } + + await postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.databases), + apiKey, + requests: streamDatabaseRequests(), + onResponse: (response, responseIndex) => + recordIdMapResponse({ + response, + responseIndex, + stats: result.databases, + idMap: databaseIdMap, + label: "Database", + }), + }); + + await postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.tables), + apiKey, + requests: remapForeignKey({ + jsonPath: JSON_PATHS.tables, + sourceFile: metadataFile, + getKey: (table) => table.db_id, + idMap: databaseIdMap, + transform: pickTableRequest, + describeSkip: (table, oldDbId) => + `Skipping table ${table.id} (${table.name}): source db_id ${oldDbId} did not map to a target database`, + }), + onResponse: (response, responseIndex) => + recordIdMapResponse({ + response, + responseIndex, + stats: result.tables, + idMap: tableIdMap, + label: "Table", + }), + }); + + await postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.fields), + apiKey, + requests: remapForeignKey({ + jsonPath: JSON_PATHS.fields, + sourceFile: metadataFile, + getKey: (field) => field.table_id, + idMap: tableIdMap, + transform: pickFieldInsertRequest, + describeSkip: (field, oldTableId) => + `Skipping field ${field.id} (${field.name}): source table_id ${oldTableId} did not map to a target table`, + }), + onResponse: (response, responseIndex) => + recordIdMapResponse({ + response, + responseIndex, + stats: result.fieldsInsert, + idMap: fieldIdMap, + label: "Field", + onInserted: (oldId) => { + insertedFieldIds.add(oldId); + result.fieldsInsert.inserted += 1; + }, + onMatched: () => { + result.fieldsInsert.matched += 1; + }, + }), + }); + + const finalizePass = postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.fieldsFinalize), + apiKey, + requests: fieldFinalizeRequests(), + onResponse: (response, responseIndex) => { + if ("ok" in response) { + result.fieldsFinalize.mapped += 1; + return; + } + result.fieldsFinalize.errors += 1; + warn(formatError("Finalize", response.id, response, responseIndex)); + }, + }); + + const fieldValuesPass = fieldValuesFile + ? postNdjson({ + url: joinUrl(instanceUrl, API_PATHS.fieldValues), + apiKey, + requests: remapForeignKey({ + jsonPath: JSON_PATHS.fieldValues, + sourceFile: fieldValuesFile, + getKey: (entry) => entry.field_id, + idMap: fieldIdMap, + transform: pickFieldValuesRequest, + describeSkip: (entry, oldId) => + `Skipping field values for field_id ${oldId}: no mapping from source field to target`, + }), + onResponse: (response, responseIndex) => { + if ("error" in response) { + result.fieldValues.errors += 1; + warn( + formatError( + "Field values", + response.field_id, + response, + responseIndex, + ), + ); + return; + } + result.fieldValues.mapped += 1; + }, + }) + : Promise.resolve(); + + await Promise.all([finalizePass, fieldValuesPass]); + + return result; +}