Add callback support for standalone activities#9786
Add callback support for standalone activities#9786fretz12 wants to merge 20 commits intotemporalio:mainfrom
Conversation
bergundy
left a comment
There was a problem hiding this comment.
The logic you've added LGTM overall, here are the gaps:
- Missing validation in the frontend for the callbacks.
- Missing callback info in the describe response.
- Missing returning links in the StartActivityExecution response.
fc5ff9e to
7da34b7
Compare
There was a problem hiding this comment.
Pull request overview
Adds Nexus completion-callback support to standalone activities, reusing the existing callbacks library/validation used by workflows, and renames the CHASM callback limit dynamic config to apply to both workflows (CHASM path) and standalone activities.
Changes:
- Attach/validate
CompletionCallbacksonStartActivityExecutionfor standalone activities and trigger delivery on terminal activity close. - Centralize callback validation in
components/callbacks.ValidateCallbacksand reuse it for workflow start validation. - Rename dynamic config from
MaxCHASMCallbacksPerWorkflowtoMaxCallbacksPerExecution, and extend functional tests for callbacks + response link fields.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/standalone_activity_test.go | Adds functional coverage for callback acceptance, describe visibility, and delivery on activity terminal states. |
| service/history/workflow/mutable_state_impl.go | Switches CHASM workflow callback cap to MaxCallbacksPerExecution. |
| service/history/configs/config.go | Wires new MaxCallbacksPerExecution config getter into history config. |
| service/frontend/workflow_handler.go | Replaces local workflow callback validation with shared callbacks.ValidateCallbacks. |
| components/callbacks/config.go | Introduces shared ValidateCallbacks and internalizes URL allowlist validation helpers. |
| components/callbacks/config_test.go | Adds unit tests for ValidateCallbacks; updates allowlist tests to new internal method name. |
| common/dynamicconfig/constants.go | Renames/defines MaxCallbacksPerExecution dynamic config setting and description. |
| chasm/lib/activity/frontend.go | Validates/normalizes callbacks on StartActivityExecution requests for standalone activities. |
| chasm/lib/activity/config.go | Adds callback-related dynamic config accessors to standalone activity config. |
| chasm/lib/activity/handler.go | Persists callbacks on activity start and returns an Activity link in StartActivityExecutionResponse. |
| chasm/lib/activity/activity.go | Stores callbacks on the activity, triggers them on close, and surfaces callback info in DescribeActivityExecution. |
| go.mod / go.sum | Bumps go.temporal.io/api dependency to a newer dev revision. |
| cmd/tools/getproto/files.go | Removes a stray leading line from generated file header. |
| } | ||
|
|
||
| if cbs := req.GetCompletionCallbacks(); len(cbs) > 0 { | ||
| if err := callbacks.ValidateCallbacks( |
There was a problem hiding this comment.
I'd prefer to force all errors in callbacks.ValidateCallbacks to return serviceerrors, but didn't want to break anything existing. Open to advice here
There was a problem hiding this comment.
I'd be fine if you switched to service errors. That's very common in the codebase. It's technically redundant because it adds a header with a serialized proto that most SDKs don't care about (only Go SDK does).
| cbInfos = append(cbInfos, &workflowpb.CallbackInfo{ | ||
| Callback: cbSpec, | ||
| // WorkflowClosed is the only trigger variant in the proto. | ||
| Trigger: &workflowpb.CallbackInfo_Trigger{Variant: &workflowpb.CallbackInfo_Trigger_WorkflowClosed{}}, |
There was a problem hiding this comment.
What about adding a separate trigger variant for standalone activities? I can see us not wanting to move CallbackInfo from workflowpb because of backwards compatibility, but this feels like something we could introduce more easily now than later. If this has already been discussed and decided, that's fine too.
There was a problem hiding this comment.
Yes! That was caught in the API PR. It's now activity specific too
dandavison
left a comment
There was a problem hiding this comment.
Looks great, I did a pass and made some suggestions.
My main concern is it's not clear to me that we should release this using the workflow CallbackInfo. We'll want to straighten that out in the future, but that would be a backwards incompatible API change. So it seems to me that we need to straighten that out now.
| return srv.URL | ||
| } | ||
|
|
||
| func (s *standaloneActivityTestSuite) TestCallbacks() { |
There was a problem hiding this comment.
As I suggested above, would be good to add coverage of the timeout cases since they can cause attempt.CompleteTime not to be set.
There was a problem hiding this comment.
Yes, added tests for all terminal cases
| return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant) | ||
| } | ||
|
|
||
| id := fmt.Sprintf("%s-%d", requestID, idx) |
There was a problem hiding this comment.
Can we add a comment explaining what desirable properties follow from this ID-naming scheme (and add the same comment to chasm/lb/workflow/workflow.go.
Can someone confirm that the ID naming scheme used by HSM callbacks, which was designed to address a replication concern that I haven't fully understood yet, is not required by CHASM workflow/SAA callbacks?
There was a problem hiding this comment.
Added a comment, please double check accuracy folks
There was a problem hiding this comment.
No need to mention HSM here IMHO.
| opts.Error = opErr | ||
| } | ||
|
|
||
| return opts, nil |
There was a problem hiding this comment.
It should either be success or failure, so we don't expect to get here. WDYT about:
| opts.Error = opErr | |
| } | |
| return opts, nil | |
| opts.Error = opErr | |
| return opts, nil | |
| } | |
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("activity in status %v has no outcome", a.Status) |
| if details := attempt.GetLastFailureDetails(); details != nil { | ||
| failure = details.GetFailure() | ||
| } | ||
| } |
There was a problem hiding this comment.
We do this check-in-two-places-for-a-failure logic in another place. I worry that something will forget to do it in the future. Adding a documented helper would reduce that risk. WDYT about introducing this helper:
diff
diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go
index 43f583674..8cf5a583a 100644
--- a/chasm/lib/activity/activity.go
+++ b/chasm/lib/activity/activity.go
@@ -362,16 +362,7 @@ func (a *Activity) GetNexusCompletion(ctx chasm.Context, _ string) (nexusrpc.Com
return opts, nil
}
- var failure *failurepb.Failure
- if f := outcome.GetFailed(); f != nil {
- failure = f.GetFailure()
- }
- if failure == nil {
- if details := attempt.GetLastFailureDetails(); details != nil {
- failure = details.GetFailure()
- }
- }
-
+ failure := a.terminalFailure(ctx)
if failure != nil {
state := nexus.OperationStateFailed
message := "operation failed"
@@ -947,15 +938,23 @@ func (a *Activity) outcome(ctx chasm.Context) *apiactivitypb.ActivityExecutionOu
Value: &apiactivitypb.ActivityExecutionOutcome_Result{Result: successful.GetOutput()},
}
}
- if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
+ if failure := a.terminalFailure(ctx); failure != nil {
return &apiactivitypb.ActivityExecutionOutcome{
Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: failure},
}
}
+ return nil
+}
+
+// terminalFailure returns the failure for a closed activity. The failure may be stored in
+// Outcome.Failed (terminated, canceled, timed out) or in LastAttempt.LastFailureDetails
+// (failed after exhausting retries). Returns nil if no failure is found.
+func (a *Activity) terminalFailure(ctx chasm.Context) *failurepb.Failure {
+ if f := a.Outcome.Get(ctx).GetFailed(); f != nil {
+ return f.GetFailure()
+ }
if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil {
- return &apiactivitypb.ActivityExecutionOutcome{
- Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: details.GetFailure()},
- }
+ return details.GetFailure()
}
return nil
}| } | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
This is an identical copy of the function used for workflow CHASM callbacks: https://github.com/temporalio/temporal/blob/main/chasm/lib/workflow/workflow.go#L56
That suggests that it should be moved into chasm/lib/callback.
| attempt := a.LastAttempt.Get(ctx) | ||
| opts := nexusrpc.CompleteOperationOptions{ | ||
| StartTime: attempt.GetStartedTime().AsTime(), | ||
| CloseTime: attempt.GetCompleteTime().AsTime(), |
There was a problem hiding this comment.
There's a little bug here. This isn't set on schedule-to-start and schedule-to-close timeouts, nor when terminated in SCHEDULED state. I believe the fix is
| CloseTime: attempt.GetCompleteTime().AsTime(), | |
| CloseTime: ctx.ExecutionInfo().CloseTime, |
Failing test case
t.Run("ScheduleToStartTimeoutWithCallbacks", func(t *testing.T) {
activityID := testcore.RandomizeStr(t.Name())
taskQueue := testcore.RandomizeStr(t.Name())
ch := &completionHandler{
requestCh: make(chan *nexusrpc.CompletionRequest, 1),
requestCompleteCh: make(chan error, 1),
}
defer func() {
close(ch.requestCh)
close(ch.requestCompleteCh)
}()
callbackAddress := s.runNexusCompletionHTTPServer(t, ch)
_, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
ActivityType: s.tv.ActivityType(),
Identity: s.tv.WorkerIdentity(),
Input: defaultInput,
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
},
StartToCloseTimeout: durationpb.New(1 * time.Minute),
ScheduleToStartTimeout: durationpb.New(1 * time.Second),
RequestId: s.tv.Any().String(),
CompletionCallbacks: []*commonpb.Callback{{
Variant: &commonpb.Callback_Nexus_{Nexus: &commonpb.Callback_Nexus{Url: callbackAddress}},
}},
})
require.NoError(t, err)
// No worker polls -- activity will time out waiting to be started.
// Verify the callback is delivered with failure state and non-zero CloseTime.
select {
case completion := <-ch.requestCh:
require.Equal(t, nexus.OperationStateFailed, completion.State)
var failureErr *nexus.FailureError
require.ErrorAs(t, completion.Error.Cause, &failureErr)
require.Contains(t, failureErr.Failure.Message, "ScheduleToStart")
// StartTime may be zero (activity was never started), but CloseTime must not be.
require.False(t, completion.CloseTime.IsZero(), "CloseTime must not be zero for a timed-out activity")
ch.requestCompleteCh <- nil
case <-ctx.Done():
require.Fail(t, "timed out waiting for completion callback")
}
descResp, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
})
require.NoError(t, err)
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, descResp.GetInfo().GetStatus())
})There was a problem hiding this comment.
Good catch. Changed. Also added extra tests for all terminal states
bergundy
left a comment
There was a problem hiding this comment.
You're missing the ability to attach callbacks to a running activity when use existing is specified. You'll want to express that with on_conflict_options on the StartActivityExecutionRequest.
| // Implements callback.CompletionSource. | ||
| func (a *Activity) GetNexusCompletion(ctx chasm.Context, _ string) (nexusrpc.CompleteOperationOptions, error) { | ||
| if !a.LifecycleState(ctx).IsClosed() { | ||
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewFailedPrecondition("activity has not completed yet") |
There was a problem hiding this comment.
This is a bug if it happens.
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewFailedPrecondition("activity has not completed yet") | |
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternal("activity has not completed yet") |
| return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant) | ||
| } | ||
|
|
||
| id := fmt.Sprintf("%s-%d", requestID, idx) |
There was a problem hiding this comment.
No need to mention HSM here IMHO.
| opts := nexusrpc.CompleteOperationOptions{ | ||
| CloseTime: ctx.ExecutionInfo().CloseTime, | ||
| } | ||
| if startedTime := attempt.GetStartedTime(); startedTime != nil { |
There was a problem hiding this comment.
This shouldn't be attempt started time IMHO, it should be the schedule time for the activity.
|
|
||
| // ValidateCallbacks validates completion callbacks: count, URL length, endpoint allowlist, header size, and normalizes | ||
| // header keys to lowercase. | ||
| func ValidateCallbacks( |
There was a problem hiding this comment.
Don't add more code to the HSM implementation. All new code should go in chasm/lib.
There was a problem hiding this comment.
it's been refactored
| } | ||
|
|
||
| if cbs := req.GetCompletionCallbacks(); len(cbs) > 0 { | ||
| if err := callbacks.ValidateCallbacks( |
There was a problem hiding this comment.
I'd be fine if you switched to service errors. That's very common in the codebase. It's technically redundant because it adds a header with a serialized proto that most SDKs don't care about (only Go SDK does).
| var failureErr *nexus.FailureError | ||
| require.ErrorAs(t, completion.Error.Cause, &failureErr) | ||
| require.Equal(t, defaultFailure.Message, failureErr.Failure.Message) |
There was a problem hiding this comment.
Use NexusFailureToTemporalFailure and use the SDK's default failure converter to perform assertions on the errors.
| require.Equal(t, nexus.OperationStateFailed, completion.State) | ||
| require.False(t, completion.StartTime.IsZero()) | ||
| require.False(t, completion.CloseTime.IsZero()) | ||
| var failureErr *nexus.FailureError |
| require.Equal(t, nexus.OperationStateCanceled, completion.State) | ||
| require.False(t, completion.StartTime.IsZero()) | ||
| require.False(t, completion.CloseTime.IsZero()) | ||
| var failureErr *nexus.FailureError |
There was a problem hiding this comment.
Same here, this should be a canceled error from the Go SDK.
| case completion := <-ch.requestCh: | ||
| require.Equal(t, nexus.OperationStateFailed, completion.State) | ||
| var failureErr *nexus.FailureError | ||
| require.ErrorAs(t, completion.Error.Cause, &failureErr) |
There was a problem hiding this comment.
Same here, use Temporal errors for the comparison.
| require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, descResp.GetInfo().GetStatus()) | ||
| }) | ||
|
|
||
| t.Run("ScheduleToCloseTimeoutWithCallbacks", func(t *testing.T) { |
There was a problem hiding this comment.
No need to check all of the different timeout types, it's all the same outcome.
There was a problem hiding this comment.
ok, kept schedule-to-start and added a comment
| } | ||
|
|
||
| if cbs := request.GetCompletionCallbacks(); len(cbs) > 0 { | ||
| maxCallbacks := h.config.MaxCHASMCallbacksPerExecution(request.GetNamespace()) |
There was a problem hiding this comment.
This uses MaxCHASMCallbacksPerExecution (default 2000) while the frontend Validator uses MaxCallbacksPerWorkflow (default 32). Should standalone activities use their own CHASM-specific limit, or share the same limit as workflows?
There was a problem hiding this comment.
Alternatively, we could remove the count check from the Validator entirely since it's a per-component concern that depends on currentCount.
I can use some advice.
There was a problem hiding this comment.
It should be the same limit across all archtypes. We have to validate in the CHASM transaction because we don't know how many callbacks are already attached to the execution. The 32 limit is for the HSM implementation that is already deprecated.
bergundy
left a comment
There was a problem hiding this comment.
Still don't see on-conflict-policy and use_existing implemented.
|
|
||
| func validatorProvider(dc *dynamicconfig.Collection) *Validator { | ||
| return NewValidator( | ||
| dynamicconfig.MaxCHASMCallbacksPerWorkflow.Get(dc), |
There was a problem hiding this comment.
This DC still says "per workflow"
There was a problem hiding this comment.
It now uses callback.maxPerExecution in chasm/lib/callback/config.go
There was a problem hiding this comment.
It now uses callback.maxPerExecution in chasm/lib/callback/config.go
| "system.maxCallbacksPerExecution", | ||
| // NOTE (seankane): MaxCHASMCallbacksPerWorkflow is temporary, this will be removed and replaced with MaxCallbacksPerWorkflow | ||
| // once CHASM is fully enabled | ||
| MaxCHASMCallbacksPerWorkflow = NewNamespaceIntSetting( |
There was a problem hiding this comment.
Noted above, the name execution was correct. This is not just for workflows.
There was a problem hiding this comment.
You can move this into chasm/lib/callback/config.go AFAIC and rename to callback.maxPerExecution
There was a problem hiding this comment.
refactored into chasm/lib/callback/config.go and renamed to callback.maxPerExecution
| } | ||
|
|
||
| if cbs := request.GetCompletionCallbacks(); len(cbs) > 0 { | ||
| maxCallbacks := h.config.MaxCHASMCallbacksPerExecution(request.GetNamespace()) |
There was a problem hiding this comment.
It should be the same limit across all archtypes. We have to validate in the CHASM transaction because we don't know how many callbacks are already attached to the execution. The 32 limit is for the HSM implementation that is already deprecated.
| // chasm/lib/callback/fx.go and read directly from callback.AllowedAddresses. | ||
| func callbackValidatorProvider(dc *dynamicconfig.Collection) *callback.Validator { | ||
| return callback.NewValidator( | ||
| callback.MaxPerExecution.Get(dc), |
There was a problem hiding this comment.
Previous frontend cb validation basically used the hsm config MaxCallbacksPerWorkflow. want to be extra sure changing this from a default of 32 -> 2000 won't break anything
FYI @bergundy
There was a problem hiding this comment.
I think it's fine since the history handler should be validating the total number of attached callbacks anyways. This is just an initial sanity validation step.
There was a problem hiding this comment.
But please confirm in the workflow implementation please.
| return callback.ScheduleStandbyCallbacks(ctx, a.Callbacks) | ||
| } | ||
|
|
||
| type attachCallbacksRequest struct { |
There was a problem hiding this comment.
I don't think you need this request struct, you can pass in the arguments separately. The single struct is good for if you want to call UpdateComponent with a method directly.
| cbs := frontendReq.GetCompletionCallbacks() | ||
| if !result.Created && frontendReq.GetOnConflictOptions().GetAttachCompletionCallbacks() && len(cbs) > 0 { | ||
| ref := chasm.NewComponentRef[*Activity](result.ExecutionKey) | ||
| _, _, err := chasm.UpdateComponent( |
There was a problem hiding this comment.
You shouldn't start a second transaction here. Use chasm.UpdateWithStartExecution instead of StartExecution above.
There was a problem hiding this comment.
I tried switching to UpdateWithStartExecution but it breaks for the FAIL conflict policy case. The engine's UpdateWithStartExecution always calls updateFn when the execution already exists — it doesn't check BusinessIDConflictPolicy. So with FAIL policy, instead of getting ExecutionAlreadyStartedError, the updateFn runs successfully and returns Created=false with no error.
To make this work, the engine would need to either respect BusinessIDConflictPolicyFail in the updateFn. Added a TODO for now, we can follow up in separate PR for that
| // chasm/lib/callback/fx.go and read directly from callback.AllowedAddresses. | ||
| func callbackValidatorProvider(dc *dynamicconfig.Collection) *callback.Validator { | ||
| return callback.NewValidator( | ||
| callback.MaxPerExecution.Get(dc), |
There was a problem hiding this comment.
I think it's fine since the history handler should be validating the total number of attached callbacks anyways. This is just an initial sanity validation step.
| // chasm/lib/callback/fx.go and read directly from callback.AllowedAddresses. | ||
| func callbackValidatorProvider(dc *dynamicconfig.Collection) *callback.Validator { | ||
| return callback.NewValidator( | ||
| callback.MaxPerExecution.Get(dc), |
There was a problem hiding this comment.
But please confirm in the workflow implementation please.
| {Variant: &commonpb.Callback_Nexus_{Nexus: &commonpb.Callback_Nexus{Url: "http://localhost/use-existing-cb"}}}, | ||
| }, | ||
| OnConflictOptions: &commonpb.OnConflictOptions{ | ||
| AttachCompletionCallbacks: true, |
There was a problem hiding this comment.
You should validate that all 3 attach options are set at once.
There was a problem hiding this comment.
It's what we've done for workflow AFAIR. It's a bit silly but we should keep consistent with workflows.
| require.False(t, resp.GetStarted()) | ||
| }) | ||
|
|
||
| t.Run("AttachesCallbacksToExistingActivity", func(t *testing.T) { |
There was a problem hiding this comment.
Missing coverage for attaching to a new activity, existing activity and that the second start with on conflict options is idempotent.
What changed?
Added completion callback support to standalone activities:
Why?
The v2 scheduler needs to start standalone activities on a schedule and be notified when they complete, so it can track action results, handle overlap policies, and support pause-on-failure. Workflows already have this via CompletionCallbacks + Nexus callback delivery. This PR gives standalone activities the same capability, reusing the existing callback library rather than reimplementing the
delivery/retry/backoff logic. This is a prerequisite for the scheduler's Invoker to call StartActivityExecution with a callback pointing back to the Scheduler component.
How did you test it?