
ThreadPool: add bootloader, coroutine-per-task mode, and concurrency limit #107

@EdmondDantes

Motivation

ThreadPool is currently limited to CPU-bound synchronous tasks. To replace the spawn_thread + TaskGroup + Channel pattern (used in projects like queue workers), four additions are needed.

Proposed Changes

1. bootloader parameter

Allow passing a bootloader closure to ThreadPool, the same way spawn_thread() supports it. This is required for autoloading user classes inside worker threads.

$pool = new ThreadPool(
    workers: 4,
    bootloader: static function (): void {
        require_once '/path/to/vendor/autoload.php';
    },
);

2. coroutine: bool flag

When enabled, each submitted task runs inside a coroutine instead of synchronously. This unlocks async IO operations (spawn(), await(), channels, etc.) inside worker tasks, which makes ThreadPool viable for IO-bound workloads.

$pool = new ThreadPool(workers: 4, coroutine: true);

$future = $pool->submit(function () {
    $response = await(httpGet('https://example.com'));
    return $response->status;
});

Without this flag, IO inside a worker blocks the entire thread.

3. concurrency: int parameter (for coroutine mode)

When coroutine: true is set, this parameter limits how many coroutines can run concurrently per worker thread. Without it, an unbounded number of coroutines could be spawned per worker under high load.

$pool = new ThreadPool(workers: 4, coroutine: true, concurrency: 10);
// total: up to 4 × 10 = 40 concurrent IO tasks

This is the direct equivalent of TaskGroup(concurrency: N) inside a long-lived thread.
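
To make the intended semantics concrete, here is a minimal userland sketch of a per-worker concurrency cap. It uses plain PHP 8.1 Fibers as a stand-in for the coroutine runtime, and runWithLimit() is a hypothetical helper written for illustration, not part of ThreadPool or of any existing API. It assumes that 0 means "unlimited", as proposed in this issue.

```php
<?php
declare(strict_types=1);

/**
 * Runs $tasks cooperatively with at most $limit fibers in flight.
 * Hypothetical illustration only; 0 is treated as "unlimited".
 * Returns [results keyed by task index, peak number of fibers in flight].
 */
function runWithLimit(array $tasks, int $limit): array
{
    if ($limit < 0) {
        throw new InvalidArgumentException('limit must be >= 0');
    }
    $cap     = $limit > 0 ? $limit : PHP_INT_MAX;
    $pending = $tasks;   // tasks that have not been started yet
    $active  = [];       // task index => Fiber currently in flight
    $results = [];
    $peak    = 0;

    while ($pending !== [] || $active !== []) {
        // Start new fibers only while below the cap.
        while ($pending !== [] && count($active) < $cap) {
            $index = array_key_first($pending);
            $fiber = new Fiber($pending[$index]);
            unset($pending[$index]);
            $active[$index] = $fiber;
            $fiber->start();
        }
        $peak = max($peak, count($active));

        // Resume each suspended fiber once and collect the finished ones.
        foreach ($active as $index => $fiber) {
            if ($fiber->isSuspended()) {
                $fiber->resume();
            }
            if ($fiber->isTerminated()) {
                $results[$index] = $fiber->getReturn();
                unset($active[$index]);
            }
        }
    }

    ksort($results);
    return [$results, $peak];
}

// Ten tasks that each "wait on IO" once, capped at three in flight.
$tasks = [];
for ($i = 0; $i < 10; $i++) {
    $tasks[] = static function () use ($i): int {
        Fiber::suspend();   // stand-in for an IO wait
        return $i * 2;
    };
}

[$results, $peak] = runWithLimit($tasks, 3);
```

With a cap of 3, no more than three tasks are ever in flight, and the remaining ones start only as earlier ones finish. A real implementation would resume fibers from the event loop when IO completes rather than in a polling loop.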

4. Auto-detection of optimal worker count

When workers is 0 (or omitted), the pool automatically selects the number of workers based on available CPU cores. This removes the burden of manual tuning and provides a sensible default for most workloads.

// Auto: uses CPU core count
$pool = new ThreadPool();

// Explicit override still possible
$pool = new ThreadPool(workers: 8);

For CPU-bound tasks the optimal count is typically nCPU. For IO-bound tasks with coroutine: true a higher value may be appropriate, so the auto value could be exposed via a helper:

$cores = ThreadPool::getCpuCount(); // e.g. 8
$pool  = new ThreadPool(workers: $cores * 2, coroutine: true);

Use Case

A queue worker that processes IO-bound messages (HTTP webhooks, DB writes, email sending) currently needs:

// Current: manual thread + channel + TaskGroup setup
$jobChannel    = new ThreadChannel($capacity);
$resultChannel = new ThreadChannel($capacity);

for ($i = 0; $i < $threads; $i++) {
    spawn_thread(fn() => (new WorkerThread($jobChannel, $resultChannel, $handlers, $concurrency))->run());
}

With the proposed changes this simplifies to:

$pool = new ThreadPool(coroutine: true, concurrency: 10, bootloader: $autoloader);

$future = $pool->submit(fn() => $handler($message));
await($future) ? $transport->ack($envelope) : $transport->reject($envelope);

Summary

| Parameter | Type | Default | Description |
|---|---|---|---|
| workers | int | 0 (auto) | Number of worker threads; 0 = auto-detect from CPU count |
| bootloader | ?Closure | null | Runs once per worker thread on startup |
| coroutine | bool | false | Run each task inside a coroutine |
| concurrency | int | 0 (unlimited) | Max concurrent coroutines per worker (only when coroutine: true) |

Labels: enhancement

Status: In Progress
