From ddffdddf7f501889386c22e0f1779fc9c4650a3b Mon Sep 17 00:00:00 2001 From: Hood Chatham Date: Wed, 4 Mar 2026 17:36:59 +0100 Subject: [PATCH 1/2] Refactor: extract createWorker() from makeWorkerImpl() Factor out the Isolate -> Script -> Worker creation pipeline from makeWorkerImpl() into a new Server::createWorker() method. This separates the worker creation logic (inspector policy, module registry, fallback service, artifact bundler, script compilation, worker construction) from the validation checks and post-creation wiring that are specific to makeWorkerImpl(). The experimental feature flag checks (NMR, module fallback, Python/NMR incompatibility) remain in makeWorkerImpl() since they only need to run once at initial config validation time. No behavioral changes. --- src/workerd/server/server.c++ | 156 ++++++++++++++++++---------------- src/workerd/server/server.h | 16 ++++ 2 files changed, 101 insertions(+), 71 deletions(-) diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index e4d13c96403..efd9fa76d45 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4427,68 +4427,32 @@ kj::Promise> Server::makeWorker(kj::StringPtr name, co_return co_await makeWorkerImpl(name, kj::mv(def), extensions, errorReporter); } -kj::Promise> Server::makeWorkerImpl(kj::StringPtr name, - WorkerDef def, +kj::Own Server::createWorker(kj::StringPtr name, + const WorkerSource& source, + CompatibilityFlags::Reader featureFlags, capnp::List::Reader extensions, - ErrorReporter& errorReporter) { - // Load Python artifacts if this is a Python worker - co_await preloadPython(name, def, errorReporter); - - auto jsgobserver = kj::atomicRefcounted(); - auto observer = kj::atomicRefcounted(); - auto limitEnforcer = kj::refcounted(); - + kj::Maybe moduleFallback, + kj::FunctionParam target, + v8::Local ctxExports)> compileBindings, + kj::Maybe errorReporter, + kj::Maybe> vfsAttachment) { // Create the FsMap that will be used to map known file 
system // roots to configurable locations. // TODO(node-fs): This is set up to allow users to configure the "mount" // points for known roots but we currently do not expose that in the // config. So for now this just uses the defaults. - auto workerFs = newWorkerFileSystem(kj::heap(), getBundleDirectory(def.source)); - - // TODO(soon): Either make python workers support the new module registry before - // NMR is defaulted on, or disable NMR by default when python workers are enabled. - // While NMR is experimental, we'll just throw an error if both are enabled. - if (def.featureFlags.getPythonWorkers()) { - KJ_REQUIRE(!def.featureFlags.getNewModuleRegistry(), - "Python workers do not currently support the new ModuleRegistry implementation. " - "Please disable the new ModuleRegistry feature flag to use Python workers."); - } - - bool usingNewModuleRegistry = def.featureFlags.getNewModuleRegistry(); - kj::Maybe> newModuleRegistry; - // TODO(soon): Python workers do not currently support the new module registry. - if (usingNewModuleRegistry) { - KJ_REQUIRE(experimental, - "The new ModuleRegistry implementation is an experimental feature. " - "You must run workerd with `--experimental` to use this feature."); - - // We use the same path for modules that the virtual file system uses. - // For instance, if the user specifies a bundle path of "/foo/bar" and - // there is a module in the bundle at "/foo/bar/baz.js", then the module's - // import specifier url will be "file:///foo/bar/baz.js". - const jsg::Url& bundleBase = workerFs->getBundleRoot(); - - // In workerd the module registry is always associated with just a single - // worker instance, so we initialize it here. In production, however, a - // single instance may be shared across multiple replicas. 
- kj::Maybe maybeFallbackService; - KJ_IF_SOME(moduleFallback, def.moduleFallback) { - maybeFallbackService = kj::str(moduleFallback); - } - - using ArtifactBundler = workerd::api::pyodide::ArtifactBundler; - auto isPythonWorker = def.featureFlags.getPythonWorkers(); - auto artifactBundler = isPythonWorker - ? ArtifactBundler::makePackagesOnlyBundler(pythonConfig.pyodidePackageManager) - : ArtifactBundler::makeDisabledBundler(); - - newModuleRegistry = WorkerdApi::newWorkerdModuleRegistry(*jsgobserver, - def.source.variant.tryGet(), def.featureFlags, pythonConfig, - bundleBase, extensions, kj::mv(maybeFallbackService), kj::mv(artifactBundler)); + auto workerFs = newWorkerFileSystem(kj::heap(), getBundleDirectory(source)); + KJ_IF_SOME(attachment, vfsAttachment) { + workerFs = workerFs.attach(kj::mv(attachment)); } + auto jsgobserver = kj::atomicRefcounted(); + auto observer = kj::atomicRefcounted(); + auto limitEnforcer = kj::refcounted(); auto isolateGroup = v8::IsolateGroup::GetDefault(); - auto api = kj::heap(globalContext->v8System, def.featureFlags, extensions, + auto api = kj::heap(globalContext->v8System, featureFlags, extensions, limitEnforcer->getCreateParams(), isolateGroup, kj::mv(jsgobserver), *memoryCacheProvider, pythonConfig); @@ -4498,8 +4462,9 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr inspectorPolicy = Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED; } Worker::LoggingOptions isolateLoggingOptions = loggingOptions; + bool usingNewModuleRegistry = featureFlags.getNewModuleRegistry(); isolateLoggingOptions.consoleMode = - def.source.variant.is() && !usingNewModuleRegistry + source.variant.is() && !usingNewModuleRegistry ? 
Worker::ConsoleMode::INSPECTOR_ONLY : loggingOptions.consoleMode; auto isolate = kj::atomicRefcounted(kj::mv(api), kj::mv(observer), name, @@ -4511,17 +4476,36 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr isolateRegistrar->registerIsolate(name, isolate.get()); } - if (!usingNewModuleRegistry) { - KJ_IF_SOME(moduleFallback, def.moduleFallback) { - KJ_REQUIRE(experimental, - "The module fallback service is an experimental feature. " - "You must run workerd with `--experimental` to use the module fallback service."); - // If the config has the moduleFallback option, then we are going to set up the ability - // to load certain modules from a fallback service. This is generally intended for local - // dev/testing purposes only. + // Set up module fallback / new module registry. + kj::Maybe> newModuleRegistry; + if (usingNewModuleRegistry) { + // We use the same path for modules that the virtual file system uses. + // For instance, if the user specifies a bundle path of "/foo/bar" and + // there is a module in the bundle at "/foo/bar/baz.js", then the module's + // import specifier url will be "file:///foo/bar/baz.js". + const jsg::Url& bundleBase = workerFs->getBundleRoot(); + + // In workerd the module registry is always associated with just a single + // worker instance, so we initialize it here. In production, however, a + // single instance may be shared across multiple replicas. + kj::Maybe maybeFallbackService; + KJ_IF_SOME(fb, moduleFallback) { + maybeFallbackService = kj::str(fb); + } + + using ArtifactBundler = workerd::api::pyodide::ArtifactBundler; + auto isPythonWorker = featureFlags.getPythonWorkers(); + auto artifactBundler = isPythonWorker + ? 
ArtifactBundler::makePackagesOnlyBundler(pythonConfig.pyodidePackageManager) + : ArtifactBundler::makeDisabledBundler(); + + newModuleRegistry = WorkerdApi::newWorkerdModuleRegistry(isolate->getApi().getObserver(), + source.variant.tryGet(), featureFlags, pythonConfig, + bundleBase, extensions, kj::mv(maybeFallbackService), kj::mv(artifactBundler)); + } else { + KJ_IF_SOME(fb, moduleFallback) { auto& apiIsolate = isolate->getApi(); - auto fallbackClient = - kj::heap(kj::str(moduleFallback)); + auto fallbackClient = kj::heap(kj::str(fb)); apiIsolate.setModuleFallbackCallback( [client = kj::mv(fallbackClient), featureFlags = apiIsolate.getFeatureFlags()]( jsg::Lock& js, kj::StringPtr specifier, kj::Maybe referrer, @@ -4558,14 +4542,45 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr } using ArtifactBundler = workerd::api::pyodide::ArtifactBundler; - auto isPythonWorker = def.featureFlags.getPythonWorkers(); + auto isPythonWorker = featureFlags.getPythonWorkers(); auto artifactBundler = isPythonWorker ? 
ArtifactBundler::makePackagesOnlyBundler(pythonConfig.pyodidePackageManager) : ArtifactBundler::makeDisabledBundler(); - auto script = isolate->newScript(name, def.source, IsolateObserver::StartType::COLD, - SpanParent(nullptr), workerFs.attach(kj::mv(def.maybeOwnedSourceCode)), false, errorReporter, - kj::mv(artifactBundler), kj::mv(newModuleRegistry)); + auto script = isolate->newScript(name, source, IsolateObserver::StartType::COLD, + SpanParent(nullptr), kj::mv(workerFs), false, errorReporter, kj::mv(artifactBundler), + kj::mv(newModuleRegistry)); + + return kj::atomicRefcounted(kj::mv(script), kj::atomicRefcounted(), + kj::mv(compileBindings), IsolateObserver::StartType::COLD, SpanParent(nullptr), + Worker::Lock::TakeSynchronously(kj::none), errorReporter); +} + +kj::Promise> Server::makeWorkerImpl(kj::StringPtr name, + WorkerDef def, + capnp::List::Reader extensions, + ErrorReporter& errorReporter) { + // Load Python artifacts if this is a Python worker + co_await preloadPython(name, def, errorReporter); + + // Validate experimental feature flags before creating the worker. + // TODO(soon): Either make python workers support the new module registry before + // NMR is defaulted on, or disable NMR by default when python workers are enabled. + if (def.featureFlags.getPythonWorkers()) { + KJ_REQUIRE(!def.featureFlags.getNewModuleRegistry(), + "Python workers do not currently support the new ModuleRegistry implementation. " + "Please disable the new ModuleRegistry feature flag to use Python workers."); + } + if (def.featureFlags.getNewModuleRegistry()) { + KJ_REQUIRE(experimental, + "The new ModuleRegistry implementation is an experimental feature. " + "You must run workerd with `--experimental` to use this feature."); + } + if (def.moduleFallback != kj::none && !def.featureFlags.getNewModuleRegistry()) { + KJ_REQUIRE(experimental, + "The module fallback service is an experimental feature. 
" + "You must run workerd with `--experimental` to use the module fallback service."); + } using Global = WorkerdApi::Global; jsg::V8Ref ctxExportsHandle = nullptr; @@ -4578,9 +4593,8 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr return def.compileBindings(lock, api, target); }; - auto worker = kj::atomicRefcounted(kj::mv(script), kj::atomicRefcounted(), - kj::mv(compileBindings), IsolateObserver::StartType::COLD, SpanParent(nullptr), - Worker::Lock::TakeSynchronously(kj::none), errorReporter); + auto worker = createWorker(name, def.source, def.featureFlags, extensions, def.moduleFallback, + kj::mv(compileBindings), errorReporter, kj::mv(def.maybeOwnedSourceCode)); uint totalActorChannels = 0; diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6162b1107f8..ec4048a8efe 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -308,6 +308,22 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl struct ConfigErrorReporter; struct DynamicErrorReporter; struct WorkerDef; + + // Creates a new V8 Isolate, compiles a Script, and constructs a Worker. Handles inspector + // policy, inspector registration, module fallback setup (both old and new registry paths), + // and artifact bundler creation. 
+ kj::Own createWorker(kj::StringPtr name, + const WorkerSource& source, + CompatibilityFlags::Reader featureFlags, + capnp::List::Reader extensions, + kj::Maybe moduleFallback, + kj::FunctionParam target, + v8::Local ctxExports)> compileBindings, + kj::Maybe errorReporter = kj::none, + kj::Maybe> vfsAttachment = kj::none); + kj::Promise> makeWorkerImpl(kj::StringPtr name, WorkerDef def, capnp::List::Reader extensions, From 3f64ddb4db2a7726f2e749d7aea972274681c2d3 Mon Sep 17 00:00:00 2001 From: Hood Chatham Date: Wed, 4 Mar 2026 14:48:58 +0100 Subject: [PATCH 2/2] Implement abortIsolate() in cloudflare:workers module Add abortIsolate(reason) API that terminates the current JS isolate and creates a fresh one from scratch, resetting all module-level state. When called, it immediately terminates all in-flight requests on the isolate. The next request creates a new Worker. The abort-all mechanism uses a shared ForkedPromise that each IoContext subscribes to via onLimitsExceeded(). When abortIsolate() is called, the promise is rejected, causing every IoContext on the isolate to abort. This mirrors how the production 2x memory limit kill works. The reason string is included in all error messages across all aborted requests. 
--- src/cloudflare/internal/workers.d.ts | 1 + src/cloudflare/workers.ts | 1 + src/workerd/api/workers-module.c++ | 11 + src/workerd/api/workers-module.h | 6 + src/workerd/io/io-channels.h | 7 + src/workerd/io/io-context.c++ | 18 ++ src/workerd/io/io-context.h | 5 + src/workerd/server/server-test.c++ | 378 +++++++++++++++++++++++++++ src/workerd/server/server.c++ | 242 ++++++++++++++++- src/workerd/server/server.h | 3 +- 10 files changed, 669 insertions(+), 3 deletions(-) diff --git a/src/cloudflare/internal/workers.d.ts b/src/cloudflare/internal/workers.d.ts index 607f29b8a73..3228196c02c 100644 --- a/src/cloudflare/internal/workers.d.ts +++ b/src/cloudflare/internal/workers.d.ts @@ -32,3 +32,4 @@ export class RpcTarget {} export class ServiceStub {} export function waitUntil(promise: Promise): void; +export function abortIsolate(reason?: string): never; diff --git a/src/cloudflare/workers.ts b/src/cloudflare/workers.ts index 34025807366..98bb684ee40 100644 --- a/src/cloudflare/workers.ts +++ b/src/cloudflare/workers.ts @@ -151,3 +151,4 @@ export const exports = new Proxy( ); export const waitUntil = entrypoints.waitUntil.bind(entrypoints); +export const abortIsolate = entrypoints.abortIsolate.bind(entrypoints); diff --git a/src/workerd/api/workers-module.c++ b/src/workerd/api/workers-module.c++ index 640f3ea6d59..be8e478f49b 100644 --- a/src/workerd/api/workers-module.c++ +++ b/src/workerd/api/workers-module.c++ @@ -58,4 +58,15 @@ void EntrypointsModule::waitUntil(kj::Promise promise) { IoContext::current().addWaitUntil(kj::mv(promise)); } +void EntrypointsModule::abortIsolate(jsg::Optional reason) { + auto& context = IoContext::current(); + + // Signal the runtime to swap the worker for future requests and abort the current IoContext. + context.abortIsolate(kj::mv(reason)); + + // Immediately terminate V8 execution so no further JS runs in this request. + // This raises an uncatchable exception in V8, causing the request to fail immediately. 
+ jsg::Lock::from(v8::Isolate::GetCurrent()).terminateExecutionNow(); +} + } // namespace workerd::api diff --git a/src/workerd/api/workers-module.h b/src/workerd/api/workers-module.h index aeea03f6079..7750047fbf2 100644 --- a/src/workerd/api/workers-module.h +++ b/src/workerd/api/workers-module.h @@ -79,6 +79,11 @@ class EntrypointsModule: public jsg::Object { void waitUntil(kj::Promise promise); + // Throws away the current JS isolate and recreates the worker from scratch. The current + // request fails immediately (V8 execution is terminated); subsequent requests will use a + // fresh isolate with re-executed top-level module code and fresh global state. + void abortIsolate(jsg::Optional reason); + JSG_RESOURCE_TYPE(EntrypointsModule) { JSG_NESTED_TYPE(WorkerEntrypoint); JSG_NESTED_TYPE(WorkflowEntrypoint); @@ -90,6 +95,7 @@ class EntrypointsModule: public jsg::Object { JSG_NESTED_TYPE_NAMED(Fetcher, ServiceStub); JSG_METHOD(waitUntil); + JSG_METHOD(abortIsolate); } }; diff --git a/src/workerd/io/io-channels.h b/src/workerd/io/io-channels.h index d19a02d4f6e..06f357b0868 100644 --- a/src/workerd/io/io-channels.h +++ b/src/workerd/io/io-channels.h @@ -276,6 +276,13 @@ class IoChannelFactory { KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime"); } + // Signals that the current worker's JS isolate should be thrown away and recreated from scratch. + // In-flight requests on the old isolate are aborted; subsequent requests will use a fresh + // isolate with re-executed top-level module code and fresh global state. + virtual void abortIsolate(kj::Maybe reason) { + JSG_FAIL_REQUIRE(Error, "abortIsolate() is not supported by this runtime."); + } + // Use a dynamic Worker loader binding to obtain an Worker by name. If name is null, or if the named Worker doesn't already exist, the callback will be called to fetch the source code from which the Worker should be created. 
virtual kj::Own loadIsolate(uint loaderChannel, kj::Maybe name, diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 41fd53779c8..d87471eb0dc 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -478,6 +478,24 @@ void IoContext::abort(kj::Exception&& e) { abortFulfiller->reject(kj::mv(e)); } +void IoContext::abortIsolate(kj::Maybe reason) { + // Build the error message, including the reason if provided. + auto message = [&]() -> kj::String { + KJ_IF_SOME(r, reason) { + return kj::str("abortIsolate(): ", r); + } else { + return kj::str("abortIsolate() was called."); + } + }(); + + // Tell the IoChannelFactory to swap the worker for future requests and abort ALL in-flight + // requests on the isolate. Pass the reason so all aborted requests see the same message. + getIoChannelFactory().abortIsolate(message.asPtr()); + + // Abort the current IoContext so this request fails. + abort(JSG_KJ_EXCEPTION(FAILED, Error, message)); +} + void IoContext::abortWhen(kj::Promise promise) { // Unlike addTask(), abortWhen() always uses `tasks`, even in actors, because we do not want // these tasks to block hibernation. diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 932b1916322..a0c76b55acb 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -858,6 +858,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler getIoChannelFactory().abortAllActors(reason); } + // Signals that the current worker's JS isolate should be thrown away and recreated from scratch. + // This immediately aborts the current IoContext and terminates V8 execution, causing the current + // request to fail. Subsequent requests will use a freshly created worker. + void abortIsolate(kj::Maybe reason = kj::none); + // Get an HttpClient to use for Cache API subrequests. 
kj::Own getCacheClient(); diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 81efdd78cc7..628cbe09ac3 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -6012,5 +6012,383 @@ KJ_TEST("Server: workerdDebugPort WebSocket passthrough via WorkerEntrypoint") { wsConn.send(kj::str("\x81\x05", testMessage2)); wsConn.recvWebSocket("echo:world"); } + +KJ_TEST("Server: abortIsolate() resets module state") { + // This test verifies that calling abortIsolate() from cloudflare:workers: + // 1. Terminates the current request with a failure + // 2. Causes subsequent requests to see fresh module-level state (re-executed top-level code) + TestServer test(singleWorker(R"(( + compatibilityDate = "2024-01-01", + compatibilityFlags = ["experimental", "nodejs_compat"], + modules = [ + ( name = "main.js", + esModule = + `import { abortIsolate } from 'cloudflare:workers'; + `let counter = 0; + `export default { + ` async fetch(request) { + ` const url = new URL(request.url); + ` if (url.pathname === '/increment') { + ` counter++; + ` return new Response(String(counter)); + ` } + ` if (url.pathname === '/reset') { + ` abortIsolate('resetting module state'); + ` return new Response('unreachable'); + ` } + ` return new Response('not found', { status: 404 }); + ` } + `} + ) + ] + ))"_kj)); + + test.server.allowExperimental(); + test.start(); + + { + auto conn = test.connect("test-addr"); + // First request: counter goes from 0 to 1. + conn.httpGet200("/increment", "1"); + // Second request: counter goes to 2, proving state persists across requests. + conn.httpGet200("/increment", "2"); + } + + { + // Third request: call abortIsolate(). Should get a 500 error. + // The IoContext abort and JS exception produce log messages we need to expect. 
+ KJ_EXPECT_LOG(INFO, "abortIsolate(): resetting module state"); + KJ_EXPECT_LOG(ERROR, "abortIsolate(): resetting module state"); + + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/reset"); + conn.recv(R"( + HTTP/1.1 500 Internal Server Error + Connection: close + Content-Length: 21 + + Internal Server Error)"_blockquote); + } + + { + auto conn = test.connect("test-addr"); + // Fourth request: after reset, module-level counter should be back to 0, so this returns "1". + conn.httpGet200("/increment", "1"); + } +} + +KJ_TEST("Server: abortIsolate() preserves env bindings") { + // Verifies that after abortIsolate(), env bindings (text, json) are still available + // in the fresh worker. + TestServer test(singleWorker(R"(( + compatibilityDate = "2024-01-01", + compatibilityFlags = ["experimental", "nodejs_compat"], + modules = [ + ( name = "main.js", + esModule = + `import { abortIsolate } from 'cloudflare:workers'; + `let counter = 0; + `export default { + ` async fetch(request, env) { + ` const url = new URL(request.url); + ` if (url.pathname === '/check') { + ` counter++; + ` return new Response(JSON.stringify({ + ` counter, + ` text: env.MY_TEXT, + ` json: env.MY_JSON, + ` })); + ` } + ` if (url.pathname === '/reset') { + ` abortIsolate(); + ` } + ` return new Response('not found', { status: 404 }); + ` } + `} + ) + ], + bindings = [ + ( name = "MY_TEXT", text = "hello-text" ), + ( name = "MY_JSON", + json = `{"key":"value"} + ) + ] + ))"_kj)); + + test.server.allowExperimental(); + test.start(); + + { + auto conn = test.connect("test-addr"); + conn.httpGet200("/check", R"({"counter":1,"text":"hello-text","json":{"key":"value"}})"); + conn.httpGet200("/check", R"({"counter":2,"text":"hello-text","json":{"key":"value"}})"); + } + + { + KJ_EXPECT_LOG(INFO, "abortIsolate() was called"); + KJ_EXPECT_LOG(ERROR, "abortIsolate() was called"); + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/reset"); + conn.recv(R"( + HTTP/1.1 500 Internal Server 
Error + Connection: close + Content-Length: 21 + + Internal Server Error)"_blockquote); + } + + { + // After reset: counter restarted at 0 (now 1), and bindings still work. + auto conn = test.connect("test-addr"); + conn.httpGet200("/check", R"({"counter":1,"text":"hello-text","json":{"key":"value"}})"); + } +} + +KJ_TEST("Server: abortIsolate() with service bindings") { + // Verifies that after abortIsolate(), service bindings to other workers still function. + TestServer test(R"(( + services = [ + ( name = "main-worker", + worker = ( + compatibilityDate = "2024-01-01", + compatibilityFlags = ["experimental", "nodejs_compat"], + modules = [ + ( name = "main.js", + esModule = + `import { abortIsolate } from 'cloudflare:workers'; + `let counter = 0; + `export default { + ` async fetch(request, env) { + ` const url = new URL(request.url); + ` if (url.pathname === '/check') { + ` counter++; + ` let resp = await env.backend.fetch('http://backend/hello'); + ` let text = await resp.text(); + ` return new Response(counter + ':' + text); + ` } + ` if (url.pathname === '/reset') { + ` abortIsolate(); + ` } + ` return new Response('not found', { status: 404 }); + ` } + `} + ) + ], + bindings = [(name = "backend", service = "backend-worker")] + ) + ), + ( name = "backend-worker", + worker = ( + compatibilityDate = "2024-01-01", + modules = [ + ( name = "main.js", + esModule = + `export default { + ` async fetch(request) { + ` return new Response('from-backend'); + ` } + `} + ) + ] + ) + ), + ], + sockets = [ + ( name = "main", + address = "test-addr", + service = "main-worker" + ) + ] + ))"_kj); + + test.server.allowExperimental(); + test.start(); + + { + auto conn = test.connect("test-addr"); + conn.httpGet200("/check", "1:from-backend"); + conn.httpGet200("/check", "2:from-backend"); + } + + { + KJ_EXPECT_LOG(INFO, "abortIsolate() was called"); + KJ_EXPECT_LOG(ERROR, "abortIsolate() was called"); + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/reset"); + 
conn.recv(R"( + HTTP/1.1 500 Internal Server Error + Connection: close + Content-Length: 21 + + Internal Server Error)"_blockquote); + } + + { + // After reset: counter restarted, service binding still works. + auto conn = test.connect("test-addr"); + conn.httpGet200("/check", "1:from-backend"); + } +} + +KJ_TEST("Server: abortIsolate() with durable object") { + // Verifies that after abortIsolate(): + // 1. The DO binding still works after recreation + // 2. The DO's in-memory state also resets (in workerd, the DO shares the same isolate) + // 3. The worker's module-level state resets + TestServer test(R"(( + services = [ + ( name = "hello", + worker = ( + compatibilityDate = "2024-01-01", + compatibilityFlags = ["experimental", "nodejs_compat"], + modules = [ + ( name = "main.js", + esModule = + `import { abortIsolate, DurableObject } from 'cloudflare:workers'; + `let counter = 0; + `export default { + ` async fetch(request, env) { + ` const url = new URL(request.url); + ` if (url.pathname === '/check') { + ` counter++; + ` let id = env.ns.idFromName('singleton'); + ` let stub = env.ns.get(id); + ` let resp = await stub.fetch('http://do/increment'); + ` let doCount = await resp.text(); + ` return new Response(counter + ':' + doCount); + ` } + ` if (url.pathname === '/reset') { + ` abortIsolate(); + ` } + ` return new Response('not found', { status: 404 }); + ` } + `} + `export class MyDO extends DurableObject { + ` count = 0; + ` async fetch(request) { + ` this.count++; + ` return new Response(String(this.count)); + ` } + `} + ) + ], + bindings = [(name = "ns", durableObjectNamespace = "MyDO")], + durableObjectNamespaces = [ + ( className = "MyDO", + uniqueKey = "mykey", + ) + ], + durableObjectStorage = (inMemory = void) + ) + ), + ], + sockets = [ + ( name = "main", + address = "test-addr", + service = "hello" + ) + ] + ))"_kj); + + test.server.allowExperimental(); + test.start(); + + { + auto conn = test.connect("test-addr"); + // Worker counter=1, DO count=1 
+ conn.httpGet200("/check", "1:1"); + // Worker counter=2, DO count=2 + conn.httpGet200("/check", "2:2"); + } + + { + KJ_EXPECT_LOG(INFO, "abortIsolate() was called"); + KJ_EXPECT_LOG(ERROR, "abortIsolate() was called"); + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/reset"); + conn.recv(R"( + HTTP/1.1 500 Internal Server Error + Connection: close + Content-Length: 21 + + Internal Server Error)"_blockquote); + } + + { + // After reset: both worker counter AND DO in-memory state restart, + // because abortIsolate() destroys all activity on the isolate including DOs. + auto conn = test.connect("test-addr"); + conn.httpGet200("/check", "1:1"); + } +} + +KJ_TEST("Server: abortIsolate() terminates concurrent WebSocket connection") { + // Verifies that calling abortIsolate() from one request terminates other in-flight requests + // on the same isolate. Request 1 opens a WebSocket. Request 2 calls abortIsolate(). + // Request 1's WebSocket should be terminated. + TestServer test(singleWorker(R"(( + compatibilityDate = "2024-01-01", + compatibilityFlags = ["experimental", "nodejs_compat"], + modules = [ + ( name = "main.js", + esModule = + `import { abortIsolate } from 'cloudflare:workers'; + `export default { + ` async fetch(request) { + ` const url = new URL(request.url); + ` if (request.headers.get('Upgrade') === 'websocket') { + ` const pair = new WebSocketPair(); + ` const [client, server] = Object.values(pair); + ` server.accept(); + ` server.addEventListener('message', (e) => { + ` server.send('echo:' + e.data); + ` }); + ` return new Response(null, { status: 101, webSocket: client }); + ` } + ` if (url.pathname === '/reset') { + ` abortIsolate('test abort'); + ` } + ` return new Response('not found', { status: 404 }); + ` } + `} + ) + ] + ))"_kj)); + + test.server.allowExperimental(); + test.start(); + + // Request 1: Open a WebSocket and verify it works. 
+ auto wsConn = test.connect("test-addr"); + wsConn.upgradeToWebSocket(); + wsConn.send(kj::str("\x81\x05hello")); + wsConn.recvWebSocket("echo:hello"); + + { + // Request 2: Call abortIsolate() on a separate connection. + // This should terminate ALL activity on the isolate, including the WebSocket from Request 1. + // abortIsolate() produces several error logs: + // - The calling request's IoContext abort (INFO + ERROR for the reason message) + // - The WebSocket request's IoContext abort via onLimitsExceeded() (multiple ERROR logs) + // - The HTTP connection error handler for the WebSocket connection + KJ_EXPECT_LOG(INFO, "abortIsolate(): test abort"); + KJ_EXPECT_LOG(ERROR, "abortIsolate(): test abort"); + KJ_EXPECT_LOG(ERROR, "abortIsolate(): test abort"); + + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/reset"); + conn.recv(R"( + HTTP/1.1 500 Internal Server Error + Connection: close + Content-Length: 21 + + Internal Server Error)"_blockquote); + } + + // The WebSocket from Request 1 should now be terminated — the connection should be at EOF + // because abortIsolate() destroys all activity on the isolate, not just the calling request. + KJ_EXPECT(wsConn.isEof()); +} + } // namespace } // namespace workerd::server diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index efd9fa76d45..750f3e136a1 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1918,7 +1918,9 @@ class Server::WorkerService final: public Service, abortActorsCallback(kj::mv(abortActorsCallback)), dockerPath(kj::mv(dockerPathParam)), containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImageParam)), - isDynamic(isDynamic) {} + isDynamic(isDynamic) { + resetAbortAllPromise(); + } // Call immediately after the constructor to set up `actorNamespaces`. 
This can't happen during // the constructor itself since it sets up cyclic references, which will throw an exception if @@ -2073,6 +2075,11 @@ class Server::WorkerService final: public Service, return actorNamespaces; } + using WorkerFactory = kj::Function()>; + void setWorkerFactory(WorkerFactory factory) { + workerFactory = kj::mv(factory); + } + kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { return startRequest(kj::mv(metadata), kj::none, {}); } @@ -2092,6 +2099,12 @@ class Server::WorkerService final: public Service, bool isTracer = false) { TRACE_EVENT("workerd", "Server::WorkerService::startRequest()"); + // If abortIsolate() was called, lazily recreate the worker now (outside any isolate lock). + if (isolateAbortRequested) { + worker = KJ_ASSERT_NONNULL(workerFactory)(); + isolateAbortRequested = false; + } + auto& channels = KJ_ASSERT_NONNULL(ioChannels.tryGet()); kj::Vector> bufferedTailWorkers(channels.tails.size()); @@ -3222,6 +3235,24 @@ class Server::WorkerService final: public Service, kj::Maybe containerEgressInterceptorImage; bool isDynamic; + // Set to true when abortIsolate() is called; checked by startRequest() to lazily recreate. + bool isolateAbortRequested = false; + + // Factory function that creates a fresh Worker. Used by abortIsolate(). + kj::Maybe workerFactory; + + // Abort signal for all in-flight requests. When abortIsolate() is called, the fulfiller is + // rejected, which causes all IoContexts listening via onLimitsExceeded() to abort. A new + // promise/fulfiller pair is then created for the next generation of requests. 
+ kj::ForkedPromise abortAllPromise = nullptr; + kj::Own> abortAllFulfiller; + + void resetAbortAllPromise() { + auto paf = kj::newPromiseAndFulfiller(); + abortAllPromise = paf.promise.fork(); + abortAllFulfiller = kj::mv(paf.fulfiller); + } + class ActorChannelImpl final: public IoChannelFactory::ActorChannel { public: ActorChannelImpl(kj::Own actorContainer) @@ -3434,6 +3465,32 @@ class Server::WorkerService final: public Service, abortActorsCallback(reason); } + void abortIsolate(kj::Maybe reason) override { + if (workerFactory == kj::none) { + JSG_FAIL_REQUIRE(Error, "abortIsolate() is not supported for this worker configuration."); + } + // Set a flag so that the next call to startRequest() will recreate the worker. + // We can't create the new worker right now because we're inside a request that holds the + // V8 isolate lock, and creating a new Worker from the same Script requires that same lock. + isolateAbortRequested = true; + + // Reject the shared abort promise to terminate ALL in-flight requests on this isolate, + // not just the calling request. This mirrors how the production 2x memory limit works: + // each IoContext subscribes to onLimitsExceeded() which returns a branch of this promise. + // Include the reason so all aborted requests see the same message. + auto message = [&]() -> kj::String { + KJ_IF_SOME(r, reason) { + return kj::str(r); + } else { + return kj::str("abortIsolate() was called."); + } + }(); + abortAllFulfiller->reject(JSG_KJ_EXCEPTION(FAILED, Error, kj::mv(message))); + + // Create a fresh promise/fulfiller for the next generation of requests (after recreation). 
+ resetAbortAllPromise(); + } + kj::Own loadIsolate(uint loaderChannel, kj::Maybe name, kj::Function()> fetchSource) override; @@ -3510,7 +3567,7 @@ class Server::WorkerService final: public Service, return kj::none; } kj::Promise onLimitsExceeded() override { - return kj::NEVER_DONE; + return abortAllPromise.addBranch(); } void setCpuLimitNearlyExceededCallback(kj::Function cb) override {} void requireLimitsNotExceeded() override {} @@ -4010,6 +4067,10 @@ struct Server::WorkerDef { // If the WorkerDef was created from a DymamicWorkerSource and that // source contains a clone of the source bundle, this will take ownership. kj::Maybe> maybeOwnedSourceCode; + + // Cloned env binding globals for use by abortIsolate() to recreate the worker with the same + // bindings. Only populated for static (non-dynamic) workers. + kj::Array envGlobalsForFactory; }; class Server::WorkerLoaderNamespace: public kj::Refcounted { @@ -4374,6 +4435,9 @@ kj::Promise> Server::makeWorker(kj::StringPtr name, } // Construct `WorkerDef` from `conf`. + // Clone the env binding globals before they're moved into def.compileBindings, so + // we can reuse them for abortIsolate() to recreate the worker with the same bindings. + auto envGlobalsForFactory = KJ_MAP(g, globals) { return g.clone(); }; WorkerDef def{ .featureFlags = featureFlags.asReader(), .source = WorkerdApi::extractSource(name, conf, featureFlags.asReader(), errorReporter), @@ -4422,11 +4486,118 @@ kj::Promise> Server::makeWorker(kj::StringPtr name, return WorkerdApi::from(api).compileGlobals(lock, globals, target, 1); }, // clang-format on + + .envGlobalsForFactory = kj::mv(envGlobalsForFactory), }; co_return co_await makeWorkerImpl(name, kj::mv(def), extensions, errorReporter); } +// Owns a deep copy of a WorkerSource so it can outlive the original capnp config message. +// Used by the abortIsolate() factory to recreate the worker from scratch. +// Only supports ModulesSource (ES modules) which is the common case for abortIsolate(). 
+class OwnedWorkerSource { + public: + explicit OwnedWorkerSource(const WorkerSource& src) + : OwnedWorkerSource(src.variant.get()) {} + + const WorkerSource& borrow() const { + return source; + } + + private: + // Per-module owned data. The WorkerSource's Module array borrows StringPtr/ArrayPtr from these. + struct ModuleData { + kj::String name; + kj::OneOf> body; + bool treatAsInternalForTest = false; + // We temporarily store the ModuleContent in here before moving it into WorkerSource. + kj::Maybe content; + }; + + explicit OwnedWorkerSource(const WorkerSource::ModulesSource& ms) + : ownedMainModule(kj::str(ms.mainModule)), + isPython(ms.isPython), + ownedModuleData(cloneAll(ms.modules)), + source(WorkerSource::ModulesSource{ + .mainModule = ownedMainModule, + .modules = buildAll(ownedModuleData), + .isPython = isPython, + }) {} + + static kj::Array cloneAll(const kj::Array& modules) { + return KJ_MAP(m, modules) -> ModuleData { + ModuleData d; + d.name = kj::str(m.name); + d.treatAsInternalForTest = m.treatAsInternalForTest; + KJ_SWITCH_ONEOF(m.content) { + KJ_CASE_ONEOF(es, WorkerSource::EsModule) { + auto body = kj::str(es.body); + d.content = WorkerSource::EsModule{.body = body.asArray()}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(cjs, WorkerSource::CommonJsModule) { + auto body = kj::str(cjs.body); + d.content = WorkerSource::CommonJsModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(text, WorkerSource::TextModule) { + auto body = kj::str(text.body); + d.content = WorkerSource::TextModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(json, WorkerSource::JsonModule) { + auto body = kj::str(json.body); + d.content = WorkerSource::JsonModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(py, WorkerSource::PythonModule) { + auto body = kj::str(py.body); + d.content = WorkerSource::PythonModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(data, WorkerSource::DataModule) { + auto body = 
kj::heapArray(data.body); + d.content = WorkerSource::DataModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(wasm, WorkerSource::WasmModule) { + auto body = kj::heapArray(wasm.body); + d.content = WorkerSource::WasmModule{.body = body}; + d.body = kj::mv(body); + } + KJ_CASE_ONEOF(req, WorkerSource::PythonRequirement) { + d.content = WorkerSource::PythonRequirement{}; + } + KJ_CASE_ONEOF(capnp, WorkerSource::CapnpModule) { + d.content = WorkerSource::CapnpModule{.typeId = capnp.typeId}; + } + } + KJ_ASSERT(KJ_ASSERT_NONNULL(d.content).which() == m.content.which()); + return d; + }; + } + + static kj::Array buildAll(kj::ArrayPtr data) { + return KJ_MAP(d, data) -> WorkerSource::Module { + auto content = kj::mv(KJ_ASSERT_NONNULL(d.content)); + d.content = kj::none; + return { + .name = d.name, + .content = kj::mv(content), + .treatAsInternalForTest = d.treatAsInternalForTest, + }; + }; + } + + // Members in initialization order. `source` MUST be declared last because it borrows + // StringPtr/ArrayPtr from ownedMainModule and ownedModuleData. + kj::String ownedMainModule; + bool isPython; + kj::Array ownedModuleData; + WorkerSource source; +}; + kj::Own Server::createWorker(kj::StringPtr name, const WorkerSource& source, CompatibilityFlags::Reader featureFlags, @@ -4597,6 +4768,7 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr kj::mv(compileBindings), errorReporter, kj::mv(def.maybeOwnedSourceCode)); uint totalActorChannels = 0; + kj::Array ctxExportsForFactory; worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) { lock.validateHandlers(errorReporter); @@ -4663,6 +4835,9 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr } totalActorChannels = nextActorChannel; + // Clone the ctx.exports globals so we can reuse them in the worker factory for abortIsolate(). 
+ ctxExportsForFactory = KJ_MAP(g, ctxExports) { return g.clone(); }; + JSG_WITHIN_CONTEXT_SCOPE(lock, lock.getContext(), [&](jsg::Lock& js) { WorkerdApi::from(worker->getIsolate().getApi()) .compileGlobals(lock, ctxExports, ctxExportsHandle.getHandle(js), 1); @@ -4672,6 +4847,23 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr { auto drop = kj::mv(ctxExportsHandle); } }); + // Extract pieces needed for the worker factory (abortIsolate()) before they're moved away. + auto envGlobalsForFactory = kj::mv(def.envGlobalsForFactory); + bool isDynamicForFactory = def.isDynamic; + bool isModulesSourceForFactory = def.source.variant.is(); + kj::Maybe> ownedSourceForFactory; + kj::Maybe> featureFlagsMsgForFactory; + kj::Maybe moduleFallbackForFactory; + if (!isDynamicForFactory && isModulesSourceForFactory) { + auto msg = kj::heap(); + msg->setRoot(def.featureFlags); + featureFlagsMsgForFactory = kj::mv(msg); + ownedSourceForFactory = kj::heap(def.source); + KJ_IF_SOME(moduleFallback, def.moduleFallback) { + moduleFallbackForFactory = kj::str(moduleFallback); + } + } + auto linkCallback = [this, def = kj::mv(def), totalActorChannels](WorkerService& workerService, Worker::ValidationErrorReporter& errorReporter) mutable { WorkerService::LinkedIoChannels result{.alarmScheduler = *alarmScheduler}; @@ -4850,6 +5042,52 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), kj::mv(containerEgressInterceptorImage), def.isDynamic); result->initActorNamespaces(def.localActorConfigs, network); + + // Set up the worker factory for abortIsolate(). The factory creates a completely fresh + // Isolate -> Script -> Worker, ensuring all module-level state is re-initialized. + // We capture references to Server members needed for Isolate construction and deep copies + // of the env/ctx.exports globals for binding compilation. 
+ // Note: ownedSourceForFactory and featureFlagsMsgForFactory were extracted above, before + // `def` was moved into the linkCallback lambda. + if (!isDynamicForFactory && isModulesSourceForFactory) { + auto ownedSource = kj::mv(KJ_ASSERT_NONNULL(ownedSourceForFactory)); + auto featureFlagsMsg = kj::mv(KJ_ASSERT_NONNULL(featureFlagsMsgForFactory)); + + result->setWorkerFactory( + [this, workerName = kj::str(name), envGlobals = kj::mv(envGlobalsForFactory), + ctxExportsGlobals = kj::mv(ctxExportsForFactory), + featureFlagsMsg = kj::mv(featureFlagsMsg), ownedSource = kj::mv(ownedSource), + extensions, + moduleFallback = kj::mv(moduleFallbackForFactory)]() mutable -> kj::Own { + auto featureFlags = featureFlagsMsg->getRoot(); + const auto& source = ownedSource->borrow(); + + auto moduleFallbackPtr = moduleFallback.map([](auto& fb) { return fb.asPtr(); }); + + // Create the Worker with bindings. + jsg::V8Ref ctxExportsHandle = nullptr; + auto compileBindings = [&](jsg::Lock& lock, const Worker::Api& apiRef, + v8::Local target, v8::Local ctxExports) { + ctxExportsHandle = lock.v8Ref(ctxExports); + return WorkerdApi::from(apiRef).compileGlobals(lock, envGlobals, target, 1); + }; + + auto newWorker = createWorker(workerName, source, featureFlags.asReader(), extensions, + moduleFallbackPtr, kj::mv(compileBindings)); + + // Compile ctx.exports bindings. 
+ newWorker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) { + JSG_WITHIN_CONTEXT_SCOPE(lock, lock.getContext(), [&](jsg::Lock& js) { + WorkerdApi::from(newWorker->getIsolate().getApi()) + .compileGlobals(lock, ctxExportsGlobals, ctxExportsHandle.getHandle(js), 1); + }); + { auto drop = kj::mv(ctxExportsHandle); } + }); + + return newWorker; + }); + } + co_return result; } diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index ec4048a8efe..f26db892182 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -311,7 +311,8 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl // Creates a new V8 Isolate, compiles a Script, and constructs a Worker. Handles inspector // policy, inspector registration, module fallback setup (both old and new registry paths), - // and artifact bundler creation. + // and artifact bundler creation. Used by both makeWorkerImpl() (initial creation) and the + // abortIsolate() factory (recreation). kj::Own createWorker(kj::StringPtr name, const WorkerSource& source, CompatibilityFlags::Reader featureFlags,