Dispatch activity cancellation to worker using Nexus #9233
bergundy
left a comment
Can you confirm whether you're covering cancel requests and pause requests?
Do we also care about canceling activities that we know timed out or are we letting the worker take care of that?
When will you be adding support for standalone activities too?
Records worker_commands_sent{outcome=max_attempts_exceeded} so dropped
tasks are observable in dashboards and alerts.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
yycptt
left a comment
I didn't review the actual Nexus request dispatch part closely. The Nexus crew can do a better job than me, I guess :)
	if ai.RequestId == requestID {
		response.StartedTime = ai.StartedTime
		response.Attempt = ai.Attempt
		response.Clock = ai.StartedClock
I guess we need to handle backward compatibility case here as well.
The early-return path returns nil when StartedClock hasn't been stored yet — same behavior as before this PR. On the cancel side (handler), we added a nil check: if StartedClock is nil, we skip the cancel command and let the activity time out normally.
Not about the cancellation side, but for normal respondActivityCompleted path.
Existing logic will have clock always populated (in history/handler.go) in the task token, based on which, respondActivityCompleted will do a staleness check on the shard. With the change, this early return path will no longer have the clock in response.
Ah very good catch! I have fixed it and also added a test. I am assuming it is ok to create this vector clock while holding the workflow lock.
proto/internal/temporal/server/api/persistence/v1/executions.proto
## Summary

Defines a Nexus service for server-to-worker communication, starting with activity cancellation support.

## Design Decision

We chose a **generic command API** (`ExecuteCommandsRequest` with `oneof` command types) instead of a cancel-specific API. This allows a future optimization to batch multiple commands (cancel, pause, etc) in a single request and deliver to a worker in one RPC.

## Files

- `temporal/api/nexusservices/workerservice/v1/request_response.proto` - request response definitions
- `nexus-rpc/temporal-proto-models-nexusrpc.yaml` - Nexus service definition

## Related

- [Server PR](temporalio/temporal#9233)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
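The generic command API described in this commit can be illustrated with a small Go sketch. This is not the generated proto code: `command`, `cancelActivity`, `pauseActivity`, `executeCommandsRequest`, and `describe` are hypothetical stand-ins that only model how a `oneof` lets one request carry several command kinds.

```go
package main

import "fmt"

// command models the `oneof` in the request: each entry holds exactly one
// concrete command type. All names here are hypothetical stand-ins.
type command interface{ isCommand() }

// cancelActivity stands in for a cancel command carrying a task token.
type cancelActivity struct{ TaskToken string }

// pauseActivity is a hypothetical future command type.
type pauseActivity struct{ TaskToken string }

func (cancelActivity) isCommand() {}
func (pauseActivity) isCommand()  {}

// executeCommandsRequest batches several commands for one worker.
type executeCommandsRequest struct{ Commands []command }

// describe walks the batch and handles each command by its concrete type,
// the way a worker-side handler would switch on the oneof case.
func describe(req executeCommandsRequest) []string {
	out := make([]string, 0, len(req.Commands))
	for _, c := range req.Commands {
		switch c := c.(type) {
		case cancelActivity:
			out = append(out, "cancel:"+c.TaskToken)
		case pauseActivity:
			out = append(out, "pause:"+c.TaskToken)
		default:
			out = append(out, "unknown")
		}
	}
	return out
}

func main() {
	req := executeCommandsRequest{Commands: []command{
		cancelActivity{TaskToken: "t1"},
		pauseActivity{TaskToken: "t2"},
	}}
	fmt.Println(describe(req))
}
```

Adding a new command kind in this model means adding one type and one `case`, without changing the request shape, which is the property the design decision relies on.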
- Add StartedClock nil check in cancel handler (backward compat for activities started before deploy)
- Add WorkerCommandsTask case to standby executor (drop task)
- Use shardCtx.GetConfig() instead of passing config param
- Add lock around Executable.Attempt() for thread safety
- Add replication comment on started_clock proto field
- Add tests for cancel command with/without StartedClock

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…worker (#9231)

## What changed?

As part of RecordActivityTaskStarted flow, store worker_control_task_queue for an activity in the mutable state (ActivityInfo).

Main changes:
- executions.proto: Added the new worker_control_task_queue field.
- mutable_state_impl.go: Update mutable state.
- matching/forwarder.go: Propagate worker_control_task_queue when polls get forwarded. Otherwise, RecordActivityTaskStarted request will not have it set when invoked from a forwarded poll.

## Why?

To support activity cancellation without activity heartbeat. Overall flow:
- [This PR] Store worker attributes in ActivityInfo as part of RecordActivityTaskStarted call.
- [#9232] When user cancels a workflow, create 1 or more tasks. Group all activities belonging to a worker into the task (for efficiency).
- [#9233] Lookup the Nexus task queue for each worker, and send a Nexus operation for each transfer task.
- [SDK] Worker will receive this cancel task and cancel the running activities.

## How did you test it?

- [ ] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [x] added new unit test(s)
- [ ] added new functional test(s)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
The replace directive pointed to a pre-release commit. The released v1.62.8 includes all needed protos (WorkerCommand, etc). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The else-if branch already implies StartedEventId != EmptyEventID since the prior if-branch handles the == case. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Nexus (#9232)

## What changed?

New outbound task type (`WorkerCommandsTask`) that carries worker commands to be dispatched to workers via Nexus. Uses the generic `WorkerCommand` proto (not cancel-activity-specific), so this task type can carry any future command types.

Suggested review order: proto changes → `worker_commands_task.go` → `task_generator.go` → `workflow_task_completed_handler.go`

Key pieces:
- **Proto**: `TASK_TYPE_WORKER_COMMANDS` enum, `WorkerCommandsTask` in `OutboundTaskInfo` with `repeated WorkerCommand`.
- **Task definition**: `worker_commands_task.go` - implements outbound `Task` and `HasDestination` interfaces.
- **Task creation** (`workflow_task_completed_handler.go`, `task_generator.go`): When `RequestCancelActivityTask` is processed for a started activity whose worker has a control queue, collects a `CancelActivityCommand` with the activity's task token. Commands are batched by destination control queue and flushed as one `WorkerCommandsTask` per queue at the end of WFT processing.
- **Serialization**: `task_serializers.go` for persistence round-tripping.

Dispatch is a no-op here; it is handled in #9233.

Gated by dynamic config `EnableCancelActivityWorkerCommand` (default: off).

## Why?

To support proactive activity cancellation without waiting for heartbeat. This is the task creation leg of the flow.

1. [#9231] Store `worker_control_task_queue` in `ActivityInfo` at activity start.
2. **[This PR]** On `RequestCancelActivityTask`, batch commands by control queue into `WorkerCommandsTask` outbound tasks.
3. [#9233] Dispatch each task as a Nexus `ExecuteCommands` operation to the worker, with a 3-attempt retry cap.
4. [SDK] Worker receives the cancel command and cancels the running activity.

Gated by dynamic config `EnableCancelActivityWorkerCommand` (default: off).

## How did you test it?

**Unit tests** cover task generation, command batching (including multi-queue batching), task serialization round-tripping, and the feature-flag-off path.

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
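The batching step in #9232 (one `WorkerCommandsTask` per destination control queue) can be sketched roughly as follows. This is a simplified model, not the server code: `startedActivity`, `cancelCommand`, and `batchByControlQueue` are hypothetical names, and the `enabled` parameter stands in for the `EnableCancelActivityWorkerCommand` dynamic config.

```go
package main

import (
	"fmt"
	"sort"
)

// cancelCommand is a hypothetical stand-in for the CancelActivityCommand proto.
type cancelCommand struct{ TaskToken string }

// startedActivity is a hypothetical, trimmed-down view of a started activity.
type startedActivity struct {
	ControlQueue string // empty when the worker did not report a control queue
	TaskToken    string
}

// batchByControlQueue groups cancel commands by destination control queue.
// Each map entry corresponds to one outbound task. When the feature flag is
// off, or an activity has no control queue, no command is produced for it.
func batchByControlQueue(enabled bool, acts []startedActivity) map[string][]cancelCommand {
	batches := make(map[string][]cancelCommand)
	if !enabled {
		return batches
	}
	for _, a := range acts {
		if a.ControlQueue == "" {
			continue
		}
		batches[a.ControlQueue] = append(batches[a.ControlQueue], cancelCommand{TaskToken: a.TaskToken})
	}
	return batches
}

func main() {
	batches := batchByControlQueue(true, []startedActivity{
		{ControlQueue: "worker-a", TaskToken: "tok-1"},
		{ControlQueue: "", TaskToken: "tok-2"},
		{ControlQueue: "worker-a", TaskToken: "tok-3"},
		{ControlQueue: "worker-b", TaskToken: "tok-4"},
	})
	queues := make([]string, 0, len(batches))
	for q := range batches {
		queues = append(queues, q)
	}
	sort.Strings(queues) // map iteration order is random; sort for stable output
	for _, q := range queues {
		fmt.Println(q, len(batches[q]))
	}
}
```

Grouping by queue keeps the number of outbound tasks proportional to the number of workers involved rather than the number of cancelled activities.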
- Restore tab alignment for TaskDiscarded in metric_defs.go
- Remove extra blank line in handler test
- Replace assert.Equal/Contains/Empty with require equivalents in dispatcher, handler, and dispatch response tests
- Remove unused assert imports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Nexus service descriptor (WorkerService.ServiceName, WorkerService.ExecuteCommands.Name()) is not yet published in go.temporal.io/api v1.62.8. Use hardcoded constants until it is. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
common/metrics/metric_defs.go
	WorkerCommandsSent = NewCounterDef("worker_commands_sent")
I would put this somewhere that's not in the middle of all of the task metrics.
	// The task queue on which the server will send control tasks to the worker running this activity.
	string worker_control_task_queue = 51;

	// The shard clock at the time this activity was started (RecordActivityTaskStarted).
Do we still need the vector clock now that we are using a stamp for validation?
// convertTemporalFailure converts a Temporal API Failure proto into a Go error
// via the Nexus SDK failure converter.
func convertTemporalFailure(failure *failurepb.Failure) (nexusErr error, err error) {
	nexusFailure, err := commonnexus.TemporalFailureToNexusFailure(failure)
I'm not sure this is required. You can use the Temporal Go SDK's failure converter to construct handler errors directly. Not much of a point to translate to a Nexus failure.
}

// operationErrorFromFailure converts a Temporal API Failure into a Nexus SDK operation error.
func operationErrorFromFailure(failure *failurepb.Failure) error {
Same here, the SDK already sends back a temporal Failure object that you can construct an error from using the Temporal Go SDK's default failure converter.
You just won't get back an operation error, you'll get an ApplicationError or a CanceledError. That's fine IMHO.
	logger,
	metricsHandler,
	f.ChasmEngine,
	f.MatchingRawClient,
Is the "raw" client the right client to use here? I think you want the other client with retries and all.
All other task executors also use raw clients and let the task-processing side control retries, throttling, etc.
	return d.handleError(nexusErr, task)
}

func (d *workerCommandsTaskDispatcher) handleError(nexusErr error, task *tasks.WorkerCommandsTask) error {
Check handlerErr.Retryable() to decide whether to retry the error.
…ctivities

When a duplicate RecordActivityTaskStarted request hits the early-return path for an activity started before StartedClock was deployed, fall back to creating a fresh vector clock. This preserves the shard staleness check that was previously provided by handler.go's unconditional NewVectorClock() call (which this PR moved into recordActivityTaskStarted).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
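A minimal sketch of the two backward-compatibility rules discussed in this thread, under simplified assumptions: `vectorClock`, `activityInfo`, `clockForDuplicateStart`, and `shouldSendCancel` are hypothetical names, and `newClock` stands in for whatever the shard uses to mint a fresh vector clock.

```go
package main

import "fmt"

// vectorClock and activityInfo are hypothetical, trimmed-down stand-ins for
// the persisted types discussed in the review.
type vectorClock struct {
	ShardID int32
	Clock   int64
}

type activityInfo struct {
	RequestID    string
	StartedClock *vectorClock // nil for activities started before the field existed
}

// clockForDuplicateStart models the early-return path: reuse the stored clock
// when present, otherwise fall back to a fresh one so the response still
// carries a clock for the later staleness check.
func clockForDuplicateStart(ai activityInfo, newClock func() *vectorClock) *vectorClock {
	if ai.StartedClock != nil {
		return ai.StartedClock
	}
	return newClock()
}

// shouldSendCancel models the cancel-side nil check: without a stored clock
// the cancel command is skipped and the activity is left to time out.
func shouldSendCancel(ai activityInfo) bool {
	return ai.StartedClock != nil
}

func main() {
	fresh := func() *vectorClock { return &vectorClock{ShardID: 1, Clock: 100} }

	legacy := activityInfo{RequestID: "req-1"}
	current := activityInfo{RequestID: "req-2", StartedClock: &vectorClock{ShardID: 1, Clock: 42}}

	fmt.Println(clockForDuplicateStart(legacy, fresh).Clock, shouldSendCancel(legacy))
	fmt.Println(clockForDuplicateStart(current, fresh).Clock, shouldSendCancel(current))
}
```

The two paths deliberately differ: the start path always returns some clock, while the cancel path prefers to do nothing when the clock is missing.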
Add Clock assertion to the existing integration test, rename helper for
clarity, and fix comment wording ("StartedClock field was added").
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Exercises the early-return path where the activity is already started and the request ID matches, verifying the stored StartedClock is returned in the response. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Move WorkerCommandsSent metric out of the task metrics section into the Nexus metrics section.
- Check handlerErr.Retryable() to drop non-retryable handler errors instead of retrying them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
## What

Dispatches worker commands (starting with activity cancellation) to workers via their Nexus control queue. When the outbound queue processes a `WorkerCommandsTask`, the dispatcher sends an `ExecuteCommands` Nexus operation to the worker's control queue via `DispatchNexusTask`. Retries are capped at 3 attempts since these commands are best-effort (the activity will eventually time out anyway).

Suggested review order: `worker_commands_task_dispatcher.go` → `recordactivitytaskstarted/api.go` (clock storage for task token reconstruction).

## Why

To support activity cancellation without activity heartbeat. This is the dispatch leg of the flow:

1. [#9231] Store `worker_control_task_queue` in `ActivityInfo` at activity start.
2. [#9232] On `RequestCancelActivityTask`, batch commands by control queue into `WorkerCommandsTask` outbound tasks.
3. **[This PR]** Dispatch each task as a Nexus `ExecuteCommands` operation to the worker, with a 3-attempt retry cap.

Gated by dynamic config `EnableCancelActivityWorkerCommand` (default: off).

## How did you test it?