From ba7a72e288ad0c5a1291075bc8c54fe1aaf568df Mon Sep 17 00:00:00 2001 From: Sam Saffron Date: Fri, 19 Jun 2026 16:44:22 +1000 Subject: [PATCH] feat: add global MiniRacer pause gate Add MiniRacer.pause/resume to quiesce operations process-wide with timeout handling and nested pauses. Expose PauseTimeoutError and opt-in Process._fork hooks so fork can wait for MiniRacer to drain before parent and child continue. Document the fork coordination APIs and cover pause, timeout, hook, and single-threaded fork behavior with tests. --- .github/workflows/ci.yml | 5 +- CHANGELOG | 1 + README.md | 39 +- .../mini_racer_extension.c | 566 ++++++++++++++++-- lib/mini_racer.rb | 62 ++ test/mini_racer_test.rb | 259 ++++++++ test/single_threaded_test.rb | 109 ++++ 7 files changed, 985 insertions(+), 56 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 36799f24..5813807f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: matrix: os: - "macos-latest" - - "ubuntu-latest" + - "ubuntu-24.04" ruby: - "truffleruby+graalvm" @@ -39,6 +39,9 @@ jobs: runs-on: ${{ matrix.os }} timeout-minutes: 10 + env: + TRUFFLERUBYOPT: "--jvm --polyglot" + steps: - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v4 with: diff --git a/CHANGELOG b/CHANGELOG index d1c5c44b..55dab4bb 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -9,6 +9,7 @@ - Avoid finalizer hangs when a forked child garbage-collects a non-idle inherited `:single_threaded` context - Allow Ruby thread interrupts, process shutdown, and cross-thread `Context#dispose` to terminate busy `:single_threaded` JavaScript execution instead of hanging - Make `Context#dispose` while an attached Ruby callback is active either terminate safely or raise instead of deadlocking + - Add `MiniRacer.pause(timeout:)` / `MiniRacer.resume` to quiesce MiniRacer globally, plus opt-in fork hooks built on that pause gate - 0.21.2 - 11-06-2026 - Add `Context#perform_microtask_checkpoint` to synchronously drain the V8 microtask queue, useful for spec-compliant `dispatchEvent` sequencing inside Ruby callbacks diff --git a/README.md b/README.md index 6c0b0b0c..1ce66c34 100644 --- a/README.md +++ b/README.md @@ -143,10 +143,41 @@ When using pre-fork `MiniRacer::Context` objects in `:single_threaded` mode, ensure the process only forks while MiniRacer is quiescent: no thread may be evaluating JavaScript, calling into a context, disposing/freeing a context, running a Ruby callback from JavaScript, or otherwise using MiniRacer at the -instant of `fork`. In multi-threaded applications, guard all MiniRacer context -operations and the `fork` itself with the same application-level lock. Forking -while a MiniRacer operation is in progress can leave inherited pthread mutexes -in an unusable state in the child process. +instant of `fork`. Forking while a MiniRacer operation is in progress can leave +inherited pthread mutexes in an unusable state in the child process. + +`MiniRacer.pause(timeout:)` is a process-global quiesce gate. It prevents new +MiniRacer operations from starting, waits for operations already in progress to +finish, and then keeps MiniRacer paused until `MiniRacer.resume` is called. +`timeout:` is in seconds; if MiniRacer cannot drain in time, +`MiniRacer::PauseTimeoutError` is raised and the pause is rolled back. Omitting +`timeout:` waits indefinitely, which is useful only when the caller knows active +JavaScript cannot get stuck. + +```ruby +MiniRacer.pause(timeout: 5) +begin + pid = fork do + MiniRacer.resume # child: reset inherited pause state + # child process work + end +ensure + MiniRacer.resume # parent: release the pause +end +``` + +For normal Ruby forks you can install an opt-in `Process._fork` hook which uses +that same pause gate automatically: + +```ruby +MiniRacer.install_fork_hooks!(timeout: 5) +``` + +The hook covers `Kernel#fork`, `Process.fork`, and `IO.popen("-")` on Rubies +that expose `Process._fork`. It intentionally does not cover `Process.daemon` or +raw native `fork(2)` calls from other C extensions. When the hook is installed, +do not call `MiniRacer.resume` again in the child block; the hook already resumes +in both parent and child before user child code runs. If you want to ensure your application does not leak memory after fork either: diff --git a/ext/mini_racer_extension/mini_racer_extension.c b/ext/mini_racer_extension/mini_racer_extension.c index afa6e9df..a5cdf5a7 100644 --- a/ext/mini_racer_extension/mini_racer_extension.c +++ b/ext/mini_racer_extension/mini_racer_extension.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #if defined(__linux__) && !defined(__GLIBC__) @@ -191,6 +192,7 @@ static const rb_data_type_t snapshot_type = { static VALUE platform_init_error; static VALUE context_disposed_error; +static VALUE pause_timeout_error; static VALUE parse_error; static VALUE memory_error; static VALUE script_error; @@ -207,6 +209,219 @@ static VALUE js_function_class; static pthread_mutex_t flags_mtx = PTHREAD_MUTEX_INITIALIZER; static Buf flags; // protected by |flags_mtx| +#if defined(__GNUC__) || defined(__clang__) +static __thread int mini_racer_operation_depth; +#else +static _Thread_local int mini_racer_operation_depth; +#endif + +#ifndef __APPLE__ +#define MINI_RACER_PAUSE_CLOCK CLOCK_MONOTONIC +#else +#define MINI_RACER_PAUSE_CLOCK CLOCK_REALTIME +#endif + +#define MINI_RACER_MAX_PAUSE_TIMEOUT (10.0 * 365.0 * 24.0 * 60.0 * 60.0) + +typedef struct MiniRacerPauseState +{ + pthread_mutex_t mtx; + pthread_cond_t cv; + atomic_int pause_depth; + atomic_int active; + atomic_long pid; +} MiniRacerPauseState; + +static MiniRacerPauseState pause_state = { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .pause_depth = 0, + .active = 0, +}; + +struct mini_racer_gate_wait +{ + atomic_int cancel; + int *counted; +}; + +struct mini_racer_pause_wait +{ + atomic_int cancel; + int timed; + int active; + struct timespec deadline; +}; + +static void mini_racer_pause_state_init(int reset_mutex) +{ + pthread_condattr_t cattr; + + if (reset_mutex) { + // Forked children inherit the bytes of parent pthread mutexes/conds, + // but not the parent threads that may have owned or waited on them. + // POSIX is not kind to reinitializing an already-initialized object; + // this mirrors Ruby's pragmatic atfork approach and avoids touching + // parent-owned synchronization state in the child. + pthread_mutex_init(&pause_state.mtx, NULL); + } + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, MINI_RACER_PAUSE_CLOCK); +#endif + pthread_cond_init(&pause_state.cv, &cattr); + pthread_condattr_destroy(&cattr); + atomic_store(&pause_state.pause_depth, 0); + atomic_store(&pause_state.active, 0); + atomic_store(&pause_state.pid, (long)getpid()); + mini_racer_operation_depth = 0; +} + +static inline void mini_racer_pause_recover_after_fork(void) +{ + if (atomic_load(&pause_state.pid) != (long)getpid()) + mini_racer_pause_state_init(1); +} + +static void mini_racer_pause_wakeup_all(void) +{ + mini_racer_pause_recover_after_fork(); + pthread_mutex_lock(&pause_state.mtx); + pthread_cond_broadcast(&pause_state.cv); + pthread_mutex_unlock(&pause_state.mtx); +} + +static void mini_racer_timespec_from_timeout(struct timespec *ts, double timeout) +{ + double seconds; + long nsec; + + clock_gettime(MINI_RACER_PAUSE_CLOCK, ts); + nsec = (long)(modf(timeout, &seconds) * 1000000000.0); + ts->tv_sec += (time_t)seconds; + ts->tv_nsec += nsec; + if (ts->tv_nsec >= 1000000000L) { + ts->tv_sec++; + ts->tv_nsec -= 1000000000L; + } +} + +static const char *mini_racer_error_message(int r) +{ + if (r == ECANCELED) + return "MiniRacer operation was interrupted or canceled"; + return strerror(r); +} + +static int mini_racer_operation_wait_unpaused(atomic_int *cancel, atomic_int *interrupted) +{ + int r; + + if ((r = pthread_mutex_lock(&pause_state.mtx))) + return r; + while (atomic_load(&pause_state.pause_depth) > 0 && + !atomic_load(cancel) && + !(interrupted && atomic_load(interrupted))) { + if ((r = pthread_cond_wait(&pause_state.cv, &pause_state.mtx))) { + pthread_mutex_unlock(&pause_state.mtx); + return r; + } + } + pthread_mutex_unlock(&pause_state.mtx); + if (atomic_load(cancel)) + return ECANCELED; + if (interrupted && atomic_load(interrupted)) + return EINTR; + return 0; +} + +static void mini_racer_active_decrement(void) +{ + if (atomic_fetch_sub(&pause_state.active, 1) == 1 && + atomic_load(&pause_state.pause_depth) > 0) { + pthread_mutex_lock(&pause_state.mtx); + pthread_cond_broadcast(&pause_state.cv); + pthread_mutex_unlock(&pause_state.mtx); + } +} + +static int mini_racer_operation_enter_nogvl(atomic_int *cancel, atomic_int *interrupted, int *counted) +{ + int r; + + *counted = 0; + if (mini_racer_operation_depth > 0) { + mini_racer_operation_depth++; + return 0; + } + + mini_racer_pause_recover_after_fork(); + for (;;) { + if (atomic_load(cancel)) + return ECANCELED; + if (interrupted && atomic_load(interrupted)) + return EINTR; + if (atomic_load(&pause_state.pause_depth) == 0) { + atomic_fetch_add(&pause_state.active, 1); + if (atomic_load(&pause_state.pause_depth) == 0) { + mini_racer_operation_depth = 1; + *counted = 1; + return 0; + } + mini_racer_active_decrement(); + } + if ((r = mini_racer_operation_wait_unpaused(cancel, interrupted))) + return r; + } +} + +static void mini_racer_operation_leave_nogvl(int counted) +{ + if (mini_racer_operation_depth > 0) + mini_racer_operation_depth--; + if (!counted || mini_racer_operation_depth > 0) + return; + + mini_racer_pause_recover_after_fork(); + mini_racer_active_decrement(); +} + +static void *mini_racer_operation_enter_nogvl_entry(void *arg) +{ + struct mini_racer_gate_wait *w; + int r; + + w = arg; + r = mini_racer_operation_enter_nogvl(&w->cancel, NULL, w->counted); + return (void *)(intptr_t)r; +} + +static void mini_racer_gate_wait_ubf(void *arg) +{ + struct mini_racer_gate_wait *w; + + w = arg; + atomic_store(&w->cancel, 1); + mini_racer_pause_wakeup_all(); +} + +static int mini_racer_operation_enter_gvl(int *counted) +{ + struct mini_racer_gate_wait w; + void *r; + + atomic_init(&w.cancel, 0); + *counted = 0; + w.counted = counted; + r = rb_nogvl(mini_racer_operation_enter_nogvl_entry, &w, + mini_racer_gate_wait_ubf, &w, 0); + return (int)(intptr_t)r; +} + +static void mini_racer_operation_leave_gvl(int counted) +{ + mini_racer_operation_leave_nogvl(counted); +} + // arg == &(struct rendezvous_nogvl){...} static void *rendezvous_callback(void *arg); @@ -233,7 +448,8 @@ struct rendezvous_nogvl Buf *req, *res; atomic_int active; atomic_int interrupted; - int started, finished, has_rr_mtx; + atomic_int cancel; + int started, finished, has_rr_mtx, operation_entered, counted; }; struct rendezvous_des @@ -1062,6 +1278,11 @@ static int single_threaded_recover_after_fork(Context *c) #ifndef __APPLE__ pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC); #endif + // In a forked child, the runner thread and any waiters from the parent no + // longer exist. Reinitialize the inherited condition variable in place + // rather than destroying it; this is technically outside POSIX's happy path + // for already-initialized condvars, but it avoids touching parent-owned + // waiter state and matches Ruby's pragmatic atfork reset style. r = pthread_cond_init(&c->cv, &cattr); pthread_condattr_destroy(&cattr); if (r) @@ -1093,12 +1314,17 @@ static void rendezvous_release(struct rendezvous_nogvl *a) Context *c; atomic_store(&a->active, 0); - if (!a->has_rr_mtx) - return; - c = a->context; - c->depth--; - a->has_rr_mtx = 0; - pthread_mutex_unlock(&c->rr_mtx); + if (a->has_rr_mtx) { + c = a->context; + c->depth--; + a->has_rr_mtx = 0; + pthread_mutex_unlock(&c->rr_mtx); + } + if (a->operation_entered) { + mini_racer_operation_leave_nogvl(a->counted); + a->operation_entered = 0; + a->counted = 0; + } } static inline void *rendezvous_nogvl(void *arg) @@ -1109,10 +1335,22 @@ static inline void *rendezvous_nogvl(void *arg) a = arg; c = a->context; + if (!a->operation_entered) { + if ((r = mini_racer_operation_enter_nogvl(&a->cancel, &a->interrupted, &a->counted))) + return (void *)(intptr_t)r; + a->operation_entered = 1; + } if (!a->started) { - if (single_threaded && (r = single_threaded_recover_after_fork(c))) + if (single_threaded && (r = single_threaded_recover_after_fork(c))) { + rendezvous_release(a); return (void *)(intptr_t)r; + } pthread_mutex_lock(&c->rr_mtx); + if (atomic_load(&a->cancel)) { + pthread_mutex_unlock(&c->rr_mtx); + rendezvous_release(a); + return (void *)(intptr_t)ECANCELED; + } a->has_rr_mtx = 1; if (c->depth > 0 && c->depth%50 == 0) { // TODO stop steep recursion fprintf(stderr, "mini_racer: deep js->ruby->js recursion, depth=%d\n", c->depth); @@ -1125,7 +1363,7 @@ static inline void *rendezvous_nogvl(void *arg) next: atomic_store(&a->active, 1); pthread_mutex_lock(&c->mtx); - if (atomic_load(&c->quit)) { + if (atomic_load(&c->quit) || atomic_load(&a->cancel)) { buf_reset(a->req); pthread_mutex_unlock(&c->mtx); a->finished = 1; @@ -1148,14 +1386,18 @@ static inline void *rendezvous_nogvl(void *arg) } pthread_cond_signal(&c->cv); } - while (!c->res_ready && !atomic_load(&a->interrupted) && !atomic_load(&c->quit)) + while (!c->res_ready && + !atomic_load(&a->interrupted) && + !atomic_load(&a->cancel) && + !atomic_load(&c->quit)) { pthread_cond_wait(&c->cv, &c->mtx); + } if (!c->res_ready && atomic_load(&a->interrupted)) { atomic_store(&a->active, 0); pthread_mutex_unlock(&c->mtx); return (void *)(intptr_t)EINTR; } - if (!c->res_ready && atomic_load(&c->quit)) { + if (!c->res_ready && (atomic_load(&c->quit) || atomic_load(&a->cancel))) { buf_reset(a->req); pthread_mutex_unlock(&c->mtx); a->finished = 1; @@ -1170,7 +1412,7 @@ static inline void *rendezvous_nogvl(void *arg) if (*a->res->buf == 'c') { // js -> ruby callback? rb_thread_call_with_gvl(rendezvous_callback, a); buf_reset(a->res); - if (atomic_load(&c->quit)) { + if (atomic_load(&c->quit) || atomic_load(&a->cancel)) { buf_reset(a->req); a->finished = 1; rendezvous_release(a); @@ -1189,18 +1431,30 @@ static void rendezvous_ubf(void *arg) Context *c; a = arg; + atomic_store(&a->interrupted, 1); + mini_racer_pause_wakeup_all(); if (!atomic_load(&a->active)) return; - atomic_store(&a->interrupted, 1); c = a->context; pthread_cond_broadcast(&c->cv); } +struct context_dispose_wait +{ + Context *context; + atomic_int cancel; + int counted; +}; + static void terminate_ubf(void *arg) { + struct context_dispose_wait *a; Context *c; - c = arg; + a = arg; + atomic_store(&a->cancel, 1); + mini_racer_pause_wakeup_all(); + c = a->context; if (c->pst) v8_terminate_execution(c->pst); pthread_cond_broadcast(&c->cv); @@ -1215,6 +1469,8 @@ static void *rendezvous_cancel_nogvl(void *arg) Context *c; a = arg; + atomic_store(&a->cancel, 1); + mini_racer_pause_wakeup_all(); c = a->context; atomic_store(&a->active, 0); if (c->pst) @@ -1293,16 +1549,19 @@ static void rendezvous_no_des(Context *c, Buf *req, Buf *res) a.res = res; atomic_init(&a.active, 0); atomic_init(&a.interrupted, 0); + atomic_init(&a.cancel, 0); a.started = 0; a.finished = 0; a.has_rr_mtx = 0; + a.operation_entered = 0; + a.counted = 0; rv = rb_ensure(rendezvous_no_des_body, (VALUE)&a, rendezvous_no_des_ensure, (VALUE)&a); r = (void *)(intptr_t)NUM2LONG(rv); if ((int)(intptr_t)r == ECANCELED) rb_raise(context_disposed_error, "disposed context"); if (r) - rb_raise(runtime_error, "single-threaded runner: %s", strerror((int)(intptr_t)r)); + rb_raise(runtime_error, "MiniRacer operation: %s", mini_racer_error_message((int)(intptr_t)r)); } // send request to & receive reply from v8 thread; takes ownership of |req| @@ -1586,30 +1845,50 @@ static VALUE context_attach(VALUE self, VALUE name, VALUE proc) static void *context_dispose_do(void *arg) { + struct context_dispose_wait *a; Context *c; + void *ret; int r; - c = arg; + a = arg; + c = a->context; + ret = NULL; + if ((r = mini_racer_operation_enter_nogvl(&a->cancel, NULL, &a->counted))) + return (void *)(intptr_t)r; if (single_threaded) { - if ((r = single_threaded_recover_after_fork(c))) - return (void *)(intptr_t)r; + if ((r = single_threaded_recover_after_fork(c))) { + ret = (void *)(intptr_t)r; + goto out; + } + } + if (atomic_load(&a->cancel)) { + ret = (void *)(intptr_t)ECANCELED; + goto out; } if (c->depth > 0) { r = pthread_mutex_trylock(&c->rr_mtx); if (!r) { pthread_mutex_unlock(&c->rr_mtx); - return (void *)(intptr_t)EBUSY; + ret = (void *)(intptr_t)EBUSY; + goto out; + } + if (r != EBUSY) { + ret = (void *)(intptr_t)r; + goto out; } - if (r != EBUSY) - return (void *)(intptr_t)r; if (c->pst) v8_terminate_execution(c->pst); pthread_cond_broadcast(&c->cv); } if (single_threaded) { pthread_mutex_lock(&c->mtx); - while (c->req.len || c->res.len) + while ((c->req.len || c->res.len) && !atomic_load(&a->cancel)) pthread_cond_wait(&c->cv, &c->mtx); + if (atomic_load(&a->cancel)) { + pthread_mutex_unlock(&c->mtx); + ret = (void *)(intptr_t)ECANCELED; + goto out; + } atomic_store(&c->quit, 1); // disposed if (c->single_threaded_thr_started && c->single_threaded_pid == getpid()) { pthread_cond_signal(&c->cv); @@ -1621,24 +1900,35 @@ static void *context_dispose_do(void *arg) pthread_mutex_unlock(&c->mtx); } else { pthread_mutex_lock(&c->mtx); - while (c->req.len || c->res.len) + while ((c->req.len || c->res.len) && !atomic_load(&a->cancel)) pthread_cond_wait(&c->cv, &c->mtx); + if (atomic_load(&a->cancel)) { + pthread_mutex_unlock(&c->mtx); + ret = (void *)(intptr_t)ECANCELED; + goto out; + } atomic_store(&c->quit, 1); // disposed pthread_cond_signal(&c->cv); // wake up v8 thread pthread_mutex_unlock(&c->mtx); } - return NULL; +out: + mini_racer_operation_leave_nogvl(a->counted); + return ret; } static VALUE context_dispose(VALUE self) { Context *c; + struct context_dispose_wait a; void *r; TypedData_Get_Struct(self, Context, &context_type, c); - r = rb_thread_call_without_gvl(context_dispose_do, c, terminate_ubf, c); + a.context = c; + a.counted = 0; + atomic_init(&a.cancel, 0); + r = rb_thread_call_without_gvl(context_dispose_do, &a, terminate_ubf, &a); if (r) - rb_raise(runtime_error, "context dispose: %s", strerror((int)(intptr_t)r)); + rb_raise(runtime_error, "context dispose: %s", mini_racer_error_message((int)(intptr_t)r)); return Qnil; } @@ -1897,6 +2187,141 @@ static VALUE platform_set_flags(int argc, VALUE *argv, VALUE klass) rb_raise(platform_init_error, "platform already initialized"); } +static void *mini_racer_pause_nogvl(void *arg) +{ + struct mini_racer_pause_wait *w; + int r; + + w = arg; + mini_racer_pause_recover_after_fork(); + if ((r = pthread_mutex_lock(&pause_state.mtx))) + return (void *)(intptr_t)r; + atomic_fetch_add(&pause_state.pause_depth, 1); + for (;;) { + if (atomic_load(&w->cancel)) { + r = ECANCELED; + goto fail; + } + if (atomic_load(&pause_state.active) == 0) { + pthread_mutex_unlock(&pause_state.mtx); + return NULL; + } + if (w->timed) { + r = pthread_cond_timedwait(&pause_state.cv, &pause_state.mtx, &w->deadline); + if (r == ETIMEDOUT && atomic_load(&pause_state.active) == 0) + continue; + if (r) + goto fail; + } else if ((r = pthread_cond_wait(&pause_state.cv, &pause_state.mtx))) { + goto fail; + } + } +fail: + w->active = atomic_load(&pause_state.active); + if (atomic_fetch_sub(&pause_state.pause_depth, 1) == 1) + pthread_cond_broadcast(&pause_state.cv); + pthread_mutex_unlock(&pause_state.mtx); + return (void *)(intptr_t)r; +} + +static void mini_racer_pause_ubf(void *arg) +{ + struct mini_racer_pause_wait *w; + + w = arg; + atomic_store(&w->cancel, 1); + mini_racer_pause_wakeup_all(); +} + +static double mini_racer_parse_pause_timeout(int argc, VALUE *argv, int *timed) +{ + VALUE kwargs, vals[1]; + ID keys[1]; + double timeout; + + rb_scan_args(argc, argv, ":", &kwargs); + *timed = 0; + if (NIL_P(kwargs)) + return 0; + keys[0] = rb_intern("timeout"); + rb_get_kwargs(kwargs, keys, 0, 1, vals); + if (vals[0] == Qundef || NIL_P(vals[0])) + return 0; + if (!RTEST(rb_obj_is_kind_of(vals[0], rb_cNumeric))) + rb_raise(rb_eArgError, "timeout must be a number"); + timeout = NUM2DBL(vals[0]); + if (!isfinite(timeout) || timeout < 0 || timeout > MINI_RACER_MAX_PAUSE_TIMEOUT) + rb_raise(rb_eArgError, "timeout must be a finite number between 0 and 10 years"); + *timed = 1; + return timeout; +} + +static VALUE mini_racer_resume(VALUE self); + +static VALUE mini_racer_pause_yield(VALUE arg) +{ + (void)arg; + return rb_yield(Qnil); +} + +static VALUE mini_racer_pause_ensure_resume(VALUE self) +{ + int status; + + rb_protect(mini_racer_resume, self, &status); + if (status) + rb_set_errinfo(Qnil); + return Qnil; +} + +static VALUE mini_racer_pause(int argc, VALUE *argv, VALUE self) +{ + struct mini_racer_pause_wait w; + double timeout; + void *r; + + if (mini_racer_operation_depth > 0) + rb_raise(runtime_error, "cannot pause MiniRacer from inside an active MiniRacer operation"); + + timeout = mini_racer_parse_pause_timeout(argc, argv, &w.timed); + atomic_init(&w.cancel, 0); + w.active = 0; + if (w.timed) + mini_racer_timespec_from_timeout(&w.deadline, timeout); + r = rb_nogvl(mini_racer_pause_nogvl, &w, mini_racer_pause_ubf, &w, 0); + if (r) { + if ((int)(intptr_t)r == ETIMEDOUT) + rb_raise(pause_timeout_error, "MiniRacer.pause timed out waiting for %d active operation%s", + w.active, w.active == 1 ? "" : "s"); + rb_raise(runtime_error, "MiniRacer.pause: %s", mini_racer_error_message((int)(intptr_t)r)); + } + if (rb_block_given_p()) + return rb_ensure(mini_racer_pause_yield, Qnil, mini_racer_pause_ensure_resume, self); + return Qtrue; +} + +static VALUE mini_racer_resume(VALUE self) +{ + int depth, empty; + + (void)self; + if (atomic_load(&pause_state.pid) != (long)getpid()) { + mini_racer_pause_state_init(1); + return Qnil; + } + pthread_mutex_lock(&pause_state.mtx); + depth = atomic_load(&pause_state.pause_depth); + if (depth <= 0) { + pthread_mutex_unlock(&pause_state.mtx); + rb_raise(runtime_error, "MiniRacer.resume called without a matching pause"); + } + empty = (atomic_fetch_sub(&pause_state.pause_depth, 1) == 1); + if (empty) + pthread_cond_broadcast(&pause_state.cv); + pthread_mutex_unlock(&pause_state.mtx); + return Qnil; +} + // called by v8_global_init; caller must free |*p| with free() void v8_get_flags(char **p, size_t *n) { @@ -1919,16 +2344,68 @@ void v8_get_flags(char **p, size_t *n) rb_thread_lock_native_thread(); } -static VALUE context_initialize(int argc, VALUE *argv, VALUE self) +struct context_initialize_args { - VALUE kwargs, a, k, v; + Context *context; + int counted; +}; + +static VALUE context_initialize_do(VALUE arg) +{ + struct context_initialize_args *a; pthread_attr_t attr; const char *cause; pthread_t thr; + Context *c; + int r; + + a = (struct context_initialize_args *)arg; + c = a->context; + + cause = "MiniRacer operation"; + if ((r = mini_racer_operation_enter_gvl(&a->counted))) + goto fail; + if (single_threaded) { + v8_once_init(); + c->pst = v8_thread_init(c, c->snapshot.buf, c->snapshot.len, c->max_memory, c->verbose_exceptions); + } else { + cause = "pthread_attr_init"; + if ((r = pthread_attr_init(&attr))) + goto fail; + pthread_attr_setstacksize(&attr, 2<<20); // 2 MiB + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + // v8 thread takes ownership of |c| + cause = "pthread_create"; + r = pthread_create(&thr, &attr, v8_thread_start, c); + pthread_attr_destroy(&attr); + if (r) + goto fail; + barrier_wait(&c->early_init); + barrier_wait(&c->late_init); + } + return Qnil; +fail: + rb_raise(runtime_error, "Context.initialize: %s: %s", cause, mini_racer_error_message(r)); + return Qnil; // pacify compiler +} + +static VALUE context_initialize_ensure(VALUE arg) +{ + struct context_initialize_args *a; + + a = (struct context_initialize_args *)arg; + if (a->counted) + mini_racer_operation_leave_gvl(a->counted); + return Qnil; +} + +static VALUE context_initialize(int argc, VALUE *argv, VALUE self) +{ + VALUE kwargs, a, k, v; + struct context_initialize_args init_args; Snapshot *ss; Context *c; char *s; - int r; TypedData_Get_Struct(self, Context, &context_type, c); rb_scan_args(argc, argv, ":", &kwargs); @@ -1971,28 +2448,10 @@ static VALUE context_initialize(int argc, VALUE *argv, VALUE self) } } init: - if (single_threaded) { - v8_once_init(); - c->pst = v8_thread_init(c, c->snapshot.buf, c->snapshot.len, c->max_memory, c->verbose_exceptions); - } else { - cause = "pthread_attr_init"; - if ((r = pthread_attr_init(&attr))) - goto fail; - pthread_attr_setstacksize(&attr, 2<<20); // 2 MiB - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - // v8 thread takes ownership of |c| - cause = "pthread_create"; - r = pthread_create(&thr, &attr, v8_thread_start, c); - pthread_attr_destroy(&attr); - if (r) - goto fail; - barrier_wait(&c->early_init); - barrier_wait(&c->late_init); - } - return Qnil; -fail: - rb_raise(runtime_error, "Context.initialize: %s: %s", cause, strerror(r)); - return Qnil; // pacify compiler + init_args.context = c; + init_args.counted = 0; + return rb_ensure(context_initialize_do, (VALUE)&init_args, + context_initialize_ensure, (VALUE)&init_args); } static VALUE snapshot_alloc(VALUE klass) @@ -2126,10 +2585,15 @@ void Init_mini_racer_extension(void) VALUE c, m; m = rb_define_module("MiniRacer"); + mini_racer_pause_state_init(0); c = rb_define_class_under(m, "Error", rb_eStandardError); snapshot_error = rb_define_class_under(m, "SnapshotError", c); platform_init_error = rb_define_class_under(m, "PlatformAlreadyInitialized", c); context_disposed_error = rb_define_class_under(m, "ContextDisposedError", c); + pause_timeout_error = rb_define_class_under(m, "PauseTimeoutError", c); + + rb_define_singleton_method(m, "pause", mini_racer_pause, -1); + rb_define_singleton_method(m, "resume", mini_racer_resume, 0); c = rb_define_class_under(m, "EvalError", c); parse_error = rb_define_class_under(m, "ParseError", c); diff --git a/lib/mini_racer.rb b/lib/mini_racer.rb index 0687abdc..3d2c218d 100644 --- a/lib/mini_racer.rb +++ b/lib/mini_racer.rb @@ -47,6 +47,8 @@ class ContextDisposedError < Error end class PlatformAlreadyInitialized < Error end + class PauseTimeoutError < Error + end class EvalError < Error end @@ -70,6 +72,66 @@ def backtrace end end + module ForkHooks + def _fork + paused = false + MiniRacer.pause(timeout: MiniRacer.fork_hook_timeout) + paused = true + + super + ensure + exception = $! + if paused + begin + MiniRacer.resume + rescue StandardError + # Keep the original fork/pause failure. + raise unless exception + end + end + end + end + private_constant :ForkHooks + + @fork_hook_timeout = 5.0 + @fork_hooks_installed = false + MAX_FORK_HOOK_TIMEOUT = 10 * 365 * 24 * 60 * 60 + private_constant :MAX_FORK_HOOK_TIMEOUT + + class << self + attr_reader :fork_hook_timeout + + def install_fork_hooks!(timeout: 5.0) + unless respond_to?(:pause) && respond_to?(:resume) + raise NotImplementedError, + "MiniRacer.pause/resume fork coordination is not available on this platform" + end + unless Process.respond_to?(:_fork, true) + raise NotImplementedError, + "Process._fork is not available on this platform" + end + unless timeout.nil? + unless timeout.is_a?(Numeric) + raise ArgumentError, + "timeout must be nil or a finite number between 0 and 10 years" + end + timeout = timeout.to_f + unless timeout.finite? && timeout >= 0 && + timeout <= MAX_FORK_HOOK_TIMEOUT + raise ArgumentError, + "timeout must be nil or a finite number between 0 and 10 years" + end + end + + @fork_hook_timeout = timeout + unless @fork_hooks_installed + Process.singleton_class.prepend(ForkHooks) + @fork_hooks_installed = true + end + true + end + end + class ScriptError < EvalError def initialize(message) message, *@frames = message.split("\n") diff --git a/test/mini_racer_test.rb b/test/mini_racer_test.rb index e9ff5c5f..db8985e0 100644 --- a/test/mini_racer_test.rb +++ b/test/mini_racer_test.rb @@ -91,6 +91,263 @@ def test_that_it_has_a_version_number refute_nil ::MiniRacer::VERSION end + def test_pause_blocks_new_operations_until_resume + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + context = MiniRacer::Context.new + paused = false + result = Queue.new + thread = nil + + MiniRacer.pause(timeout: 1) + paused = true + thread = Thread.new { result << context.eval("1 + 1") } + sleep 0.1 + + assert thread.alive?, "eval should wait while MiniRacer is paused" + assert result.empty?, "eval should not finish while MiniRacer is paused" + + MiniRacer.resume + paused = false + + assert_equal 2, result.pop + assert thread.join(3), "eval did not finish after MiniRacer.resume" + ensure + begin + MiniRacer.resume if paused + rescue StandardError + nil + end + thread&.kill if thread&.alive? + thread&.join + end + + def test_pause_times_out_while_operation_is_active + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + started_r, started_w = IO.pipe + release_r, release_w = IO.pipe + context = MiniRacer::Context.new + context.attach( + "block", + proc do + started_w.write("x") + started_w.flush + release_r.read(1) + 42 + end + ) + + worker = Thread.new { context.eval("block()") } + started_r.read(1) + + paused = false + begin + MiniRacer.pause(timeout: 0.05) + paused = true + flunk "MiniRacer.pause should time out while an operation is active" + rescue MiniRacer::PauseTimeoutError + # expected + ensure + begin + MiniRacer.resume if paused + rescue StandardError + nil + end + end + + release_w.write("x") + release_w.flush + assert worker.join(3), "active eval did not finish" + assert_equal 2, context.eval("1 + 1") + ensure + begin + release_w&.write("x") + rescue StandardError + nil + end + worker&.kill if worker&.alive? + worker&.join + [started_r, started_w, release_r, release_w].each do |io| + begin + io&.close + rescue StandardError + nil + end + end + end + + def test_pause_block_resumes_after_exception + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + assert_raises(::RuntimeError) do + MiniRacer.pause(timeout: 1) { raise "boom" } + end + + assert_equal 2, MiniRacer::Context.new.eval("1 + 1") + end + + def test_pause_allows_reentrant_callback_work_to_drain + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + started_r, started_w = IO.pipe + release_r, release_w = IO.pipe + context = MiniRacer::Context.new + context.attach( + "reenter", + proc do + started_w.write("x") + started_w.flush + release_r.read(1) + context.eval("20 + 22") + end + ) + + worker = Thread.new { context.eval("reenter()") } + started_r.read(1) + + paused = false + pause_thread = + Thread.new do + MiniRacer.pause(timeout: 1) + paused = true + end + + sleep 0.1 + release_w.write("x") + release_w.flush + + assert pause_thread.join(3), + "pause did not wait for reentrant callback work to drain" + assert worker.join(3), "eval did not finish" + assert_equal 42, worker.value + ensure + begin + MiniRacer.resume if paused + rescue StandardError + nil + end + begin + release_w&.write("x") + rescue StandardError + nil + end + worker&.kill if worker&.alive? + pause_thread&.kill if pause_thread&.alive? + worker&.join + pause_thread&.join + [started_r, started_w, release_r, release_w].each do |io| + begin + io&.close + rescue StandardError + nil + end + end + end + + def test_pause_block_preserves_original_exception_if_block_resumes + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + error = + assert_raises(::RuntimeError) do + MiniRacer.pause(timeout: 1) do + MiniRacer.resume + raise "original" + end + end + assert_equal "original", error.message + end + + def test_pause_is_nested_until_all_resumes_run + thread = nil + paused = 0 + + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + context = MiniRacer::Context.new + result = Queue.new + + MiniRacer.pause(timeout: 1) + paused += 1 + MiniRacer.pause(timeout: 1) + paused += 1 + + thread = Thread.new { result << context.eval("20 + 22") } + sleep 0.1 + assert thread.alive?, "eval should wait while nested pause is held" + + MiniRacer.resume + paused -= 1 + sleep 0.1 + assert thread.alive?, "eval should still wait until the outer pause resumes" + + MiniRacer.resume + paused -= 1 + assert_equal 42, result.pop + assert thread.join(3), "eval did not finish after outer resume" + ensure + paused.times do + begin + MiniRacer.resume + rescue StandardError + nil + end + end + thread&.kill if thread&.alive? + thread&.join + end + + def test_resume_without_pause_raises + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + assert_raises(MiniRacer::RuntimeError) { MiniRacer.resume } + end + + def test_pause_rejects_invalid_timeout + unless MiniRacer.respond_to?(:pause) + skip "MiniRacer.pause is only implemented for CRuby" + end + + assert_raises(ArgumentError) { MiniRacer.pause(timeout: -1) } + assert_raises(ArgumentError) { MiniRacer.pause(timeout: Float::NAN) } + assert_raises(ArgumentError) { MiniRacer.pause(timeout: Float::INFINITY) } + assert_raises(ArgumentError) { MiniRacer.pause(timeout: "5") } + assert_raises(ArgumentError) { MiniRacer.pause(timeout: 1e300) } + end + + def test_fork_hooks_reject_invalid_timeout + unless MiniRacer.respond_to?(:install_fork_hooks!) && + Process.respond_to?(:_fork, true) + skip "MiniRacer.install_fork_hooks! is only implemented for CRuby" + end + + assert_raises(ArgumentError) { MiniRacer.install_fork_hooks!(timeout: -1) } + assert_raises(ArgumentError) do + MiniRacer.install_fork_hooks!(timeout: Float::NAN) + end + assert_raises(ArgumentError) do + MiniRacer.install_fork_hooks!(timeout: Float::INFINITY) + end + assert_raises(ArgumentError) { MiniRacer.install_fork_hooks!(timeout: "5") } + assert_raises(ArgumentError) do + MiniRacer.install_fork_hooks!(timeout: 1e300) + end + end + def test_types context = MiniRacer::Context.new assert_equal 2, context.eval("2") @@ -1610,6 +1867,8 @@ def test_termination_exception sleep 1.5 a.kill b.kill + assert a.join(3), "stop thread did not stop" + assert b.join(3), "heap stats thread did not stop" end def test_ruby_exception diff --git a/test/single_threaded_test.rb b/test/single_threaded_test.rb index 9340f831..1be87a6d 100644 --- a/test/single_threaded_test.rb +++ b/test/single_threaded_test.rb @@ -371,4 +371,113 @@ def test_fork_after_low_memory_notification raise "child failed with status #{status.inspect}" unless status.success? RUBY end + + def test_fork_hook_pauses_and_recovers_child + assert_single_threaded_script <<~'RUBY' + exit 0 unless Process.respond_to?(:fork) + + MiniRacer.install_fork_hooks!(timeout: 1) + + context = MiniRacer::Context.new + context.eval("var answer = 41") + context.eval("answer += 1") + + pid = fork do + Thread.new do + sleep 3 + warn "child timed out" + exit! 99 + end + + exit!(context.eval("answer") == 42 ? 0 : 1) + end + _, status = Process.wait2(pid) + raise "child failed with status #{status.inspect}" unless status.success? + raise "parent context broke" unless context.eval("answer") == 42 + RUBY + end + + def test_manual_pause_resume_around_fork + assert_single_threaded_script <<~'RUBY' + exit 0 unless Process.respond_to?(:fork) + + context = MiniRacer::Context.new + context.eval("var answer = 41") + context.eval("answer += 1") + + MiniRacer.pause(timeout: 1) + begin + pid = fork do + Thread.new do + sleep 3 + warn "child timed out" + exit! 99 + end + + MiniRacer.resume + exit!(context.eval("answer") == 42 ? 0 : 1) + end + ensure + MiniRacer.resume + end + + _, status = Process.wait2(pid) + raise "child failed with status #{status.inspect}" unless status.success? + raise "parent context broke" unless context.eval("answer") == 42 + RUBY + end + + def test_fork_hook_times_out_instead_of_forking_while_busy + assert_single_threaded_script <<~'RUBY' + exit 0 unless Process.respond_to?(:fork) + + MiniRacer.install_fork_hooks!(timeout: 0.05) + + started_r, started_w = IO.pipe + release_r, release_w = IO.pipe + context = MiniRacer::Context.new + context.attach("block", proc do + started_w.write("x") + started_w.flush + release_r.read(1) + 42 + end) + + worker = Thread.new { context.eval("block()") } + started_r.read(1) + + begin + fork { exit! 88 } + raise "expected pause timeout" + rescue MiniRacer::PauseTimeoutError + end + + release_w.write("x") + release_w.flush + raise "worker did not finish" unless worker.join(3) + raise "context should still be usable" unless context.eval("1 + 1") == 2 + RUBY + end + + def test_fork_hook_rejects_fork_from_active_callback + assert_single_threaded_script <<~'RUBY' + exit 0 unless Process.respond_to?(:fork) + + MiniRacer.install_fork_hooks!(timeout: 1) + + context = MiniRacer::Context.new + context.attach("try_fork", proc do + begin + fork { exit! 88 } + "forked" + rescue MiniRacer::RuntimeError => e + raise unless e.message.include?("cannot pause") + "rejected" + end + end) + + raise "fork was not rejected" unless context.eval("try_fork()") == "rejected" + raise "context should still be usable" unless context.eval("1 + 1") == 2 + RUBY + end end