diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index b0f87b50..5883bca7 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -196,6 +196,12 @@ type TemporalWorkerDeploymentStatus struct { // +optional LastModifierIdentity string `json:"lastModifierIdentity,omitempty"` + // ManagerIdentity is the identity that has exclusive rights to modify this Worker Deployment's routing config. + // When set, clients whose identity does not match will be blocked from making routing changes. + // Empty by default. Use `temporal worker deployment manager-identity set/unset` to change. + // +optional + ManagerIdentity string `json:"managerIdentity,omitempty"` + // VersionCount is the total number of versions currently known by the worker deployment. // This includes current, target, ramping, and deprecated versions. // +optional diff --git a/docs/ownership.md b/docs/ownership.md index 3a51096c..6fb3cc57 100644 --- a/docs/ownership.md +++ b/docs/ownership.md @@ -2,57 +2,52 @@ ## Problem -If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker -Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the -new target version from ramping because an issue was detected), the controller should not clobber what the human did. +If the worker controller is managing a Worker Deployment (i.e. updating its routing config), but a user makes a manual +change via the CLI, SDK, or gRPC API instead of via the `TemporalWorkerDeployment` CRD interface, the controller should +not clobber the user's change. -At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is -authorized to resume making changes to the Routing Config of the Worker Deployment. +Once the user has finished their manual intervention, they need a way to hand ownership back to the controller. 
## Solution -_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `ManagerIdentity` -field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._ +The controller uses the Temporal server's `ManagerIdentity` field on Worker Deployments to coordinate exclusive +ownership of routing changes. -In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether -another user has made a change. If another user made a change to the Worker Deployment, the controller will not make -any more changes to ensure a human's change is not clobbered. +When `ManagerIdentity` is set on a Worker Deployment, only clients whose identity matches `ManagerIdentity` can make +routing changes (set current version, set ramping version). The current `ManagerIdentity` (the controller's identity once +it has claimed ownership, or another client's identity if they have taken control) is visible in the +`managerIdentity` field of the `TemporalWorkerDeployment` status. -Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready -for the Worker Controller to take over, you can update the metadata to indicate that. +### How the controller claims ownership -There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on -the Current Version of your Worker Deployment. +Whenever the controller plans a routing change for a Worker Deployment whose `ManagerIdentity` is empty, +it calls `SetManagerIdentity` to claim ownership before applying the change. Subsequent routing changes succeed because +the controller's identity already matches `ManagerIdentity`. -Note: The controller decodes this metadata value as a string. Be sure to set the value to the string "true" (not the boolean true). 
+### Taking manual control + +To take manual control away from the controller, set `ManagerIdentity` to your own identity: ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' -``` -Alternatively, if your CLI supports JSON input: -```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' -``` -In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version -```bash -temporal worker deployment update-metadata-version \ +temporal worker deployment manager-identity set \ --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' + --self ``` -Or with JSON: + +The `--self` flag sets `ManagerIdentity` to the identity of the caller (auto-generated by the CLI if not explicitly +provided via `--identity`; similarly, the SDK uses its own auto-generated or configured identity). After this, the +controller's routing change attempts will fail and it will retry on a backoff until ownership is returned. + +You can then make routing changes yourself, as long as you keep using that same identity (the server enforces +`ManagerIdentity` for all clients, not just the controller). 
+ +### Returning ownership to the controller + +When you are done with your manual changes and want the controller to resume, clear `ManagerIdentity`: + ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' +temporal worker deployment manager-identity unset \ + --deployment-name $MY_DEPLOYMENT ``` -In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to -set a Current or Ramping Version and then do as instructed above. \ No newline at end of file +On the next reconcile, the controller will detect that `ManagerIdentity` is empty, claim it for itself, and resume +managing routing changes. \ No newline at end of file diff --git a/internal/controller/clientpool/clientpool.go b/internal/controller/clientpool/clientpool.go index 154d2666..3fee255c 100644 --- a/internal/controller/clientpool/clientpool.go +++ b/internal/controller/clientpool/clientpool.go @@ -97,6 +97,7 @@ type NewClientOptions struct { TemporalNamespace string K8sNamespace string Spec v1alpha1.TemporalConnectionSpec + Identity string } func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (*sdkclient.Options, *ClientPoolKey, *ClientAuth, error) { @@ -104,6 +105,7 @@ func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewC Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } var pemCert []byte @@ -165,6 +167,7 @@ func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts Ne Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, ConnectionOptions: sdkclient.ConnectionOptions{ TLS: &tls.Config{}, }, @@ -193,6 +196,7 @@ func (cp *ClientPool) fetchClientUsingNoCredentials(opts NewClientOptions) (*sdk 
Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } key := ClientPoolKey{ diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index e889682f..b2da014b 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -156,6 +156,22 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con return nil } + if vcfg.ClaimManagerIdentity { + resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + ConflictToken: vcfg.ConflictToken, + }) + if err != nil { + l.Error(err, "unable to claim manager identity") + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed, + "Failed to claim manager identity: %v", err) + return fmt.Errorf("unable to claim manager identity: %w", err) + } + l.Info("claimed manager identity", "identity", getControllerIdentity()) + // Use the updated conflict token for the subsequent routing config change. 
+ vcfg.ConflictToken = resp.ConflictToken + } + if vcfg.SetCurrent { l.Info("registering new current version", "buildID", vcfg.BuildID) if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index 050158cc..a5fd891e 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -80,15 +80,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( ScaleDeployments: make(map[*corev1.ObjectReference]uint32), } - // Check if we need to force manual strategy due to external modification rolloutStrategy := w.Spec.RolloutStrategy - if w.Status.LastModifierIdentity != getControllerIdentity() && - w.Status.LastModifierIdentity != serverDeleteVersionIdentity && - w.Status.LastModifierIdentity != "" && - !temporalState.IgnoreLastModifier { - l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity)) - rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual - } // Resolve gate input if gate is configured var gateInput []byte diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 804d92a4..600894e4 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -34,6 +34,7 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker } status.LastModifierIdentity = m.temporalState.LastModifierIdentity + status.ManagerIdentity = m.temporalState.ManagerIdentity // Get build IDs directly from temporal state currentBuildID := m.temporalState.CurrentBuildID diff --git a/internal/controller/util.go b/internal/controller/util.go index 
8c12fece..5439be5f 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -18,15 +18,16 @@ import ( // status API and may change between releases. Do not write alerting or automation // that depends on these strings. const ( - ReasonPlanGenerationFailed = "PlanGenerationFailed" - ReasonPlanExecutionFailed = "PlanExecutionFailed" - ReasonDeploymentCreateFailed = "DeploymentCreateFailed" - ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" - ReasonDeploymentScaleFailed = "DeploymentScaleFailed" - ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" - ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" - ReasonVersionPromotionFailed = "VersionPromotionFailed" - ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionPromotionFailed = "VersionPromotionFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed" ) const ( diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 1935c3c1..e4fa6d5d 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -184,6 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req K8sNamespace: workerDeploy.Namespace, TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, Spec: temporalConnection.Spec, + Identity: getControllerIdentity(), }) if err != nil { l.Error(err, "invalid Temporal auth secret") diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 2f9f0158..578980d1 
100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -27,6 +27,7 @@ type Plan struct { ShouldCreateDeployment bool VersionConfig *VersionConfig TestWorkflows []WorkflowConfig + // Build IDs of versions from which the controller should // remove IgnoreLastModifierKey from the version metadata RemoveIgnoreLastModifierBuilds []string @@ -45,6 +46,10 @@ type VersionConfig struct { SetCurrent bool // Acceptable values [0,100] RampPercentage int32 + + // ClaimManagerIdentity indicates the controller should call SetManagerIdentity(Self=true) + // before applying this version config, because ManagerIdentity is currently unset. + ClaimManagerIdentity bool } // WorkflowConfig defines a workflow to be started @@ -547,8 +552,9 @@ func getVersionConfigDiff( } vcfg := &VersionConfig{ - ConflictToken: conflictToken, - BuildID: status.TargetVersion.BuildID, + ConflictToken: conflictToken, + BuildID: status.TargetVersion.BuildID, + ClaimManagerIdentity: temporalState != nil && temporalState.ManagerIdentity == "", } // If there is no current version and presence of unversioned pollers is not confirmed for all diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 9873c2b5..e5537b41 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -40,6 +40,7 @@ func TestGeneratePlan(t *testing.T) { expectConfig bool expectConfigSetCurrent *bool // pointer so we can test nil expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100) + expectClaimManagerIdentity *bool // pointer so nil means "don't assert" maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 }{ { @@ -390,6 +391,101 @@ func TestGeneratePlan(t *testing.T) { }, expectUpdate: 1, }, + { + // VersionConfig is non-nil because a new healthy target needs to be promoted. + // With empty ManagerIdentity the controller should claim before promoting. 
+ name: "claims manager identity when ManagerIdentity is empty and a version config change is pending", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "", // empty → controller should claim + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectClaimManagerIdentity: func() *bool { b := true; return &b }(), + }, + { + // Same routing change scenario but ManagerIdentity is already set — no claim. 
+ name: "does not claim manager identity when ManagerIdentity is already set", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "some-other-client", + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectClaimManagerIdentity: func() *bool { b := false; return &b }(), + }, } for _, tc := range testCases { @@ -411,6 +507,10 @@ func TestGeneratePlan(t *testing.T) { assert.Equal(t, tc.expectUpdate, len(plan.UpdateDeployments), "unexpected number of updates") assert.Equal(t, tc.expectWorkflow, len(plan.TestWorkflows), "unexpected number of test workflows") assert.Equal(t, tc.expectConfig, 
plan.VersionConfig != nil, "unexpected version config presence") + if tc.expectClaimManagerIdentity != nil { + require.NotNil(t, plan.VersionConfig, "expected VersionConfig to be non-nil when asserting ClaimManagerIdentity") + assert.Equal(t, *tc.expectClaimManagerIdentity, plan.VersionConfig.ClaimManagerIdentity, "unexpected ClaimManagerIdentity") + } if tc.expectConfig { assert.NotNil(t, plan.VersionConfig, "expected version config") diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index e74af888..4dbb6d30 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -62,6 +62,7 @@ type TemporalWorkerState struct { // Versions indexed by build ID Versions map[string]*VersionInfo LastModifierIdentity string + ManagerIdentity string IgnoreLastModifier bool } @@ -106,6 +107,7 @@ func GetWorkerDeploymentState( } state.RampPercentage = routingConfig.RampingVersionPercentage state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity + state.ManagerIdentity = workerDeploymentInfo.ManagerIdentity state.VersionConflictToken = resp.ConflictToken // Decide whether to ignore LastModifierIdentity diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go index fa596b3b..05e3c3d8 100644 --- a/internal/tests/internal/deployment_controller.go +++ b/internal/tests/internal/deployment_controller.go @@ -106,12 +106,19 @@ func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, // Set deployment status to `DeploymentAvailable` to simulate a healthy deployment // This is necessary because envtest doesn't actually start pods func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient client.Client, deployment appsv1.Deployment) { + // Refetch to get the latest resourceVersion before updating status, since the + // controller may have modified the Deployment (e.g. 
rolling update) while workers + // were starting up. + var fresh appsv1.Deployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, &fresh); err != nil { + t.Fatalf("failed to refetch deployment before status update: %v", err) + } now := metav1.Now() - deployment.Status = appsv1.DeploymentStatus{ - Replicas: *deployment.Spec.Replicas, - UpdatedReplicas: *deployment.Spec.Replicas, - ReadyReplicas: *deployment.Spec.Replicas, - AvailableReplicas: *deployment.Spec.Replicas, + fresh.Status = appsv1.DeploymentStatus{ + Replicas: *fresh.Spec.Replicas, + UpdatedReplicas: *fresh.Spec.Replicas, + ReadyReplicas: *fresh.Spec.Replicas, + AvailableReplicas: *fresh.Spec.Replicas, UnavailableReplicas: 0, Conditions: []appsv1.DeploymentCondition{ { @@ -132,8 +139,8 @@ func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient cli }, }, } - t.Logf("started %d healthy workers, updating deployment status", *deployment.Spec.Replicas) - if err := k8sClient.Status().Update(ctx, &deployment); err != nil { + t.Logf("started %d healthy workers, updating deployment status", *fresh.Spec.Replicas) + if err := k8sClient.Status().Update(ctx, &fresh); err != nil { t.Fatalf("failed to update deployment status: %v", err) } } diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index 9c6d399c..78661a72 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -17,13 +17,11 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/controller" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" "github.com/temporalio/temporal-worker-controller/internal/k8s" - "github.com/temporalio/temporal-worker-controller/internal/temporal" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" temporalClient 
"go.temporal.io/sdk/client" "go.temporal.io/sdk/log" - "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -284,63 +282,68 @@ func getPollers(ctx context.Context, return resp.GetPollers(), nil } -func setUnversionedCurrent(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityToOther sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// using a client with that identity. After this call, the controller (which has a different identity) +// will be blocked from making routing changes to this deployment. +func setManagerIdentityToOther(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - _, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + }) + if err != nil { + t.Errorf("error setting manager identity to other: %v", err) + } + t.Logf("set manager identity to 'some-other-cli-user'") } -func setCurrentAndSetIgnoreModifierMetadata(t *testing.T, 
ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityBlockThenUnblock sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// and then immediately clears it. This simulates a transient block: by the time the controller reconciles, +// the ManagerIdentity is empty and the controller can claim it normally. +func setManagerIdentityBlockThenUnblock(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - // change current version arbitrarily so we can be the last modifier - resp, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() - // set it back to what it was so that it's non-nil - _, err = deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: resp.PreviousVersion.BuildID, + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + resp, err := deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, }) if err != nil { - t.Errorf("error restoring current version: %v", err) + t.Errorf("error setting manager identity to other: %v", err) + return } - t.Logf("set current version to build %v with non-controller 
identity", resp.PreviousVersion.BuildID) + t.Logf("set manager identity to 'some-other-cli-user'") - // set the IgnoreLastModifier metadata - _, err = deploymentHandle.UpdateVersionMetadata(ctx, temporalClient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: workerDeploymentName, - BuildID: resp.PreviousVersion.BuildID, - }, - MetadataUpdate: temporalClient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - temporal.IgnoreLastModifierKey: "true", - }, - }, + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + ManagerIdentity: "", // clear + ConflictToken: resp.ConflictToken, }) if err != nil { - t.Errorf("error updating version metadata: %v", err) + t.Errorf("error clearing manager identity: %v", err) } - t.Log("set current version's metadata to have \"temporal.io/ignore-last-modifier\"=\"true\"") + t.Logf("cleared manager identity") } -func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// validateManagerIdentity checks that the Worker Deployment's ManagerIdentity matches expected. 
+func validateManagerIdentity(expected string) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { return func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) @@ -348,14 +351,10 @@ func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing desc, err := deploymentHandle.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) if err != nil { t.Errorf("error describing worker deployment: %v", err) + return } - - shouldIgnore, err := temporal.DeploymentShouldIgnoreLastModifier(ctx, deploymentHandle, desc.Info.RoutingConfig) - if err != nil { - t.Errorf("error checking ignore last modifier for worker deployment: %v", err) - } - if shouldIgnore != expectShouldIgnore { - t.Errorf("expected ignore last modifier to be %v, got %v", expectShouldIgnore, shouldIgnore) + if desc.Info.ManagerIdentity != expected { + t.Errorf("expected manager identity to be %q, got %q", expected, desc.Info.ManagerIdentity) } } } diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 4761cf2c..4ed5ea54 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -9,7 +9,6 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/temporal" "go.temporal.io/server/temporaltest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -226,7 +225,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "manual-rollout-blocked-by-modifier", + name: "manual-rollout-blocked-by-manager-identity", builder: 
testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -242,20 +241,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "manual-rollout-unblocked-by-modifier-with-ignore", + name: "manual-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -270,16 +268,15 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). - WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current despite being unblocked + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). 
- WithValidatorFunction(validateIgnoreLastModifierMetadata(true)), // Since this is a manual strategy, the current version at the end of the test is v0 which has the ignore last modifier set to true + ), }, } @@ -426,7 +423,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "all-at-once-rollout-blocked-by-modifier", + name: "all-at-once-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -442,20 +439,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "all-at-once-unblocked-by-modifier-with-ignore", + name: "all-at-once-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -470,7 +466,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). @@ -479,8 +475,7 @@ func TestIntegration(t *testing.T) { ). 
WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, } @@ -653,7 +648,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "progressive-rollout-blocked-by-modifier", + name: "progressive-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -669,20 +664,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "progressive-rollout-unblocked-by-modifier-with-ignore", + name: "progressive-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -697,7 +691,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusRamping, 5, true, false). @@ -705,8 +699,7 @@ func TestIntegration(t *testing.T) { ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). 
- WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, }