Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fae0c4d
Probe replay before persisting exception logs
durable-workflow-ops Apr 3, 2026
5f61620
Fix ECS issues in probe fixtures
durable-workflow-ops Apr 3, 2026
bfb3a65
Import Deferred in probe code
durable-workflow-ops Apr 3, 2026
b36bd34
Inline Deferred promise returns
durable-workflow-ops Apr 3, 2026
7393ea5
Remove redundant context resets
durable-workflow-ops Apr 3, 2026
f3a5175
Restore parent context in workflow stubs
durable-workflow-ops Apr 3, 2026
1ea7c85
Tag activity exceptions for probe replay
durable-workflow-ops Apr 3, 2026
d8d4219
Skip stale foreign exception logs during replay
durable-workflow-ops Apr 3, 2026
3456f71
Add unit coverage for probe replay branches
durable-workflow-ops Apr 3, 2026
d0ca14d
Stream feature worker output in CI
durable-workflow-ops Apr 3, 2026
39e6839
Capture CI worker output to files
durable-workflow-ops Apr 3, 2026
0ad07ad
Revert CI worker capture instrumentation
durable-workflow-ops Apr 3, 2026
ee0b47a
Use synthetic logs for probe replay
durable-workflow-ops Apr 3, 2026
ef06874
Flush Redis after feature workers stop
durable-workflow-ops Apr 3, 2026
1bea82f
Remove overlap middleware from exception jobs
durable-workflow-ops Apr 3, 2026
b6648fc
Merge remote-tracking branch 'origin/master' into codex/probing-tenta…
durable-workflow-ops Apr 4, 2026
da8240c
Update exception middleware expectation
durable-workflow-ops Apr 4, 2026
8887e29
Restore workflow overlap guard for exception jobs
durable-workflow-ops Apr 4, 2026
26beade
Guard probe replay against invalid workflow classes
durable-workflow-ops Apr 4, 2026
a3b85b8
Tighten probe matching and exception retries
durable-workflow-ops Apr 4, 2026
98398c8
Retry probe matches behind pending work
durable-workflow-ops Apr 4, 2026
9bcd713
Preserve probe pending state across replay
durable-workflow-ops Apr 5, 2026
f576cc4
Flatten foreign exception skips in stubs
durable-workflow-ops Apr 5, 2026
e5082f9
Normalize probe replay now context
durable-workflow-ops Apr 5, 2026
297d06a
Harden exception retry backoff and probe parsing
durable-workflow-ops Apr 5, 2026
26d287e
Trim probe replay overhead
durable-workflow-ops Apr 5, 2026
8625db8
Fix fake callable replay results
durable-workflow-ops Apr 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void
$this->storedWorkflow,
$throwable,
$workflow->connection(),
$workflow->queue()
$workflow->queue(),
$this::class
);
}

Expand Down
82 changes: 65 additions & 17 deletions src/ActivityStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,66 @@ public static function async(callable $callback): PromiseInterface
public static function make($activity, ...$arguments): PromiseInterface
{
$context = WorkflowStub::getContext();
$result = null;

$log = $context->storedWorkflow->findLogByIndex($context->index);
while (true) {
$log = $context->storedWorkflow->findLogByIndex($context->index);
$result = null;

if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();
if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();

if (! $log && array_key_exists($activity, $mocks)) {
$result = $mocks[$activity];
if (! $log && array_key_exists($activity, $mocks)) {
$mockedResult = $mocks[$activity];

$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $activity,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);
$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $activity,
'result' => Serializer::serialize(
is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult
),
]);

WorkflowStub::recordDispatched($activity, $arguments);
WorkflowStub::recordDispatched($activity, $arguments);
}
}

if (! $log) {
break;
}

if ($log->class !== Exception::class) {
break;
}

$result = Serializer::unserialize($log->result);

if (! self::isForeignExceptionResult($result, $activity)) {
break;
}

++$context->index;
WorkflowStub::setContext($context);
}

if ($log) {
$result ??= Serializer::unserialize($log->result);

if (
WorkflowStub::isProbing()
&& WorkflowStub::probeIndex() === $context->index
&& (
WorkflowStub::probeClass() === null
|| WorkflowStub::probeClass() === $activity
)
&& $log->class === Exception::class
) {
WorkflowStub::markProbeMatched();
}

++$context->index;
WorkflowStub::setContext($context);
$result = Serializer::unserialize($log->result);
if (
is_array($result) &&
array_key_exists('class', $result) &&
Expand All @@ -74,11 +108,25 @@ public static function make($activity, ...$arguments): PromiseInterface
return resolve($result);
}

if (WorkflowStub::isProbing()) {
WorkflowStub::markProbePendingBeforeMatch();
++$context->index;
WorkflowStub::setContext($context);
return (new Deferred())->promise();
}

$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments);

++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}

private static function isForeignExceptionResult(mixed $result, string $activity): bool
{
return is_array($result)
&& isset($result['sourceClass'])
&& is_string($result['sourceClass'])
&& $result['sourceClass'] !== $activity;
}
}
82 changes: 65 additions & 17 deletions src/ChildWorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,66 @@ public static function all(iterable $promises): PromiseInterface
public static function make($workflow, ...$arguments): PromiseInterface
{
$context = WorkflowStub::getContext();
$result = null;

$log = $context->storedWorkflow->findLogByIndex($context->index);
while (true) {
$log = $context->storedWorkflow->findLogByIndex($context->index);
$result = null;

if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();
if (WorkflowStub::faked()) {
$mocks = WorkflowStub::mocks();

if (! $log && array_key_exists($workflow, $mocks)) {
$result = $mocks[$workflow];
if (! $log && array_key_exists($workflow, $mocks)) {
$mockedResult = $mocks[$workflow];

$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $workflow,
'result' => Serializer::serialize(
is_callable($result) ? $result($context, ...$arguments) : $result
),
]);
$log = $context->storedWorkflow->createLog([
'index' => $context->index,
'now' => $context->now,
'class' => $workflow,
'result' => Serializer::serialize(
is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult
),
]);

WorkflowStub::recordDispatched($workflow, $arguments);
WorkflowStub::recordDispatched($workflow, $arguments);
}
}

if (! $log) {
break;
}

if ($log->class !== Exception::class) {
break;
}

$result = Serializer::unserialize($log->result);

if (! self::isForeignExceptionResult($result, $workflow)) {
break;
}

++$context->index;
WorkflowStub::setContext($context);
}

if ($log) {
$result ??= Serializer::unserialize($log->result);

if (
WorkflowStub::isProbing()
&& WorkflowStub::probeIndex() === $context->index
&& (
WorkflowStub::probeClass() === null
|| WorkflowStub::probeClass() === $workflow
)
&& $log->class === Exception::class
) {
WorkflowStub::markProbeMatched();
}

++$context->index;
WorkflowStub::setContext($context);
$result = Serializer::unserialize($log->result);
if (
is_array($result)
&& array_key_exists('class', $result)
Expand All @@ -67,6 +101,13 @@ public static function make($workflow, ...$arguments): PromiseInterface
return resolve($result);
}

if (WorkflowStub::isProbing()) {
WorkflowStub::markProbePendingBeforeMatch();
++$context->index;
WorkflowStub::setContext($context);
return (new Deferred())->promise();
}

if (! $context->replaying) {
$storedChildWorkflow = $context->storedWorkflow->children()
->wherePivot('parent_index', $context->index)
Expand Down Expand Up @@ -94,7 +135,14 @@ public static function make($workflow, ...$arguments): PromiseInterface

++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}

private static function isForeignExceptionResult(mixed $result, string $workflow): bool
{
return is_array($result)
&& isset($result['sourceClass'])
&& is_string($result['sourceClass'])
&& $result['sourceClass'] !== $workflow;
}
}
Loading
Loading