-
-
Notifications
You must be signed in to change notification settings - Fork 146
Description
Description
Abortable tasks in Piscina are intended to never share threads with other tasks. If a worker runs an abortable task and that task is aborted, the worker is terminated intentionally to ensure a clean stop of the user task. However, if that abortable task shares its worker with other concurrent tasks, aborting the abortable task will also terminate those unrelated tasks — leading to unexpected errors and data loss.
Root Cause
The scheduling and load-balancer logic checks the isRunningAbortableTask flag on workers before deciding whether to schedule additional tasks on them. The WorkerInfo.isRunningAbortableTask() method was implemented as:
isRunningAbortableTask() : boolean {
// If there are abortable tasks, we are running one at most per Worker.
if (this.taskInfos.size !== 1) return false;
const [[, task]] = this.taskInfos;
return task.abortSignal != null;
}This implementation can return false for a worker that has more than one task (size > 1). If any of those multiple tasks is an abortable task, the check doesn't detect it. Consequently, a load-balancer (like LeastBusyBalancer) may schedule additional tasks on that worker — leading to abortable and non-abortable tasks sharing a worker.
Then, when someone aborts the abortable task, the worker is terminated (_removeWorker), causing all tasks executing on that worker (including unrelated tasks) to be terminated — a severe bug.
Affected Files
src/worker_pool/index.ts—WorkerInfo.isRunningAbortableTask()methodsrc/worker_pool/balancer/index.ts— Load balancer logic that relies on the above flagsrc/index.ts— Scheduling and worker removal logic
Fix
Change isRunningAbortableTask() to return true if any of the tasks currently running on this worker contain an abort signal:
- if (this.taskInfos.size !== 1) return false;
- const [[, task]] = this.taskInfos;
- return task.abortSignal != null;
+ for (const t of this.taskInfos.values()) {
+ if (t.abortSignal != null) return true;
+ }
+ return false;- Unexpected termination of unrelated tasks due to aborts is a reliability and data integrity problem for applications that use Piscina in production.
- It's a regression risk: a single aborted task could cause multiple unrelated tasks to fail.
- It violates the documented guarantee and user expectations.
Proof-of-Concept Code (pocs/abort-sharing-poc.js)
#!/usr/bin/env node
'use strict';
// Proof-of-concept to demonstrate abortable tasks sharing worker threads and
// producing unintended termination of unrelated tasks when an abort occurs.
//
// Usage:
// node --enable-source-maps --import=tsx pocs/abort-sharing-poc.js [--force-broken]
//
// If `--force-broken` is given, the pool will be configured with a broken
// balancer that ignores the 'isRunningAbortableTask' flag (simulating the
// pre-fix behavior) and demonstrates the bug.
const { resolve } = require('node:path');
const { EventEmitter } = require('node:events');
const Piscina = require('../'); // use package root
function log(msg, ...args) {
console.log(new Date().toISOString(), msg, ...args);
}
(async function main() {
const forceBroken = process.argv.includes('--force-broken');
const poolOptions = {
filename: resolve(__dirname, '..', 'test', 'fixtures', 'wait-for-notify.js'),
maxThreads: 1,
concurrentTasksPerWorker: 2,
// Expose a balancer so we can simulate the broken behaviour if requested
};
if (forceBroken) {
log('NOTE: Using broken balancer to simulate pre-fix behavior');
poolOptions.loadBalancer = (_task, workers) => {
// This balancer always returns the first worker ignoring abortable tasks.
// That will cause abortable tasks to be scheduled on occupied workers,
// reproducing the issue.
return workers[0] ?? null;
};
}
const pool = new Piscina(poolOptions);
let errors = [];
pool.on('error', (err) => {
log('[pool error]', err && err.stack ? err.stack.split('\n')[0] : err);
});
try {
log('Starting two long-running tasks (task1, task2)');
const buf1 = new Int32Array(new SharedArrayBuffer(4));
const buf2 = new Int32Array(new SharedArrayBuffer(4));
const task1Promise = pool.run(buf1).then(() => log('task1 completed')).catch(e => { errors.push(['task1', e]); log('task1 error', e && e.message); });
const task2Promise = pool.run(buf2).then(() => log('task2 completed')).catch(e => { errors.push(['task2', e]); log('task2 error', e && e.message); });
// Start an abortable task (this should stay queued, not scheduled)
log('Submit abortable task (task3)');
const buf3 = new Int32Array(new SharedArrayBuffer(4));
const ee = new EventEmitter();
const task3Promise = pool.run(buf3, { signal: ee }).then(() => log('task3 completed')).catch(e => { errors.push(['task3', e]); log('task3 error', e && e.message); });
const queueSize = pool.queueSize;
log('Queue size after submitting tasks: %d', queueSize);
if (queueSize !== 1 && forceBroken === false) {
log('Unexpected queue size: expected 1 (abortable task queued), got %d', queueSize);
}
// Abort the abortable task
log('Aborting the abortable task immediately');
ee.emit('abort');
// Wake up the first two tasks so they complete normally.
log('Notifying task1 and task2 to complete');
Atomics.store(buf1, 0, 1);
Atomics.notify(buf1, 0, 1);
Atomics.store(buf2, 0, 1);
Atomics.notify(buf2, 0, 1);
await Promise.all([task1Promise, task2Promise, task3Promise]);
// Now inspect errors to determine whether abort of task3 killed task1 or task2
const killed = errors.filter(([name, err]) => err && /Terminating worker thread/.test(err.message));
if (killed.length > 0) {
log('Bug reproduced: abort caused other tasks to be terminated!');
console.table(killed.map(k => ({ task: k[0], message: k[1] && k[1].message })));
process.exit(2);
}
log('No unintended terminations observed. This environment either contains the fix or does not reproduce the bug.');
if (forceBroken) {
log('Force-broken mode: if no terminations observed, environment may have other failsafes or worker termination behavior is different on this platform.');
}
} catch (err) {
log('PoC error', err && err.stack ? err.stack.split('\n')[0] : err);
process.exit(1);
} finally {
pool.destroy();
}
})();Regression Test (test/issue-abort-sharing.test.ts)
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { EventEmitter } from 'node:events';
import Piscina from '..';
import { resolve } from 'node:path';
// This test ensures that abortable tasks cannot share threads and that aborting
// an abortable task does not cause other running tasks on the same worker to be
// terminated. We create a pool with a single worker and concurrentTasksPerWorker
// > 1 and ensure that a queued abortable task doesn't get scheduled onto the
// same worker as other tasks. This test reproduces a bug where an abortable task
// might share a worker and when aborted would terminate unrelated tasks.
test('abortable tasks will not share workers and will not cause other tasks to terminate', async () => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
maxThreads: 1,
concurrentTasksPerWorker: 2
});
const bufs = [
new Int32Array(new SharedArrayBuffer(4)),
new Int32Array(new SharedArrayBuffer(4)),
new Int32Array(new SharedArrayBuffer(4))
];
// Start two long-running tasks which should both be allowed to run
// concurrently on the single worker due to concurrentTasksPerWorker=2.
const task1 = pool.run(bufs[0]);
const task2 = pool.run(bufs[1]);
// Start an abortable task — this should not be scheduled on the same
// worker that is currently running tasks, and must stay queued.
const abortEmitter = new EventEmitter();
const abortable = pool.run(bufs[2], { signal: abortEmitter });
// The abortable task should be queued and not running
assert.strictEqual(pool.queueSize, 1);
// Abort the abortable task
abortEmitter.emit('abort');
// Wake up the first two tasks so they complete normally.
Atomics.store(bufs[0], 0, 1);
Atomics.notify(bufs[0], 0, 1);
Atomics.store(bufs[1], 0, 1);
Atomics.notify(bufs[1], 0, 1);
// Both should complete without being terminated by the abort of the aborted task
await task1;
await task2;
// And the abortable must reject
await assert.rejects(abortable, /The task has been aborted/);
});