diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 913f3a211a..6bd49c586b 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -2601,8 +2601,11 @@ type ActivityInfo struct { // set to true if reset heartbeat flag was set with an activity reset ResetHeartbeats bool `protobuf:"varint,48,opt,name=reset_heartbeats,json=resetHeartbeats,proto3" json:"reset_heartbeats,omitempty"` StartVersion int64 `protobuf:"varint,50,opt,name=start_version,json=startVersion,proto3" json:"start_version,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // A dedicated per-worker Nexus task queue on which the server sends control + // tasks (e.g. activity cancellation) to this specific worker instance. + WorkerControlTaskQueue string `protobuf:"bytes,51,opt,name=worker_control_task_queue,json=workerControlTaskQueue,proto3" json:"worker_control_task_queue,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityInfo) Reset() { @@ -2975,6 +2978,13 @@ func (x *ActivityInfo) GetStartVersion() int64 { return 0 } +func (x *ActivityInfo) GetWorkerControlTaskQueue() string { + if x != nil { + return x.WorkerControlTaskQueue + } + return "" +} + type isActivityInfo_BuildIdInfo interface { isActivityInfo_BuildIdInfo() } @@ -4924,7 +4934,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x17NexusInvocationTaskInfo\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattempt\"4\n" + "\x18NexusCancelationTaskInfo\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"\x9e\x1b\n" + + "\aattempt\x18\x01 \x01(\x05R\aattempt\"\xd9\x1b\n" + "\fActivityInfo\x12\x18\n" + "\aversion\x18\x01 \x01(\x03R\aversion\x127\n" + "\x18scheduled_event_batch_id\x18\x02 \x01(\x03R\x15scheduledEventBatchId\x12A\n" + @@ -4977,7 +4987,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "pause_info\x18. \x01(\v2:.temporal.server.api.persistence.v1.ActivityInfo.PauseInfoR\tpauseInfo\x12%\n" + "\x0eactivity_reset\x18/ \x01(\bR\ractivityReset\x12)\n" + "\x10reset_heartbeats\x180 \x01(\bR\x0fresetHeartbeats\x12#\n" + - "\rstart_version\x182 \x01(\x03R\fstartVersion\x1ay\n" + + "\rstart_version\x182 \x01(\x03R\fstartVersion\x129\n" + + "\x19worker_control_task_queue\x183 \x01(\tR\x16workerControlTaskQueue\x1ay\n" + "\x16UseWorkflowBuildIdInfo\x12+\n" + "\x12last_used_build_id\x18\x01 \x01(\tR\x0flastUsedBuildId\x122\n" + "\x15last_redirect_counter\x18\x02 \x01(\x03R\x13lastRedirectCounter\x1a\x89\x02\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index d6c26a1af1..753670d778 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -655,6 +655,10 @@ message ActivityInfo { bool reset_heartbeats = 48; int64 start_version = 50; + + // A dedicated per-worker Nexus task queue on which the server sends control + // tasks (e.g. activity cancellation) to this specific worker instance. + string worker_control_task_queue = 51; } // timer_map column diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 2356f326c0..94629a2bf3 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -242,6 +242,7 @@ func recordActivityTaskStarted( if _, err := mutableState.AddActivityTaskStartedEvent( ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(), versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(), + request.PollRequest.GetWorkerControlTaskQueue(), ); err != nil { return nil, rejectCodeUndefined, err } diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index ed96a48e53..f8af8bea73 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -99,6 +99,7 @@ func Invoke( // TODO (shahab): do we need to do anything with wf redirect in this case or any // other case where an activity starts? nil, + "", // workerControlTaskQueue not available for force complete ) if err != nil { return nil, err diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 703bd06387..bfbf4da636 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -393,6 +393,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( workflowTaskHandler := newWorkflowTaskCompletedHandler( request.GetIdentity(), + request.GetWorkerControlTaskQueue(), completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler. ms, updateRegistry, diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index bdfe16900d..24758ceef6 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -54,6 +54,7 @@ type ( workflowTaskCompletedHandler struct { identity string + workerControlTaskQueue string workflowTaskCompletedID int64 // internal state @@ -106,6 +107,7 @@ type ( func newWorkflowTaskCompletedHandler( identity string, + workerControlTaskQueue string, workflowTaskCompletedID int64, mutableState historyi.MutableState, updateRegistry update.Registry, @@ -126,6 +128,7 @@ func newWorkflowTaskCompletedHandler( ) *workflowTaskCompletedHandler { return &workflowTaskCompletedHandler{ identity: identity, + workerControlTaskQueue: workerControlTaskQueue, workflowTaskCompletedID: workflowTaskCompletedID, // internal state @@ -585,6 +588,7 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi stamp, nil, nil, + handler.workerControlTaskQueue, // Eager: activity runs on the same worker that completed the WFT. ); err != nil { return nil, err } diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go index 80751b20bc..c0aeedd6f5 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go @@ -108,6 +108,7 @@ func TestCommandProtocolMessage(t *testing.T) { ) out.handler = newWorkflowTaskCompletedHandler( t.Name(), // identity + "", // workerControlTaskQueue 123, // workflowTaskCompletedID out.ms, out.updates, diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index b8f874446a..028bed90a2 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -6702,6 +6702,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6 nil, nil, nil, + "", ) return event } diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index ad722d85b2..d9faf1e56a 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -57,6 +57,7 @@ type ( *commonpb.WorkerVersionStamp, *deploymentpb.Deployment, *taskqueuespb.BuildIdRedirectInfo, + string, // workerControlTaskQueue ) (*historypb.HistoryEvent, error) AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error) AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index c571d15435..a9428c987d 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -148,18 +148,18 @@ func (mr *MockMutableStateMockRecorder) AddActivityTaskScheduledEvent(arg0, arg1 } // AddActivityTaskStartedEvent mocks base method. -func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo) (*history.HistoryEvent, error) { +func (m *MockMutableState) AddActivityTaskStartedEvent(arg0 *persistence.ActivityInfo, arg1 int64, arg2, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *deployment.Deployment, arg6 *taskqueue0.BuildIdRedirectInfo, arg7 string) (*history.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret := m.ctrl.Call(m, "AddActivityTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddActivityTaskStartedEvent indicates an expected call of AddActivityTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddActivityTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActivityTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddActivityTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // AddActivityTaskTimedOutEvent mocks base method. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index c020cb1577..5227a3df35 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4093,6 +4093,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, deployment *deploymentpb.Deployment, redirectInfo *taskqueuespb.BuildIdRedirectInfo, + workerControlTaskQueue string, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskStarted err := ms.checkMutability(opTag) @@ -4121,6 +4122,8 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( ai.LastDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment) } + ai.WorkerControlTaskQueue = workerControlTaskQueue + if !ai.HasRetryPolicy { event := ms.hBuilder.AddActivityTaskStartedEvent( scheduledEventID, diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 9337a98fff..3cb91b7fbd 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -419,6 +419,7 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp nil, nil, nil, + "", ) s.NoError(err) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 8abffdfa19..f5a5e441dc 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2878,6 +2878,7 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() { nil, nil, nil, + "", ) s.NoError(err) @@ -2943,6 +2944,7 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() { nil, nil, nil, + "", ) s.NoError(err) @@ -6151,6 +6153,79 @@ func (s *mutableStateSuite) TestSetContextMetadata() { s.Equal(taskQueue, tq) } +func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerControlTaskQueue() { + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + + // Setup workflow execution + _, err := s.mutableState.AddWorkflowExecutionStartedEvent( + &commonpb.WorkflowExecution{WorkflowId: tests.WorkflowID, RunId: tests.RunID}, + &historyservice.StartWorkflowExecutionRequest{ + NamespaceId: tests.NamespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: "workflow-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"}, + WorkflowRunTimeout: durationpb.New(200 * time.Second), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + di, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + _, _, err = s.mutableState.AddWorkflowTaskStartedEvent( + di.ScheduledEventID, + di.RequestID, + di.TaskQueue, + "identity", + nil, + nil, + nil, + false, + nil, + ) + s.NoError(err) + _, err = s.mutableState.AddWorkflowTaskCompletedEvent( + di, + &workflowservice.RespondWorkflowTaskCompletedRequest{Identity: "identity"}, + workflowTaskCompletionLimits, + ) + s.NoError(err) + + // Schedule activity + workflowTaskCompletedEventID := int64(4) + _, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent( + workflowTaskCompletedEventID, + &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "test-activity-1", + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + }, + false, + ) + s.NoError(err) + s.Empty(activityInfo.WorkerControlTaskQueue, "WorkerControlTaskQueue should be empty before activity starts") + + // Start activity with workerControlTaskQueue + expectedWorkerControlTaskQueue := "test-control-queue" + _, err = s.mutableState.AddActivityTaskStartedEvent( + activityInfo, + activityInfo.ScheduledEventId, + uuid.NewString(), + "worker-identity", + nil, + nil, + nil, + expectedWorkerControlTaskQueue, + ) + s.NoError(err) + + // Verify workerControlTaskQueue is stored + updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) + s.True(ok) + s.Equal(expectedWorkerControlTaskQueue, updatedActivityInfo.WorkerControlTaskQueue) +} + func (s *mutableStateSuite) TestCloseTransaction_PrincipalStamped() { for _, tc := range []struct { name string diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index 48710b2f63..27e67ce8aa 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -251,6 +251,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: pollMetadata.conditions, @@ -275,6 +276,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: pollMetadata.conditions, diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 6eab8ff798..2aeaa85181 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -105,6 +105,7 @@ type ( forwardedFrom string localPollStartTime time.Time workerInstanceKey string + workerControlTaskQueue string } userDataUpdate struct { @@ -686,6 +687,7 @@ pollLoop: forwardedFrom: req.ForwardedSource, conditions: req.Conditions, workerInstanceKey: request.WorkerInstanceKey, + workerControlTaskQueue: request.WorkerControlTaskQueue, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { @@ -958,6 +960,7 @@ pollLoop: forwardedFrom: req.ForwardedSource, conditions: req.Conditions, workerInstanceKey: request.WorkerInstanceKey, + workerControlTaskQueue: request.WorkerControlTaskQueue, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { diff --git a/service/matching/pri_forwarder.go b/service/matching/pri_forwarder.go index bfa6dadc3e..e8f305b5d5 100644 --- a/service/matching/pri_forwarder.go +++ b/service/matching/pri_forwarder.go @@ -239,6 +239,7 @@ func ForwardPollWithTarget( WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions, @@ -263,6 +264,7 @@ func ForwardPollWithTarget( WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, WorkerInstanceKey: pollMetadata.workerInstanceKey, + WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions,