diff --git a/apps/server/src/open.test.ts b/apps/server/src/open.test.ts index 1efa36e957c..7b712628427 100644 --- a/apps/server/src/open.test.ts +++ b/apps/server/src/open.test.ts @@ -1,7 +1,8 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { assert, it } from "@effect/vitest"; import { assertSuccess } from "@effect/vitest/utils"; -import { FileSystem, Path, Effect } from "effect"; +import { FileSystem, Path, Effect, Sink, Stream } from "effect"; +import { ChildProcessSpawner } from "effect/unstable/process"; import { isCommandAvailable, @@ -475,20 +476,53 @@ it.layer(NodeServices.layer)("resolveEditorLaunch", (it) => { }); it.layer(NodeServices.layer)("launchDetached", (it) => { - it.effect("resolves when command can be spawned", () => + it.effect("spawns through the Effect child process service", () => Effect.gen(function* () { + let capturedCommand: unknown; + let unrefCalled = false; + const spawner = ChildProcessSpawner.make((command) => + Effect.sync(() => { + capturedCommand = command; + return ChildProcessSpawner.makeHandle({ + pid: ChildProcessSpawner.ProcessId(123), + exitCode: Effect.succeed(ChildProcessSpawner.ExitCode(0)), + isRunning: Effect.succeed(false), + kill: () => Effect.void, + unref: Effect.sync(() => { + unrefCalled = true; + return Effect.void; + }), + stdin: Sink.drain, + stdout: Stream.empty, + stderr: Stream.empty, + all: Stream.empty, + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + }); + }), + ); + const result = yield* launchDetached({ command: process.execPath, args: ["-e", "process.exit(0)"], - }).pipe(Effect.result); + }).pipe( + Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, spawner), + Effect.result, + ); + assertSuccess(result, undefined); + assert.isTrue(unrefCalled); + assert.deepInclude(capturedCommand as Record, { + command: process.execPath, + args: ["-e", "process.exit(0)"], + }); }), ); it.effect("rejects when command does not exist", () => Effect.gen(function* () { const result = yield* launchDetached({ - command: `t3code-no-such-command-${Date.now()}`, + command: "t3code-no-such-command-effect-child-process", args: [], }).pipe(Effect.result); assert.equal(result._tag, "Failure"); diff --git a/apps/server/src/open.ts b/apps/server/src/open.ts index 98dfcaf4caa..f265a807197 100644 --- a/apps/server/src/open.ts +++ b/apps/server/src/open.ts @@ -6,11 +6,10 @@ * * @module Open */ -import { spawn } from "node:child_process"; - import { EDITORS, OpenError, type EditorId } from "@t3tools/contracts"; import { isCommandAvailable, type CommandAvailabilityOptions } from "@t3tools/shared/shell"; import { Context, Effect, Layer } from "effect"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; // ============================== // Definitions @@ -182,44 +181,54 @@ export const resolveEditorLaunch = Effect.fn("resolveEditorLaunch")(function* ( return { command: fileManagerCommandForPlatform(platform), args: [input.cwd] }; }); -export const launchDetached = (launch: EditorLaunch) => +const launchDetachedWithSpawner = ( + launch: EditorLaunch, + spawner: ChildProcessSpawner.ChildProcessSpawner["Service"], +) => Effect.gen(function* () { if (!isCommandAvailable(launch.command)) { return yield* new OpenError({ message: `Editor command not found: ${launch.command}` }); } - yield* Effect.callback((resume) => { - let child; - try { + yield* Effect.scoped( + Effect.gen(function* () { const isWin32 = process.platform === "win32"; - child = spawn( - launch.command, - isWin32 ? launch.args.map((a) => `"${a}"`) : [...launch.args], - { - detached: true, - stdio: "ignore", - shell: isWin32, - }, - ); - } catch (error) { - return resume( - Effect.fail(new OpenError({ message: "failed to spawn detached process", cause: error })), + const child = yield* spawner + .spawn( + ChildProcess.make( + launch.command, + isWin32 ? launch.args.map((a) => `"${a}"`) : [...launch.args], + { + detached: true, + stdin: "ignore", + stdout: "ignore", + stderr: "ignore", + shell: isWin32, + }, + ), + ) + .pipe( + Effect.mapError( + (cause) => new OpenError({ message: "failed to spawn detached process", cause }), + ), + ); + const _reref = yield* child.unref.pipe( + Effect.mapError( + (cause) => new OpenError({ message: "failed to unref detached process", cause }), + ), ); - } - - const handleSpawn = () => { - child.unref(); - resume(Effect.void); - }; + }), + ); + }); - child.once("spawn", handleSpawn); - child.once("error", (cause) => - resume(Effect.fail(new OpenError({ message: "failed to spawn detached process", cause }))), - ); - }); +export const launchDetached = (launch: EditorLaunch) => + Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + return yield* launchDetachedWithSpawner(launch, spawner); }); const make = Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; const open = yield* Effect.tryPromise({ try: () => import("open"), catch: (cause) => new OpenError({ message: "failed to load browser opener", cause }), @@ -230,8 +239,11 @@ const make = Effect.gen(function* () { Effect.tryPromise({ try: () => open.default(target), catch: (cause) => new OpenError({ message: "Browser auto-open failed", cause }), - }), - openInEditor: (input) => Effect.flatMap(resolveEditorLaunch(input), launchDetached), + }).pipe(Effect.asVoid), + openInEditor: (input) => + Effect.flatMap(resolveEditorLaunch(input), (launch) => + launchDetachedWithSpawner(launch, spawner), + ), } satisfies OpenShape; }); diff --git a/apps/server/src/orchestration/decider.clock.test.ts b/apps/server/src/orchestration/decider.clock.test.ts new file mode 100644 index 00000000000..5cdb20b283f --- /dev/null +++ b/apps/server/src/orchestration/decider.clock.test.ts @@ -0,0 +1,70 @@ +import { CommandId, EventId, ProjectId } from "@t3tools/contracts"; +import { assert, it } from "@effect/vitest"; +import { DateTime, Duration, Effect, Random } from "effect"; +import { TestClock } from "effect/testing"; + +import { decideOrchestrationCommand } from "./decider.ts"; +import { createEmptyReadModel, projectEvent } from "./projector.ts"; + +const projectId = ProjectId.make("project-clock"); +const createdAt = "2026-01-01T00:00:00.000Z"; +const zeroRandomService = { + nextIntUnsafe: () => 0, + nextDoubleUnsafe: () => 0, +}; + +it.effect("uses the Effect clock for generated project update timestamps", () => + Effect.gen(function* () { + const readModel = yield* projectEvent(createEmptyReadModel(createdAt), { + sequence: 1, + eventId: EventId.make("evt-project-clock"), + aggregateKind: "project", + aggregateId: projectId, + type: "project.created", + occurredAt: createdAt, + commandId: CommandId.make("cmd-project-clock-create"), + causationEventId: null, + correlationId: CommandId.make("cmd-project-clock-create"), + metadata: {}, + payload: { + projectId, + title: "Clock", + workspaceRoot: "/tmp/clock", + defaultModelSelection: null, + scripts: [], + createdAt, + updatedAt: createdAt, + }, + }); + + yield* TestClock.adjust(Duration.seconds(5)); + const expectedNow = DateTime.formatIso(yield* DateTime.now); + const result = yield* decideOrchestrationCommand({ + command: { + type: "project.meta.update", + commandId: CommandId.make("cmd-project-clock-update"), + projectId, + title: "Clock Updated", + }, + readModel, + }); + const events = Array.isArray(result) ? [...result] : [result]; + assert.lengthOf(events, 1); + const event = events[0]; + if (!event) { + assert.fail("expected a project meta-updated event"); + return; + } + if (event.type !== "project.meta-updated") { + assert.fail(`expected project.meta-updated, received ${event.type}`); + return; + } + + assert.equal(event.occurredAt, expectedNow); + assert.equal(event.eventId, EventId.make("00000000-0000-4000-8000-000000000000")); + assert.equal(event.payload.updatedAt, expectedNow); + }).pipe( + Effect.provide(TestClock.layer()), + Effect.provideService(Random.Random, zeroRandomService), + ), +); diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 05ae5b0eb00..40c9c6b11c6 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -3,7 +3,7 @@ import type { OrchestrationEvent, OrchestrationReadModel, } from "@t3tools/contracts"; -import { DateTime, Effect } from "effect"; +import { DateTime, Effect, Random } from "effect"; import { OrchestrationCommandInvariantError } from "./Errors.ts"; import { @@ -17,7 +17,11 @@ import { } from "./commandInvariants.ts"; import { projectEvent } from "./projector.ts"; -const currentIso = DateTime.now.pipe(Effect.map(DateTime.formatIso)); +const nowIso = Effect.map(DateTime.now, DateTime.formatIso); +const defaultMetadata: Pick = { + causationEventId: null, + metadata: {}, +}; function withEventBase( input: Pick & { @@ -26,17 +30,17 @@ function withEventBase( readonly occurredAt: string; readonly metadata?: OrchestrationEvent["metadata"]; }, -): Omit { - return { - eventId: crypto.randomUUID() as OrchestrationEvent["eventId"], +): Effect.Effect> { + return Effect.map(Random.nextUUIDv4, (eventId) => ({ + ...defaultMetadata, + eventId: eventId as OrchestrationEvent["eventId"], aggregateKind: input.aggregateKind, aggregateId: input.aggregateId, occurredAt: input.occurredAt, commandId: input.commandId, - causationEventId: null, correlationId: input.commandId, metadata: input.metadata ?? {}, - }; + })); } type PlannedOrchestrationEvent = Omit; @@ -91,12 +95,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "project", aggregateId: command.projectId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "project.created", payload: { projectId: command.projectId, @@ -116,14 +120,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, projectId: command.projectId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "project", aggregateId: command.projectId, occurredAt, commandId: command.commandId, - }), + })), type: "project.meta-updated", payload: { projectId: command.projectId, @@ -173,14 +177,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }); } - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "project", aggregateId: command.projectId, occurredAt, commandId: command.commandId, - }), + })), type: "project.deleted" as const, payload: { projectId: command.projectId, @@ -201,12 +205,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.created", payload: { threadId: command.threadId, @@ -229,14 +233,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.deleted", payload: { threadId: command.threadId, @@ -251,14 +255,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.archived", payload: { threadId: command.threadId, @@ -274,14 +278,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.unarchived", payload: { threadId: command.threadId, @@ -296,14 +300,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.meta-updated", payload: { threadId: command.threadId, @@ -324,14 +328,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.runtime-mode-set", payload: { threadId: command.threadId, @@ -347,14 +351,14 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - const occurredAt = yield* currentIso; + const occurredAt = yield* nowIso; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt, commandId: command.commandId, - }), + })), type: "thread.interaction-mode-set", payload: { threadId: command.threadId, @@ -395,12 +399,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }); } const userMessageEvent: Omit = { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.message-sent", payload: { threadId: command.threadId, @@ -415,12 +419,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }, }; const turnStartRequestedEvent: Omit = { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), causationEventId: userMessageEvent.eventId, type: "thread.turn-start-requested", payload: { @@ -446,12 +450,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.turn-interrupt-requested", payload: { threadId: command.threadId, @@ -468,7 +472,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, @@ -476,7 +480,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" metadata: { requestId: command.requestId, }, - }), + })), type: "thread.approval-response-requested", payload: { threadId: command.threadId, @@ -494,7 +498,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, @@ -502,7 +506,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" metadata: { requestId: command.requestId, }, - }), + })), type: "thread.user-input-response-requested", payload: { threadId: command.threadId, @@ -520,12 +524,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.checkpoint-revert-requested", payload: { threadId: command.threadId, @@ -542,12 +546,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.session-stop-requested", payload: { threadId: command.threadId, @@ -563,13 +567,13 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, metadata: {}, - }), + })), type: "thread.session-set", payload: { threadId: command.threadId, @@ -585,12 +589,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.message-sent", payload: { threadId: command.threadId, @@ -612,12 +616,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.message-sent", payload: { threadId: command.threadId, @@ -639,12 +643,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.proposed-plan-upserted", payload: { threadId: command.threadId, @@ -660,12 +664,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.turn-diff-completed", payload: { threadId: command.threadId, @@ -687,12 +691,12 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, - }), + })), type: "thread.reverted", payload: { threadId: command.threadId, @@ -716,13 +720,13 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" .requestId as OrchestrationEvent["metadata"]["requestId"]) : undefined; return { - ...withEventBase({ + ...(yield* withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, ...(requestId !== undefined ? { metadata: { requestId } } : {}), - }), + })), type: "thread.activity-appended", payload: { threadId: command.threadId,