Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions src/generator/WorkerPool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { Worker, MessageChannel, parentPort } from "worker_threads";
import * as path from "path";
import * as os from "os";

export interface WorkerTask {
event: "start_generation" | "start_export";
data: unknown;
}

export interface WorkerResult {
event: "done" | "error" | "progress";
data?: unknown;
error?: string;
}

export interface WorkerPoolOptions {
maxWorkers?: number;
workerPath?: string;
}

export class WorkerPool {
private workers: Worker[] = [];
private taskQueue: Array<{ task: WorkerTask; resolve: (result: WorkerResult) => void; reject: (error: Error) => void }> = [];
private activeWorkers = 0;
private maxWorkers: number;
private workerPath: string;
private channel: MessageChannel | null = null;

constructor(options: WorkerPoolOptions = {}) {
this.maxWorkers = options.maxWorkers || Math.max(1, os.cpus().length - 1);
this.workerPath = options.workerPath || path.join(__dirname, "worker.cjs");
}

async initialize(): Promise<void> {
for (let i = 0; i < this.maxWorkers; i++) {
const worker = new Worker(this.workerPath);
this.workers.push(worker);
}
}

async processTask(task: WorkerTask): Promise<WorkerResult> {
return new Promise((resolve, reject) => {
this.taskQueue.push({ task, resolve, reject });
this.processNext();
});
}

private async processNext(): Promise<void> {
if (this.taskQueue.length === 0 || this.activeWorkers >= this.maxWorkers) {
return;
}

const { task, resolve, reject } = this.taskQueue.shift()!;
this.activeWorkers++;

const workerIndex = this.findAvailableWorker();
if (workerIndex === -1) {
this.activeWorkers--;
this.taskQueue.unshift({ task, resolve, reject });
return;
}

const worker = this.workers[workerIndex];

const messageHandler = (result: WorkerResult) => {
if (result.event === "progress") {
task.data && typeof task.data === "object" && "onProgress" in task.data &&
((task.data as any).onProgress?.(result.data));
return;
}

worker.off("message", messageHandler);
worker.off("error", errorHandler);
this.activeWorkers--;

if (result.event === "error") {
reject(new Error(result.error || "Unknown error"));
} else {
resolve(result);
}

this.processNext();
};

const errorHandler = (error: Error) => {
worker.off("message", messageHandler);
worker.off("error", errorHandler);
this.activeWorkers--;
reject(error);
this.processNext();
};

worker.on("message", messageHandler);
worker.on("error", errorHandler);

try {
worker.postMessage(task);
} catch (error) {
worker.off("message", messageHandler);
worker.off("error", errorHandler);
this.activeWorkers--;
reject(error instanceof Error ? error : new Error(String(error)));
this.processNext();
}
}

private findAvailableWorker(): number {
for (let i = 0; i < this.workers.length; i++) {
return i;
}
return 0;
}

async terminate(): Promise<void> {
await Promise.all(this.workers.map(w => w.terminate()));
this.workers = [];
}

getWorkerCount(): number {
return this.maxWorkers;
}

getActiveWorkerCount(): number {
return this.activeWorkers;
}

getQueuedTaskCount(): number {
return this.taskQueue.length;
}
}

export interface ShardRange {
workerId: number;
start: number;
end: number;
}

export function computeShards(
totalCount: number,
workerCount: number,
seed: string | number
): ShardRange[] {
const baseChunkSize = Math.floor(totalCount / workerCount);
const remainder = totalCount % workerCount;
const ranges: ShardRange[] = [];

let currentStart = 0;
for (let i = 0; i < workerCount; i++) {
const chunkSize = baseChunkSize + (i < remainder ? 1 : 0);
if (chunkSize === 0) continue;

ranges.push({
workerId: i,
start: currentStart,
end: currentStart + chunkSize,
});
currentStart += chunkSize;
}

return ranges;
}

export function generateDeterministicShards(
collections: Array<{ collectionName: string; count: number }>,
workerCount: number,
seed: string | number
): Map<string, ShardRange[]> {
const result = new Map<string, ShardRange[]>();

for (const col of collections) {
result.set(col.collectionName, computeShards(col.count, workerCount, seed));
}

return result;
}
7 changes: 6 additions & 1 deletion src/generator/adapters/BaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ export interface CollectionDetails {
primaryKey?: string;
primaryKeyType?: "string" | "integer" | "number" | "uuid" | "long";
startId?: number;
// Composite PK support - TODO - not woriking as of now for data generation
// Composite PK support
primaryKeys?: string[];
primaryKeyTypes?: ("string" | "integer" | "number" | "uuid" | "long")[];
isCompositePK?: boolean;
/** Whether the primary key is auto-generated by the database (e.g. AUTO_INCREMENT, SERIAL) */
isAutoIncrement?: boolean;
}

/**
Expand Down Expand Up @@ -236,6 +238,7 @@ export abstract class BaseAdapter {
for (let i = rangeStart; i < rangeStart + count; i++) {
const doc: GeneratedDocument = {
id: undefined as any,
pkValues: {},
data: {},
};

Expand All @@ -250,6 +253,7 @@ export abstract class BaseAdapter {
if (pkField.isForeignKey) continue;
const pkValue = this.generatePKValue(collection.name, pkField, i);
doc.data[pkField.name] = pkValue;
doc.pkValues![pkField.name] = pkValue;
}

const firstNonFKPK = sortedPkFields.find((f) => !f.isForeignKey);
Expand All @@ -265,6 +269,7 @@ export abstract class BaseAdapter {
const pkField = pkFields[0];
if (pkField) {
doc.data[pkField.name] = docId;
doc.pkValues![pkField.name] = docId;
}
if (!doc.data["id"]) doc.data["id"] = docId;
}
Expand Down
38 changes: 31 additions & 7 deletions src/generator/adapters/MySQLAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ export class MySQLAdapter extends BaseAdapter {

const escapedCollection = this.escapeIdentifier(collectionName);

// Determine the primary key to use for accurate ID extraction
const details = await this.getCollectionDetails(collectionName);
const primaryKey = details.primaryKey || "id";
const isAutoIncrement = !!(details as any).isAutoIncrement;

try {
const keys = Object.keys(documents[0]);
const columns = keys.map(this.escapeIdentifier).join(", ");
Expand All @@ -114,12 +119,30 @@ export class MySQLAdapter extends BaseAdapter {

const query = `INSERT INTO ${escapedCollection} (${columns}) VALUES ${placeholders}`;
const [result] = await this.connection.query(query, values);

const insertId = (result as any).insertId;
if (insertId !== undefined) {
return documents.map((_, index) => insertId + index);

if (isAutoIncrement) {
// For auto-increment PKs, the database tells us the first inserted ID
const insertId = (result as any).insertId;
if (insertId !== undefined && insertId > 0) {
return documents.map((_, index) => insertId + index);
}
}
return documents.map((_, index) => index + 1); // Fallback

// For manual PKs (UUID, custom string/int), extract IDs directly from documents
if (primaryKey && keys.includes(primaryKey)) {
return documents.map(doc => doc[primaryKey]).filter(id => id !== undefined && id !== null);
}

// For composite PKs, extract the first PK component as a representative ID
if (details.isCompositePK && details.primaryKeys) {
const firstPK = details.primaryKeys[0];
if (firstPK && keys.includes(firstPK)) {
return documents.map(doc => doc[firstPK]).filter(id => id !== undefined && id !== null);
}
}

// Last resort: return document indices (should rarely reach here)
return documents.map((_, index) => index + 1);
} catch (error: any) {
if (error.message && error.message.includes("ER_NO_SUCH_TABLE")) {
throw new Error("ER_NO_SUCH_TABLE: Table does not exist");
Expand Down Expand Up @@ -186,8 +209,9 @@ export class MySQLAdapter extends BaseAdapter {
return {
primaryKey: pkRows[0].Field,
primaryKeyType: type,
isCompositePK: false
};
isCompositePK: false,
isAutoIncrement: pkRows[0].Extra?.toLowerCase().includes("auto_increment")
} as CollectionDetails;
}

return {
Expand Down
Loading
Loading