diff --git a/src/Activity.php b/src/Activity.php index a26508a7..44e116b9 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void $this->storedWorkflow, $throwable, $workflow->connection(), - $workflow->queue() + $workflow->queue(), + $this::class ); } diff --git a/src/ActivityStub.php b/src/ActivityStub.php index ed8f2ca2..2bc9b259 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -28,32 +28,66 @@ public static function async(callable $callback): PromiseInterface public static function make($activity, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); + $result = null; - $log = $context->storedWorkflow->findLogByIndex($context->index); + while (true) { + $log = $context->storedWorkflow->findLogByIndex($context->index); + $result = null; - if (WorkflowStub::faked()) { - $mocks = WorkflowStub::mocks(); + if (WorkflowStub::faked()) { + $mocks = WorkflowStub::mocks(); - if (! $log && array_key_exists($activity, $mocks)) { - $result = $mocks[$activity]; + if (! $log && array_key_exists($activity, $mocks)) { + $mockedResult = $mocks[$activity]; - $log = $context->storedWorkflow->createLog([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $activity, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $activity, + 'result' => Serializer::serialize( + is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult + ), + ]); - WorkflowStub::recordDispatched($activity, $arguments); + WorkflowStub::recordDispatched($activity, $arguments); + } + } + + if (! $log) { + break; } + + if ($log->class !== Exception::class) { + break; + } + + $result = Serializer::unserialize($log->result); + + if (! self::isForeignExceptionResult($result, $activity)) { + break; + } + + ++$context->index; + WorkflowStub::setContext($context); } if ($log) { + $result ??= Serializer::unserialize($log->result); + + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $activity + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); - $result = Serializer::unserialize($log->result); if ( is_array($result) && array_key_exists('class', $result) && @@ -74,11 +108,25 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + WorkflowStub::markProbePendingBeforeMatch(); + ++$context->index; + WorkflowStub::setContext($context); + return (new Deferred())->promise(); + } + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); + } + + private static function isForeignExceptionResult(mixed $result, string $activity): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $activity; } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 896237a3..b2f2d880 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -22,32 +22,66 @@ public static function all(iterable $promises): PromiseInterface public static function make($workflow, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); + $result = null; - $log = $context->storedWorkflow->findLogByIndex($context->index); + while (true) { + $log = $context->storedWorkflow->findLogByIndex($context->index); + $result = null; - if (WorkflowStub::faked()) { - $mocks = WorkflowStub::mocks(); + if (WorkflowStub::faked()) { + $mocks = WorkflowStub::mocks(); - if (! $log && array_key_exists($workflow, $mocks)) { - $result = $mocks[$workflow]; + if (! $log && array_key_exists($workflow, $mocks)) { + $mockedResult = $mocks[$workflow]; - $log = $context->storedWorkflow->createLog([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $workflow, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $workflow, + 'result' => Serializer::serialize( + is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult + ), + ]); - WorkflowStub::recordDispatched($workflow, $arguments); + WorkflowStub::recordDispatched($workflow, $arguments); + } + } + + if (! $log) { + break; } + + if ($log->class !== Exception::class) { + break; + } + + $result = Serializer::unserialize($log->result); + + if (! self::isForeignExceptionResult($result, $workflow)) { + break; + } + + ++$context->index; + WorkflowStub::setContext($context); } if ($log) { + $result ??= Serializer::unserialize($log->result); + + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $workflow + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); - $result = Serializer::unserialize($log->result); if ( is_array($result) && array_key_exists('class', $result) @@ -67,6 +101,13 @@ public static function make($workflow, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + WorkflowStub::markProbePendingBeforeMatch(); + ++$context->index; + WorkflowStub::setContext($context); + return (new Deferred())->promise(); + } + if (! $context->replaying) { $storedChildWorkflow = $context->storedWorkflow->children() ->wherePivot('parent_index', $context->index) @@ -94,7 +135,14 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); + } + + private static function isForeignExceptionResult(mixed $result, string $workflow): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $workflow; } } diff --git a/src/Exception.php b/src/Exception.php index de177e51..e5e2bea4 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,9 +10,15 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Carbon; +use Illuminate\Support\Facades\Cache; +use ReflectionClass; +use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Models\StoredWorkflowLog; +use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue { @@ -21,6 +27,14 @@ final class Exception implements ShouldBeEncrypted, ShouldQueue use Queueable; use SerializesModels; + private const LOCK_RETRY_DELAY = 1; + + private const PROBE_SKIP = 'skip'; + + private const PROBE_PERSIST = 'persist'; + + private const PROBE_RETRY = 'retry'; + public ?string $key = null; public $tries = PHP_INT_MAX; @@ -35,7 +49,8 @@ public function __construct( public StoredWorkflow $storedWorkflow, public $exception, $connection = null, - $queue = null + $queue = null, + public ?string $sourceClass = null ) { $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( @@ -48,18 +63,36 @@ public function __construct( public function handle() { - $workflow = $this->storedWorkflow->toWorkflow(); + $lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15); + + if (! $lock->get()) { + $this->release(self::LOCK_RETRY_DELAY); + + return; + } try { - if ($this->storedWorkflow->hasLogByIndex($this->index)) { - $workflow->resume(); - } elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) { - $workflow->next($this->index, $this->now, self::class, $this->exception); - } - } catch (TransitionNotFound) { - if ($workflow->running()) { - $this->release(); + $workflow = $this->storedWorkflow->toWorkflow(); + + try { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { + $workflow->resume(); + } else { + $probeDecision = $this->probeReplayDecision(); + + if ($probeDecision === self::PROBE_PERSIST) { + $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } elseif ($probeDecision === self::PROBE_RETRY) { + $this->release(self::LOCK_RETRY_DELAY); + } + } + } catch (TransitionNotFound) { + if ($workflow->running()) { + $this->release(); + } } + } finally { + $lock->release(); } } @@ -68,10 +101,114 @@ public function middleware() return [ new WithoutOverlappingMiddleware( $this->storedWorkflow->id, - WithoutOverlappingMiddleware::ACTIVITY, - 0, + WithoutOverlappingMiddleware::WORKFLOW, + self::LOCK_RETRY_DELAY, 15 ), ]; } + + private function probeReplayDecision(): string + { + $workflowClass = $this->storedWorkflow->class; + + if (! is_string($workflowClass) || $workflowClass === '') { + return self::PROBE_PERSIST; + } + + try { + if (! class_exists($workflowClass) || ! is_subclass_of($workflowClass, Workflow::class)) { + return self::PROBE_PERSIST; + } + + if (! (new ReflectionClass($workflowClass))->isInstantiable()) { + return self::PROBE_PERSIST; + } + } catch (Throwable) { + return self::PROBE_PERSIST; + } + + $previousContext = WorkflowStub::getContext(); + $probeDecision = self::PROBE_SKIP; + + try { + try { + $probeNow = Carbon::parse($this->now); + } catch (Throwable) { + return self::PROBE_PERSIST; + } + + $tentativeWorkflow = $this->createTentativeWorkflowState(); + $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); + $workflow->replaying = true; + + WorkflowStub::setContext([ + 'storedWorkflow' => $tentativeWorkflow, + 'index' => 0, + 'now' => $probeNow, + 'replaying' => true, + 'probing' => true, + 'probeIndex' => $this->index, + 'probeClass' => $this->sourceClass, + 'probeMatched' => false, + 'probePendingBeforeMatch' => false, + ]); + + try { + $workflow->handle(); + } catch (Throwable) { + // The replay path may still throw; we only care whether it matched this tentative log. + } + + if (WorkflowStub::probeMatched()) { + $probeDecision = WorkflowStub::probePendingBeforeMatch() + ? self::PROBE_RETRY + : self::PROBE_PERSIST; + } + } finally { + WorkflowStub::setContext($previousContext); + } + + return $probeDecision; + } + + private function createTentativeWorkflowState(): StoredWorkflow + { + $storedWorkflowClass = $this->storedWorkflow::class; + + /** @var StoredWorkflow $tentativeWorkflow */ + $tentativeWorkflow = $storedWorkflowClass::query() + ->findOrFail($this->storedWorkflow->id); + + $tentativeWorkflow->loadMissing(['logs', 'signals']); + + /** @var StoredWorkflowLog $tentativeLog */ + $tentativeLog = $tentativeWorkflow->logs() + ->make([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exceptionPayload()), + ]); + + $tentativeWorkflow->setRelation( + 'logs', + $tentativeWorkflow->getRelation('logs') + ->push($tentativeLog) + ->values() + ); + + return $tentativeWorkflow; + } + + private function exceptionPayload() + { + if (! is_array($this->exception) || $this->sourceClass === null) { + return $this->exception; + } + + return array_merge($this->exception, [ + 'sourceClass' => $this->sourceClass, + ]); + } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 3bd5ae17..c3fc138a 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -22,6 +22,12 @@ public static function await($condition): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + ++self::$context->index; + return (new Deferred())->promise(); + } + $result = $condition(); if ($result === true) { @@ -49,7 +55,6 @@ public static function await($condition): PromiseInterface } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index e4a4db0c..763f7a4e 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Serializers\Serializer; @@ -20,6 +21,12 @@ public static function sideEffect($callable): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + ++self::$context->index; + return (new Deferred())->promise(); + } + $result = $callable(); if (! self::$context->replaying) { diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index 538f9502..35c133bd 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -41,6 +41,12 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa $when = self::$context->now->copy() ->addSeconds($seconds); + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + ++self::$context->index; + return (new Deferred())->promise(); + } + if (! self::$context->replaying) { $timer = self::$context->storedWorkflow->createTimer([ 'index' => self::$context->index, @@ -48,8 +54,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa ]); } else { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } @@ -94,8 +99,11 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa )->delay($delay); } + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + } + ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index d8fd011f..c9655a7e 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Exceptions\VersionNotSupportedException; @@ -33,6 +34,12 @@ public static function getVersion( return resolve($version); } + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + ++self::$context->index; + return (new Deferred())->promise(); + } + $version = $maxSupported; if (! self::$context->replaying) { diff --git a/src/Workflow.php b/src/Workflow.php index 6f71ff4d..eebefb91 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -213,7 +213,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); } - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -233,7 +233,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -313,4 +313,19 @@ public function handle(): void } } } + + private function setContext(array $context): void + { + $existingContext = WorkflowStub::getContext(); + + if (property_exists($existingContext, 'probing') && $existingContext->probing) { + $context['probing'] = true; + $context['probeIndex'] = $existingContext->probeIndex ?? null; + $context['probeClass'] = $existingContext->probeClass ?? null; + $context['probeMatched'] = $existingContext->probeMatched ?? false; + $context['probePendingBeforeMatch'] = $existingContext->probePendingBeforeMatch ?? false; + } + + WorkflowStub::setContext($context); + } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead1..8aeacd56 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -157,6 +157,10 @@ public static function fromStoredWorkflow(StoredWorkflow $storedWorkflow): stati public static function getContext(): \stdClass { + if (self::$context === null) { + self::$context = new \stdClass(); + } + return self::$context; } @@ -170,6 +174,49 @@ public static function now() return self::getContext()->now; } + public static function isProbing(): bool + { + return (bool) (self::getContext()->probing ?? false); + } + + public static function probeIndex(): ?int + { + return self::getContext()->probeIndex ?? null; + } + + public static function probeClass(): ?string + { + return self::getContext()->probeClass ?? null; + } + + public static function markProbeMatched(): void + { + if (! self::isProbing()) { + return; + } + + self::$context->probeMatched = true; + } + + public static function markProbePendingBeforeMatch(): void + { + if (! self::isProbing() || self::probeMatched()) { + return; + } + + self::$context->probePendingBeforeMatch = true; + } + + public static function probeMatched(): bool + { + return (bool) (self::getContext()->probeMatched ?? false); + } + + public static function probePendingBeforeMatch(): bool + { + return (bool) (self::getContext()->probePendingBeforeMatch ?? false); + } + public function id() { return $this->storedWorkflow->id; @@ -287,7 +334,7 @@ public function fail($exception): void ->format('Y-m-d\TH:i:s.u\Z')); $this->storedWorkflow->parents() - ->each(static function ($parentWorkflow) use ($exception) { + ->each(function ($parentWorkflow) use ($exception) { if ( $parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX || $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX @@ -324,7 +371,8 @@ public function fail($exception): void $parentWorkflow, $throwable, $parentWf->connection(), - $parentWf->queue() + $parentWf->queue(), + $this->storedWorkflow->class ); }); } diff --git a/tests/Feature/ExceptionLoggingReplayTest.php b/tests/Feature/ExceptionLoggingReplayTest.php new file mode 100644 index 00000000..4c2dcc35 --- /dev/null +++ b/tests/Feature/ExceptionLoggingReplayTest.php @@ -0,0 +1,88 @@ +start(); + + sleep(1); + $workflow->requestRetry(); + + sleep(1); + $workflow->requestRetry(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('success', $workflow->output()); + $this->assertSame([ + Exception::class, + Signal::class, + Exception::class, + Signal::class, + TestProbeRetryActivity::class, + ], $classes); + } + + public function testBackToBackCaughtExceptionsEachPersist(): void + { + $workflow = WorkflowStub::make(TestProbeBackToBackWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught second: second failure', $workflow->output()); + $this->assertSame([Exception::class, Exception::class], $classes); + } + + public function testParallelChildFailuresStillDeduplicateToOneParentException(): void + { + $workflow = WorkflowStub::make(TestProbeChildFailureParentWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught: child failed: child-1', $workflow->output()); + $this->assertSame([ + TestProbeChildFailureParentStepActivity::class, + Exception::class, + TestProbeChildFailureCompensationActivity::class, + ], $classes); + $this->assertSame(1, $workflow->logs()->where('class', Exception::class)->count()); + } +} diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php new file mode 100644 index 00000000..06396d64 --- /dev/null +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -0,0 +1,28 @@ +getMessage(); + } + + return 'unexpected-success'; + } +} diff --git a/tests/Fixtures/TestProbeChildFailureCompensationActivity.php b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php new file mode 100644 index 00000000..d2ea9568 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php @@ -0,0 +1,15 @@ +addCompensation(static fn () => activity(TestProbeChildFailureCompensationActivity::class)); + + yield all([ + child(TestProbeChildFailureWorkflow::class, 'child-1'), + child(TestProbeChildFailureWorkflow::class, 'child-2'), + child(TestProbeChildFailureWorkflow::class, 'child-3'), + ]); + + return 'unexpected-success'; + } catch (Throwable $throwable) { + yield from $this->compensate(); + + return 'caught: ' . $throwable->getMessage(); + } + } +} diff --git a/tests/Fixtures/TestProbeChildFailureWorkflow.php b/tests/Fixtures/TestProbeChildFailureWorkflow.php new file mode 100644 index 00000000..ca36619b --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureWorkflow.php @@ -0,0 +1,16 @@ + throw new RuntimeException('first failure'), + 2 => throw new InvalidArgumentException('second failure'), + default => 'success', + }; + } +} diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php new file mode 100644 index 00000000..ec80b0cb --- /dev/null +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -0,0 +1,41 @@ +inbox->receive('retry'); + } + + public function execute() + { + $attempt = 0; + + while (true) { + try { + ++$attempt; + + return yield activity(TestProbeRetryActivity::class, $attempt); + } catch (Throwable $throwable) { + if ($attempt >= 3) { + throw $throwable; + } + + yield await(fn (): bool => $this->inbox->hasUnread()); + + $this->inbox->nextUnread(); + } + } + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf6..37f28192 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -29,17 +29,7 @@ public static function setUpBeforeClass(): void } } - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); @@ -53,6 +43,10 @@ public static function tearDownAfterClass(): void foreach (self::$workers as $worker) { $worker->stop(); } + + self::$workers = []; + + self::flushRedis(); } protected function setUp(): void @@ -67,17 +61,7 @@ protected function setUp(): void Cache::flush(); - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); } protected function defineDatabaseMigrations() @@ -94,4 +78,19 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function flushRedis(): void + { + $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); + $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); + if ($redisHost && class_exists(\Redis::class)) { + try { + $redis = new \Redis(); + $redis->connect($redisHost, (int) $redisPort); + $redis->flushDB(); + } catch (\Throwable $e) { + // Ignore if no redis + } + } + } } diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index af6cee9e..ec970de2 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -7,9 +7,11 @@ use Exception; use RuntimeException; use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestOtherActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ActivityStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -123,6 +125,136 @@ public function testLoadsStoredExceptionWithNonStandardConstructor(): void }); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + + public function testSkipsMultipleStoredExceptionsForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign-1', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign-2', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 2, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(3, WorkflowStub::getContext()->index); + } + + public function testDoesNotMarkProbeMatchedForForeignStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestActivity::class, + 'probeMatched' => false, + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testAll(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f9723562..9de6e8c1 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -4,11 +4,14 @@ namespace Tests\Unit; +use Exception; use Mockery; use Tests\Fixtures\TestChildWorkflow; +use Tests\Fixtures\TestExceptionWorkflow; use Tests\Fixtures\TestParentWorkflow; use Tests\TestCase; use Workflow\ChildWorkflowStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -91,6 +94,207 @@ public function testLoadsChildWorkflow(): void $this->assertNull($result); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + + public function testSkipsMultipleStoredExceptionsForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child 1', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child 2', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 2, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(3, WorkflowStub::getContext()->index); + } + + public function testMarksProbeMatchedForMatchingStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'matching child failure', + 'code' => 0, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + try { + ChildWorkflowStub::make(TestChildWorkflow::class); + $this->fail('Expected child exception to be thrown.'); + } catch (Exception $exception) { + $this->assertSame('matching child failure', $exception->getMessage()); + } + + $this->assertTrue(WorkflowStub::probeMatched()); + } + + public function testDoesNotMarkProbeMatchedForForeignStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child failure', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3ca..9df287de 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,22 @@ namespace Tests\Unit; +use Exception as BaseException; +use Illuminate\Contracts\Queue\Job as JobContract; +use Illuminate\Support\Facades\Cache; +use InvalidArgumentException; +use Mockery; +use ReflectionMethod; +use RuntimeException; +use stdClass; +use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestProbeBackToBackWorkflow; +use Tests\Fixtures\TestProbeChildFailureWorkflow; +use Tests\Fixtures\TestProbeNowSignalWorkflow; +use Tests\Fixtures\TestProbeParallelChildWorkflow; +use Tests\Fixtures\TestProbeRetryActivity; +use Tests\Fixtures\TestSagaActivity; +use Tests\Fixtures\TestSagaParallelActivityWorkflow; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -11,13 +27,14 @@ use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; +use Workflow\Workflow; use Workflow\WorkflowStub; final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( + $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new BaseException( 'Test exception' )); @@ -25,7 +42,9 @@ public function testMiddleware(): void ->values(); $this->assertCount(1, $middleware); - $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); + $this->assertSame(WithoutOverlappingMiddleware::class, $middleware[0]::class); + $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); + $this->assertSame(1, $middleware[0]->releaseAfter); $this->assertSame(15, $middleware[0]->expiresAfter); } @@ -38,13 +57,218 @@ public function testExceptionWorkflowRunning(): void 'status' => WorkflowRunningStatus::$name, ]); - $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception('Test exception')); + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('Test exception')); $exception->handle(); $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } - public function testSkipsWriteWhenSiblingExceptionLogExists(): void + public function testHandleResumesWorkflowWhenLogAlreadyExists(): void + { + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(true); + $lock->shouldReceive('release') + ->once(); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:123', 15) + ->andReturn($lock); + + $workflow = Mockery::mock(); + $workflow->shouldReceive('resume') + ->once(); + + $storedWorkflow = Mockery::mock(StoredWorkflow::class) + ->makePartial(); + $storedWorkflow->id = 123; + $storedWorkflow->shouldReceive('effectiveConnection') + ->andReturn(null); + $storedWorkflow->shouldReceive('effectiveQueue') + ->andReturn(null); + $storedWorkflow->shouldReceive('toWorkflow') + ->once() + ->andReturn($workflow); + $storedWorkflow->shouldReceive('hasLogByIndex') + ->once() + ->with(0) + ->andReturn(true); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('existing log')); + $exception->handle(); + + $this->assertSame(123, $storedWorkflow->id); + + Mockery::close(); + } + + public function testHandleReleasesWhenExceptionLockUnavailable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(false); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:' . $storedWorkflow->id, 15) + ->andReturn($lock); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(1); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'locked', + 'code' => 0, + ]); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->hasLogByIndex(0)); + + Mockery::close(); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ''; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'invalid workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassDoesNotExist(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = 'Tests\\Fixtures\\MissingWorkflowClass'; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'missing workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotAWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = stdClass::class; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'non workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotInstantiable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ExceptionTestAbstractWorkflow::class; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'abstract workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassAutoloadThrows(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = 'Tests\\Fixtures\\ThrowingAutoloadWorkflow'; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'autoload failure', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $autoload = static function (string $class): void { + if ($class === 'Tests\\Fixtures\\ThrowingAutoloadWorkflow') { + throw new RuntimeException('autoload exploded'); + } + }; + + spl_autoload_register($autoload); + + try { + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } finally { + spl_autoload_unregister($autoload); + } + } + + public function testProbeReplayUsesCarbonNowBeforeWorkflowHandleResetsContext(): void + { + TestProbeNowSignalWorkflow::$signalSawCarbonNow = false; + + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeNowSignalWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + $storedWorkflow->signals() + ->create([ + 'method' => 'recordNowType', + 'arguments' => Serializer::serialize([]), + ]); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'probe now type', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + $method->invoke($exception); + + $this->assertTrue(TestProbeNowSignalWorkflow::$signalSawCarbonNow); + } + + public function testProbeReplayPersistsWhenNowCannotBeParsed(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); @@ -53,6 +277,27 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void 'status' => WorkflowRunningStatus::$name, ]); + $exception = new Exception(0, 'not-a-timestamp', $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'bad now', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + $storedWorkflow->logs() ->create([ 'index' => 0, @@ -60,20 +305,128 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \Exception::class, - 'message' => 'first child failed', + 'class' => BaseException::class, + 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \Exception::class, - 'message' => 'second child failed', + 'class' => BaseException::class, + 'message' => 'child failed: child-2', 'code' => 0, - ]); + ], sourceClass: TestProbeChildFailureWorkflow::class); $exception->handle(); $this->assertFalse($storedWorkflow->hasLogByIndex(1)); $this->assertSame(1, $storedWorkflow->logs()->count()); } + + public function testRetriesWriteWhenProbeMatchesAfterEarlierPendingWork(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(1); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'child failed: child-2', + 'code' => 0, + ], sourceClass: TestProbeChildFailureWorkflow::class); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(1)); + } + + public function testPersistsWriteWhenProbeReachesCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeBackToBackWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestProbeRetryActivity::class); + $exception->handle(); + + $log = $storedWorkflow->fresh() + ->logs() + ->firstWhere('index', 1); + + $this->assertNotNull($log); + $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + $this->assertSame(TestProbeRetryActivity::class, Serializer::unserialize($log->result)['sourceClass']); + } + + public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestSagaParallelActivityWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('step complete'), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'parallel failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => RuntimeException::class, + 'message' => 'another parallel failure', + 'code' => 0, + ], sourceClass: TestSagaActivity::class); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(2)); + } +} + +abstract class ExceptionTestAbstractWorkflow extends Workflow +{ } diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index 12b2a8bd..400d90af 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -102,6 +102,37 @@ public function testResolvesConflictingResult(): void $this->assertFalse(Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $conditionEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::await(static function () use (&$conditionEvaluated): bool { + $conditionEvaluated = true; + + return true; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($conditionEvaluated); + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index 9e121000..db11844c 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -88,6 +88,37 @@ public function testResolvesConflictingResult(): void $this->assertSame('test', Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $callableEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::sideEffect(static function () use (&$callableEvaluated): string { + $callableEvaluated = true; + + return 'test'; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($callableEvaluated); + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index 0e5ead92..947914a6 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -242,6 +242,74 @@ public function testTimerReturnsUnresolvedPromiseWhenReplayingAndNoTimer(): void ]); } + public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertDatabaseMissing('workflow_timers', [ + 'stored_workflow_id' => $workflow->id(), + 'index' => 0, + ]); + } + + public function testTimerMarksProbePendingBeforeMatchWhenStoredTimerHasNotElapsed(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->timers() + ->create([ + 'index' => 0, + 'stop_at' => now() + ->addMinute(), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testTimerCapsDelayForSqsDriver(): void { Bus::fake(); diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index 1fe909f9..f3174456 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -191,6 +191,31 @@ public function testResolvesConflictingResultThrowsWhenVersionNotSupported(): vo Mockery::close(); } + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredVersion(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::getVersion('test-change', WorkflowStub::DEFAULT_VERSION, 1) + ->then(static function ($value) use (&$result): void { + $result = $value; + }); + + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/WorkflowFakerTest.php b/tests/Unit/WorkflowFakerTest.php index bad1e792..03fe51ba 100644 --- a/tests/Unit/WorkflowFakerTest.php +++ b/tests/Unit/WorkflowFakerTest.php @@ -71,6 +71,25 @@ public function testParentWorkflow(): void $this->assertSame($workflow->output(), 'workflow_activity_other_activity'); } + public function testParentWorkflowWithCallableChildMock(): void + { + WorkflowStub::fake(); + + WorkflowStub::mock(TestActivity::class, 'activity'); + + WorkflowStub::mock(TestChildWorkflow::class, static function ($context) { + return 'other_activity'; + }); + + $workflow = WorkflowStub::make(TestParentWorkflow::class); + $workflow->start(); + + WorkflowStub::assertDispatchedTimes(TestActivity::class, 1); + WorkflowStub::assertDispatchedTimes(TestChildWorkflow::class, 1); + + $this->assertSame($workflow->output(), 'workflow_activity_other_activity'); + } + public function testConcurrentWorkflow(): void { WorkflowStub::fake(); diff --git a/tests/Unit/WorkflowStubTest.php b/tests/Unit/WorkflowStubTest.php index cf6b7e60..03290159 100644 --- a/tests/Unit/WorkflowStubTest.php +++ b/tests/Unit/WorkflowStubTest.php @@ -8,6 +8,8 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; +use ReflectionProperty; +use stdClass; use Tests\Fixtures\TestAwaitWorkflow; use Tests\Fixtures\TestBadConnectionWorkflow; use Tests\Fixtures\TestChatBotWorkflow; @@ -231,6 +233,48 @@ public function testConnection(): void $this->assertSame('default', WorkflowStub::queue()); } + public function testProbeHelpers(): void + { + $contextProperty = new ReflectionProperty(WorkflowStub::class, 'context'); + $contextProperty->setAccessible(true); + $previousContext = $contextProperty->getValue(); + $contextProperty->setValue(null); + + try { + $this->assertInstanceOf(stdClass::class, WorkflowStub::getContext()); + $this->assertFalse(WorkflowStub::isProbing()); + $this->assertNull(WorkflowStub::probeIndex()); + $this->assertNull(WorkflowStub::probeClass()); + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertFalse(WorkflowStub::probePendingBeforeMatch()); + + WorkflowStub::markProbeMatched(); + WorkflowStub::markProbePendingBeforeMatch(); + + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertFalse(WorkflowStub::probePendingBeforeMatch()); + + WorkflowStub::setContext([ + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestWorkflow::class, + 'probeMatched' => false, + 'probePendingBeforeMatch' => false, + ]); + + WorkflowStub::markProbePendingBeforeMatch(); + WorkflowStub::markProbeMatched(); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestWorkflow::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + } finally { + $contextProperty->setValue($previousContext); + } + } + public function testHandlesDuplicateLogInsertionProperly(): void { Queue::fake(); diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index 96b2e247..144e23de 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -9,6 +9,7 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Event; use Mockery; +use ReflectionMethod; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestChildWorkflow; use Tests\Fixtures\TestContinueAsNewWorkflow; @@ -179,6 +180,43 @@ public function testExceptionAlreadyLogged(): void $this->assertSame(1, $storedWorkflow->logs()->count()); } + public function testSetContextPreservesProbePendingBeforeMatch(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->arguments = Serializer::serialize([]); + $storedWorkflow->save(); + + $workflow = new Workflow($storedWorkflow); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 3, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestActivity::class, + 'probeMatched' => true, + 'probePendingBeforeMatch' => true, + ]); + + $method = new ReflectionMethod(Workflow::class, 'setContext'); + $method->setAccessible(true); + $method->invoke($workflow, [ + 'storedWorkflow' => $storedWorkflow, + 'index' => 4, + 'now' => now(), + 'replaying' => true, + ]); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestActivity::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + } + public function testParent(): void { Carbon::setTestNow('2022-01-01');