Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions src/generator/WorkerPool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import { describe, it, expect, beforeEach } from "vitest";
import {
computeShards,
generateDeterministicShards,
createShardTasks,
ShardRange,
ShardTask,
} from "./WorkerPool";

describe("WorkerPool Shard Computation", () => {
// Tests for computeShards: splitting a document count into contiguous,
// half-open [start, end) ranges, one per worker.
describe("computeShards", () => {
  it("should divide 10 items across 2 workers evenly", () => {
    const shards = computeShards(10, 2, "seed123");

    expect(shards).toHaveLength(2);
    expect(shards[0]).toEqual({ workerId: 0, start: 0, end: 5, count: 5 });
    expect(shards[1]).toEqual({ workerId: 1, start: 5, end: 10, count: 5 });
  });

  it("should handle uneven division with remainder", () => {
    const shards = computeShards(10, 3, "seed123");

    // 10 / 3 leaves a remainder of 1, which lands on the first shard.
    expect(shards).toHaveLength(3);
    expect(shards[0]).toEqual({ workerId: 0, start: 0, end: 4, count: 4 });
    expect(shards[1]).toEqual({ workerId: 1, start: 4, end: 7, count: 3 });
    expect(shards[2]).toEqual({ workerId: 2, start: 7, end: 10, count: 3 });
  });

  it("should handle 1 worker (no sharding)", () => {
    const shards = computeShards(100, 1, "seed123");

    expect(shards).toHaveLength(1);
    expect(shards[0]).toEqual({ workerId: 0, start: 0, end: 100, count: 100 });
  });

  it("should handle more workers than items", () => {
    const shards = computeShards(3, 5, "seed123");

    // Only as many shards as there are items; no empty shards are produced.
    expect(shards).toHaveLength(3);
    expect(shards[0].count).toBe(1);
    expect(shards[1].count).toBe(1);
    expect(shards[2].count).toBe(1);
  });

  it("should produce contiguous ranges", () => {
    const shards = computeShards(100, 4, "seed123");

    // Each shard must begin exactly where the previous one ended.
    for (let i = 1; i < shards.length; i++) {
      expect(shards[i].start).toBe(shards[i - 1].end);
    }
    expect(shards[shards.length - 1].end).toBe(100);
  });

  it("should cover the entire range (start to end)", () => {
    const totalCount = 50;
    const shards = computeShards(totalCount, 4, "seed456");

    // Summing counts alone would also pass for a shifted or gapped partition,
    // so pin both boundaries of the overall range as well.
    expect(shards[0].start).toBe(0);
    expect(shards[shards.length - 1].end).toBe(totalCount);

    // Every shard's count must agree with the width of its own range.
    for (const shard of shards) {
      expect(shard.end - shard.start).toBe(shard.count);
    }

    const totalGenerated = shards.reduce((sum, s) => sum + s.count, 0);
    expect(totalGenerated).toBe(totalCount);
  });
});

// Tests for generateDeterministicShards: per-collection shard maps keyed by
// collection name.
describe("generateDeterministicShards", () => {
  // Adds up the `count` of every shard in a list.
  const totalOf = (ranges: Array<{ count: number }>): number => {
    let total = 0;
    for (const range of ranges) {
      total += range.count;
    }
    return total;
  };

  it("should generate shards for multiple collections", () => {
    const shardMap = generateDeterministicShards(
      [
        { collectionName: "users", count: 100 },
        { collectionName: "orders", count: 200 },
      ],
      4,
      "seed789"
    );

    expect(shardMap.has("users")).toBe(true);
    expect(shardMap.has("orders")).toBe(true);

    const forUsers = shardMap.get("users")!;
    const forOrders = shardMap.get("orders")!;

    // Each collection's shards must account for all of its documents.
    expect(totalOf(forUsers)).toBe(100);
    expect(totalOf(forOrders)).toBe(200);
  });

  it("should be deterministic for same seed", () => {
    const input = [{ collectionName: "users", count: 50 }];

    const firstRun = generateDeterministicShards(input, 3, "fixed-seed");
    const secondRun = generateDeterministicShards(input, 3, "fixed-seed");

    // Compare serialized entries so both keys and shard contents are checked.
    const serialize = (shardMap: typeof firstRun): string =>
      JSON.stringify(Array.from(shardMap.entries()));

    expect(serialize(firstRun)).toBe(serialize(secondRun));
  });
});

// Tests for createShardTasks: flattening per-collection shards into a single
// task list tagged with the collection's index.
describe("createShardTasks", () => {
  // Adds up the `count` of every task in a list.
  const sumCounts = (items: Array<{ count: number }>): number =>
    items.reduce((running, item) => running + item.count, 0);

  it("should create tasks for all collections and shards", () => {
    const tasks = createShardTasks(
      [
        { collectionName: "users", count: 10 },
        { collectionName: "orders", count: 20 },
      ],
      2,
      "seed123"
    );

    // collectionIndex 0 is "users", 1 is "orders" (their input positions).
    const forUsers = tasks.filter((task) => task.collectionIndex === 0);
    const forOrders = tasks.filter((task) => task.collectionIndex === 1);

    expect(forUsers.length).toBe(2);
    expect(forOrders.length).toBe(2);

    expect(sumCounts(forUsers)).toBe(10);
    expect(sumCounts(forOrders)).toBe(20);
  });

  it("should assign correct rangeStart values", () => {
    const tasks = createShardTasks([{ collectionName: "items", count: 100 }], 4, "seed123");

    // 100 documents over 4 workers: four shards of 25 starting at 0, 25, 50, 75.
    const expectedStarts = [0, 25, 50, 75];
    expectedStarts.forEach((expectedStart, position) => {
      expect(tasks[position].rangeStart).toBe(expectedStart);
      expect(tasks[position].count).toBe(25);
    });
  });

  it("should produce task counts that sum to total documents", () => {
    const tasks = createShardTasks(
      [
        { collectionName: "users", count: 50 },
        { collectionName: "posts", count: 100 },
        { collectionName: "comments", count: 200 },
      ],
      4,
      "seed456"
    );

    expect(sumCounts(tasks)).toBe(350);
  });
});

// Tests that shard task lists are reproducible: identical for repeated runs
// and unaffected by the seed value.
describe("Deterministic Consistency", () => {
  it("should produce identical shards across multiple runs with same seed", () => {
    const input = [{ collectionName: "test", count: 1000 }];

    // Three independent runs with identical arguments.
    const [first, second, third] = [0, 1, 2].map(() =>
      createShardTasks(input, 4, "deterministic-seed")
    );

    expect(first).toEqual(second);
    expect(second).toEqual(third);
  });

  it("should produce consistent shard assignments regardless of seed", () => {
    const input = [{ collectionName: "test", count: 100 }];

    // Same collections and worker count; only the seed differs per run.
    const [withSeedA, withSeedB, withSeedC] = ["seed-A", "seed-B", "seed-C"].map(
      (seed) => createShardTasks(input, 2, seed)
    );

    expect(withSeedA).toEqual(withSeedB);
    expect(withSeedB).toEqual(withSeedC);
  });
});
});
112 changes: 112 additions & 0 deletions src/generator/WorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,98 @@ export class WorkerPool {
getQueuedTaskCount(): number {
return this.taskQueue.length;
}

/**
 * Runs every shard task on a worker and returns one result per shard, in the
 * same order as `shardTasks`.
 *
 * A shard is routed to `this.workers[shard.workerId % this.workers.length]`.
 * Shards that map to the same worker are executed one after another: worker
 * messages carry no shard identifier, so running two shards on one worker at
 * the same time would let the first terminal message settle both of them.
 *
 * Failures never reject the returned promise; a failed shard contributes a
 * `{ event: "error", error }` entry instead.
 *
 * @param taskData   Fields spread into the `data` payload of every shard message.
 * @param shardTasks Shards to execute (e.g. the output of `createShardTasks`).
 * @param onProgress Optional callback invoked for each "progress" message,
 *                   together with the `workerId` of the shard that is running.
 * @returns One `WorkerResult` per entry of `shardTasks`.
 */
async processShardedGeneration(
  taskData: Omit<WorkerTask, "event">,
  shardTasks: ShardTask[],
  onProgress?: (progress: any, workerId: number) => void
): Promise<WorkerResult[]> {
  const workerCount = this.workers.length;

  // Posts a single shard to its worker and settles on the first message that
  // is not a progress update (or on a worker "error" event).
  const runShard = (shard: ShardTask): Promise<WorkerResult> => {
    const worker = this.workers[shard.workerId % workerCount];
    if (!worker) {
      // Empty pool or an out-of-range workerId: fail this shard explicitly.
      return Promise.reject(
        new Error(`No worker available for shard workerId ${shard.workerId}`)
      );
    }

    const shardTask: WorkerTask = {
      event: "start_generation",
      data: {
        ...taskData,
        shard: {
          workerId: shard.workerId,
          collectionIndex: shard.collectionIndex,
          rangeStart: shard.rangeStart,
          count: shard.count,
        },
      },
    };

    return new Promise<WorkerResult>((resolve, reject) => {
      // Always remove both listeners before settling so they do not leak
      // into the next shard scheduled on the same worker.
      const detach = (): void => {
        worker.off("message", messageHandler);
        worker.off("error", errorHandler);
      };

      const messageHandler = (result: WorkerResult): void => {
        if (result.event === "progress") {
          onProgress?.(result.data, shard.workerId);
          return;
        }

        detach();

        if (result.event === "error") {
          reject(new Error(result.error || "Unknown error"));
        } else {
          resolve(result);
        }
      };

      const errorHandler = (error: Error): void => {
        detach();
        reject(error);
      };

      worker.on("message", messageHandler);
      worker.on("error", errorHandler);

      try {
        worker.postMessage(shardTask);
      } catch (error) {
        detach();
        reject(error instanceof Error ? error : new Error(String(error)));
      }
    });
  };

  // Tail of the promise chain for each worker slot. The stored promises never
  // reject, so one failed shard does not block later shards on that worker.
  const workerTails = new Map<number, Promise<unknown>>();

  // One promise per shard (not per workerId): shards of different collections
  // share worker ids and must not overwrite each other.
  const settledShards = shardTasks.map((shard) => {
    const slot = shard.workerId % workerCount;
    const previous = workerTails.get(slot) ?? Promise.resolve();

    const settled = previous
      .then(() => runShard(shard))
      .catch(
        (error: unknown): WorkerResult => ({
          event: "error",
          error: error instanceof Error ? error.message : String(error),
        })
      );

    workerTails.set(slot, settled);
    return settled;
  });

  return Promise.all(settledShards);
}
}

/**
 * A contiguous slice of one collection's documents assigned to a worker.
 * The range is half-open: `end` equals `start + count`.
 */
export interface ShardRange {
/** Index of the shard within its collection (0-based). */
workerId: number;
/** Offset of the first document in this shard. */
start: number;
/** Offset one past the last document in this shard. */
end: number;
/** Number of documents in this shard. */
count: number;
}

/**
 * A unit of generation work: one shard of one collection.
 */
export interface ShardTask {
/** Worker id copied from the originating shard. */
workerId: number;
/** Position of the collection in the `collections` array the tasks were built from. */
collectionIndex: number;
/** Offset of the first document to generate (the shard's `start`). */
rangeStart: number;
/** Number of documents to generate in this task. */
count: number;
}

export function computeShards(
Expand All @@ -153,6 +239,7 @@ export function computeShards(
workerId: i,
start: currentStart,
end: currentStart + chunkSize,
count: chunkSize,
});
currentStart += chunkSize;
}
Expand All @@ -173,3 +260,28 @@ export function generateDeterministicShards(

return result;
}

/**
 * Flattens the per-collection shard ranges into a single list of tasks.
 *
 * Tasks are emitted collection by collection, in input order, and each task
 * records the index of the collection it belongs to. A collection with no
 * entry in the shard map contributes no tasks.
 *
 * @param collections Collections to shard, with their document counts.
 * @param workerCount Number of workers, forwarded to `generateDeterministicShards`.
 * @param seed        Seed forwarded to `generateDeterministicShards`.
 * @returns One `ShardTask` per shard of every collection.
 */
export function createShardTasks(
  collections: Array<{ collectionName: string; count: number }>,
  workerCount: number,
  seed: string | number
): ShardTask[] {
  const shardLookup = generateDeterministicShards(collections, workerCount, seed);

  return collections.flatMap(({ collectionName }, collectionIndex) => {
    const ranges = shardLookup.get(collectionName) || [];

    return ranges.map(
      (range): ShardTask => ({
        workerId: range.workerId,
        collectionIndex,
        rangeStart: range.start,
        count: range.count,
      })
    );
  });
}
18 changes: 11 additions & 7 deletions src/generator/adapters/BaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,22 @@ export abstract class BaseAdapter {
): AsyncGenerator<GeneratedDocument> {
const random = seedrandom(`${this.seed}_${collection.name}`);

// Skip ahead if rangeStart > 0 to maintain determinism
// Note: seedrandom doesn't support skipping easily without state or manual loops
// For now, we do manual loops to advance the RNG for simplicity/correctness
for (let i = 0; i < rangeStart; i++) {
random();
}

const syncedSchema =
this.schemaMap.get(collection.name) ||
this.schemaMap.get(collection.name.split(".").pop()!) ||
collection;

if (rangeStart > 0) {
for (let i = 0; i < rangeStart; i++) {
for (const field of syncedSchema.fields) {
if (field.isPrimaryKey) continue;
if (field.name === "id" && !field.isPrimaryKey) continue;
if (field.type === "reference" || field.isForeignKey) continue;
random();
}
}
}

const pkFields = syncedSchema.fields.filter((f) => f.isPrimaryKey);
const isCompositePK = pkFields.length > 1;

Expand Down
Loading
Loading