Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions Classes/Job/ConfigurationFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Neos\Flow\Configuration\ConfigurationManager;
use Neos\Flow\Core\Booting\Scripts;
use Neos\Utility\ObjectAccess;
use Netlogix\JobQueue\Pool\ProcessFactory;

use function defined;
use function preg_replace;
use function rtrim;
Expand Down Expand Up @@ -59,16 +61,7 @@ public function buildJobConfiguration(string $queueName): array
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
'Neos.Flow'
);

$command = Scripts::buildPhpCommand(
$flowSettings
);
$command .= sprintf(
' %s %s --queue=%s',
escapeshellarg(\FLOW_PATH_FLOW . 'Scripts/flow.php'),
escapeshellarg('flowpack.jobqueue.common:job:execute'),
escapeshellarg($queueName)
);
$command = (new ProcessFactory)->buildSubprocessCommand();

$workerPool = (array)$this->configurationManager->getConfiguration(
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
Expand Down
5 changes: 5 additions & 0 deletions Classes/Lock.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public function __construct(int $numberOfWorkers, string $lockFileDirectory)
}
}

/**
* @template T
* @param callable(): T $run
* @return T
*/
public function run(callable $run)
{
$this->findSlot();
Expand Down
105 changes: 67 additions & 38 deletions Classes/Loop.php
Original file line number Diff line number Diff line change
@@ -1,57 +1,86 @@
<?php

declare(strict_types=1);

namespace Netlogix\JobQueue\FastRabbit;

use Neos\Flow\Annotations as Flow;
use Netlogix\JobQueue\Pool\Pool;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue;

/**
* @Flow\Proxy(false)
*/
use function count;
use function max;

#[Flow\Proxy(false)]
final class Loop
{
protected $queue;

/**
* Unix timestamp after which the Loop should exit
*
* @var int|null
*/
protected $exitAfterTimestamp;

/**
* Timeout in seconds when waiting for new messages
*
* @var int|null
*/
protected $timeout;

/**
* @param RabbitQueue $queue The Queue to watch
* @param int $exitAfter Time in seconds after which the loop should exit
*/
public function __construct(RabbitQueue $queue, int $exitAfter = 0)
{
$this->queue = $queue;
$this->exitAfterTimestamp = $exitAfter > 0 ? time() + $exitAfter : null;
$this->timeout = $exitAfter > 0 ? $exitAfter : null;
public const int SIX_HOURS_IN_SECONDS = 21600;

public function __construct(
/**
* The Queue to watch
*/
protected RabbitQueue $queue,

protected readonly Pool $poolObject,

/**
* Time in seconds after which the loop should exit
*/
protected readonly ?int $exitAfter
) {
}

public function runMessagesOnWorker(Worker $worker)
{
$worker->prepare();
do {
try {
$message = $this->queue->waitAndReserve($this->timeout);
$worker->executeMessage($message);
} catch (AMQPTimeoutException $e) {
}
$this
->poolObject
->runLoop(function (Pool $pool) use ($worker) {
$worker->prepare();

$runDueJobs = $pool->eventLoop->addPeriodicTimer(
interval: 0.01,
callback: fn () => $this->runDueJob($pool, $worker)
);

if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) {
break;
if ($this->exitAfter) {
$pool->eventLoop->addTimer(
interval: max($this->exitAfter, 1),
callback: function () use ($pool, $runDueJobs) {
$pool->eventLoop->cancelTimer($runDueJobs);
$checkForPoolToClear = $pool->eventLoop->addPeriodicTimer(
interval: 1,
callback: function () use ($pool, &$checkForPoolToClear) {
if (count($pool) === 0) {
$pool->eventLoop->cancelTimer($checkForPoolToClear);
$pool->eventLoop->stop();
}
}
);
}
);
}
});
}

private function runDueJob(Pool $pool, Worker $worker): void
{
/**
* No parallel execution of multiple messages here, create multiple
* fast rabbit instances connected instead.
* Counting the running instances in the pool only prevents the
* pool from spawning too many workers.
*/
if (count($pool)) {
return;
}
try {
$message = $this->queue->waitAndReserve(10);
if ($message) {
$pool->eventLoop->futureTick(fn () => $worker->executeMessage($message));
}
} while (true);
} catch (AMQPTimeoutException $e) {
}
}
}
44 changes: 44 additions & 0 deletions Classes/Package.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Netlogix\JobQueue\FastRabbit;

use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Monitor\FileMonitor;
use Neos\Flow\ObjectManagement\CompileTimeObjectManager;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Neos\Flow\ObjectManagement\Proxy\Compiler;
use Neos\Flow\Package\Package as BasePackage;
use Neos\Flow\SignalSlot\Dispatcher;
use Netlogix\JobQueue\FastRabbit\SingletonPreloading\AllSingletonsPreloader;

final class Package extends BasePackage
{
public function boot(Bootstrap $bootstrap): void
{
$dispatcher = $bootstrap->getSignalSlotDispatcher();
assert($dispatcher instanceof Dispatcher);

/**
* @see Compiler::compiledClasses()
* @see FileMonitor::emitFilesHaveChanged()
* @see AllSingletonsPreloader::flush()
*/
$dispatcher->connect(
signalClassName: FileMonitor::class,
signalName: 'filesHaveChanged',
slotClassNameOrObject: fn () => static::flushSingletonsPreloaderCache($bootstrap->getObjectManager())
);
}

private function flushSingletonsPreloaderCache(ObjectManagerInterface $objectManager): void
{
if ($objectManager instanceof CompileTimeObjectManager) {
return;
}
$objectManager
->get(AllSingletonsPreloader::CACHE)
->flush();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace Netlogix\JobQueue\FastRabbit\PreventLoggingOfMissingInput;

use Flowpack\JobQueue\Common\Command\JobCommandController;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Aop\JoinPointInterface;
use Neos\Flow\Cli\Exception\StopCommandException;
use Neos\Flow\Reflection\ClassReflection;
use Symfony\Component\Console\Exception\MissingInputException;

/**
* There might be orphaned worker processes when the parent process
* of FastRabbit loop restarts due to its max wait time.
*
* Those are still waiting for message identifiers, which can never
* be fulfilled once their STDIN closes, so a MissingInputException
* gets thrown.
*
* This is expected and will happen every 6 hours due to the current
* configuration. We don't need those exceptions tracked. So redirect
* to StopCommandException instead.
*/
#[Flow\Aspect]
#[Flow\Proxy(false)]
class JobCommandInitializationAspect
{
#[Flow\Around('within(' . JobCommandController::class . ') && method(.*->mapRequestArgumentsToControllerArguments())')]
public function preventLoggingOfMissingInputExceptions(JoinPointInterface $joinPoint): void
{
$jobCommandController = $joinPoint->getProxy();
assert($jobCommandController instanceof JobCommandController);

$reflection = new ClassReflection($jobCommandController);

$commandMethodName = $reflection
->getProperty('commandMethodName')
->getValue($jobCommandController);

if ($commandMethodName !== 'executeCommand') {
$joinPoint->getAdviceChain()->proceed($joinPoint);
return;
}

try {
$joinPoint->getAdviceChain()->proceed($joinPoint);
} catch (MissingInputException $e) {
throw new StopCommandException();
}
}
}
Loading