From 9c8bbd3b39f4d26bacbf02bb181bdd2ea94d75ce Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 6 Mar 2026 19:39:44 -0800 Subject: [PATCH 1/5] replace LastModifierIdentity + ignore-last-modifier metadata hack with ManagerIdentity API --- api/v1alpha1/worker_types.go | 6 ++++++ internal/controller/execplan.go | 16 ++++++++++++++++ internal/controller/genplan.go | 16 +++++++++------- internal/controller/state_mapper.go | 1 + internal/controller/util.go | 3 ++- internal/temporal/worker_deployment.go | 2 ++ 6 files changed, 36 insertions(+), 8 deletions(-) diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index b0f87b50..5883bca7 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -196,6 +196,12 @@ type TemporalWorkerDeploymentStatus struct { // +optional LastModifierIdentity string `json:"lastModifierIdentity,omitempty"` + // ManagerIdentity is the identity that has exclusive rights to modify this Worker Deployment's routing config. + // When set, clients whose identity does not match will be blocked from making routing changes. + // Empty by default. Use `temporal worker deployment manager-identity set/unset` to change. + // +optional + ManagerIdentity string `json:"managerIdentity,omitempty"` + // VersionCount is the total number of versions currently known by the worker deployment. // This includes current, target, ramping, and deprecated versions. // +optional diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index e889682f..6a76bff5 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -156,6 +156,22 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con return nil } + if p.ClaimManagerIdentity { + resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + ConflictToken: vcfg.ConflictToken, + }) + if err != nil { + l.Error(err, "unable to claim manager identity") + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed, + "Failed to claim manager identity: %v", err) + return fmt.Errorf("unable to claim manager identity: %w", err) + } + l.Info("claimed manager identity", "identity", getControllerIdentity()) + // Use the updated conflict token for the subsequent routing config change. + vcfg.ConflictToken = resp.ConflictToken + } + if vcfg.SetCurrent { l.Info("registering new current version", "buildID", vcfg.BuildID) if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index 050158cc..fd488778 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -32,6 +32,10 @@ type plan struct { // Register new versions as current or with ramp UpdateVersionConfig *planner.VersionConfig + // ClaimManagerIdentity indicates the controller should call SetManagerIdentity(Self=true) + // because ManagerIdentity is currently unset on the Worker Deployment. + ClaimManagerIdentity bool + // Start a workflow startTestWorkflows []startWorkflowConfig @@ -80,14 +84,12 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( ScaleDeployments: make(map[*corev1.ObjectReference]uint32), } - // Check if we need to force manual strategy due to external modification + // If ManagerIdentity is unset, the controller will claim it during plan execution + // (once a version config change is ready). If another client owns it, the routing + // config update will fail at the Temporal server level. rolloutStrategy := w.Spec.RolloutStrategy - if w.Status.LastModifierIdentity != getControllerIdentity() && - w.Status.LastModifierIdentity != serverDeleteVersionIdentity && - w.Status.LastModifierIdentity != "" && - !temporalState.IgnoreLastModifier { - l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity)) - rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual + if temporalState.ManagerIdentity == "" { + plan.ClaimManagerIdentity = true } // Resolve gate input if gate is configured diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 804d92a4..600894e4 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -34,6 +34,7 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker } status.LastModifierIdentity = m.temporalState.LastModifierIdentity + status.ManagerIdentity = m.temporalState.ManagerIdentity // Get build IDs directly from temporal state currentBuildID := m.temporalState.CurrentBuildID diff --git a/internal/controller/util.go b/internal/controller/util.go index 8c12fece..edc53035 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -26,7 +26,8 @@ const ( ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" ReasonVersionPromotionFailed = "VersionPromotionFailed" - ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed" ) const ( diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index e74af888..4dbb6d30 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -62,6 +62,7 @@ type TemporalWorkerState struct { // Versions indexed by build ID Versions map[string]*VersionInfo LastModifierIdentity string + ManagerIdentity string IgnoreLastModifier bool } @@ -106,6 +107,7 @@ func GetWorkerDeploymentState( } state.RampPercentage = routingConfig.RampingVersionPercentage state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity + state.ManagerIdentity = workerDeploymentInfo.ManagerIdentity state.VersionConflictToken = resp.ConflictToken // Decide whether to ignore LastModifierIdentity From 1133f35b176621d55dcecbe3cdff1c6d037fee7f Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Sat, 7 Mar 2026 16:58:46 -0800 Subject: [PATCH 2/5] Add ClaimManagerIdentity to planner, update docs and integration tests Move ClaimManagerIdentity decision into planner.VersionConfig so it can be unit-tested alongside other plan decisions. Add two planner tests covering the claim (ManagerIdentity empty) and no-claim (ManagerIdentity already set) cases. Replace the six LastModifierIdentity integration tests with equivalent ManagerIdentity-based tests: blocked tests set ManagerIdentity to an external identity and assert routing changes are rejected; unblocked tests set then immediately clear ManagerIdentity so the controller can claim it normally. Remove the old helper functions (setUnversionedCurrent, setCurrentAndSetIgnoreModifierMetadata, validateIgnoreLastModifierMetadata) and add setManagerIdentityToOther, setManagerIdentityBlockThenUnblock, and validateManagerIdentity. Rewrite docs/ownership.md to document the ManagerIdentity mechanism and the temporal worker deployment manager-identity set/unset CLI commands. Co-Authored-By: Claude Sonnet 4.6 --- docs/ownership.md | 73 +++++++------- internal/controller/execplan.go | 2 +- internal/controller/genplan.go | 10 -- internal/planner/planner.go | 10 +- internal/planner/planner_test.go | 100 ++++++++++++++++++++ internal/tests/internal/env_helpers.go | 89 +++++++++-------- internal/tests/internal/integration_test.go | 51 +++++----- 7 files changed, 209 insertions(+), 126 deletions(-) diff --git a/docs/ownership.md b/docs/ownership.md index 3a51096c..6fb3cc57 100644 --- a/docs/ownership.md +++ b/docs/ownership.md @@ -2,57 +2,52 @@ ## Problem -If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker -Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the -new target version from ramping because an issue was detected), the controller should not clobber what the human did. +If the worker controller is managing a Worker Deployment (i.e. updating its routing config), but a user makes a manual +change via the CLI, SDK, or gRPC API instead of via the `TemporalWorkerDeployment` CRD interface, the controller should +not clobber the user's change. -At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is -authorized to resume making changes to the Routing Config of the Worker Deployment. +Once the user has finished their manual intervention, they need a way to hand ownership back to the controller. ## Solution -_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `ManagerIdentity` -field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._ +The controller uses the Temporal server's `ManagerIdentity` field on Worker Deployments to coordinate exclusive +ownership of routing changes. -In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether -another user has made a change. If another user made a change to the Worker Deployment, the controller will not make -any more changes to ensure a human's change is not clobbered. +When `ManagerIdentity` is set on a Worker Deployment, only clients whose identity matches `ManagerIdentity` can make +routing changes (set current version, set ramping version). The controller's identity is visible in the +`managerIdentity` field of the `TemporalWorkerDeployment` status. -Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready -for the Worker Controller to take over, you can update the metadata to indicate that. +### How the controller claims ownership -There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on -the Current Version of your Worker Deployment. +The first time the controller plans a routing change for a Worker Deployment (i.e. when `ManagerIdentity` is empty), +it calls `SetManagerIdentity` to claim ownership before applying the change. Subsequent routing changes succeed because +the controller's identity already matches `ManagerIdentity`. -Note: The controller decodes this metadata value as a string. Be sure to set the value to the string "true" (not the boolean true). +### Taking manual control + +To take manual control away from the controller, set `ManagerIdentity` to your own identity: ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' -``` -Alternatively, if your CLI supports JSON input: -```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' -``` -In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version -```bash -temporal worker deployment update-metadata-version \ +temporal worker deployment manager-identity set \ --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' + --self ``` -Or with JSON: + +The `--self` flag sets `ManagerIdentity` to the identity of the caller (auto-generated by the CLI if not explicitly +provided via `--identity`; similarly, the SDK uses its own auto-generated or configured identity). After this, the +controller's routing change attempts will fail and it will retry on a backoff until ownership is returned. + +You can then make routing changes freely (the server enforces `ManagerIdentity` for all clients, not just the +controller). + +### Returning ownership to the controller + +When you are done with your manual changes and want the controller to resume, clear `ManagerIdentity`: + ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' +temporal worker deployment manager-identity unset \ + --deployment-name $MY_DEPLOYMENT ``` -In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to -set a Current or Ramping Version and then do as instructed above. \ No newline at end of file +On the next reconcile, the controller will detect that `ManagerIdentity` is empty, claim it for itself, and resume +managing routing changes. \ No newline at end of file diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 6a76bff5..b2da014b 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -156,7 +156,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con return nil } - if p.ClaimManagerIdentity { + if vcfg.ClaimManagerIdentity { resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{ Self: true, ConflictToken: vcfg.ConflictToken, diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index fd488778..a5fd891e 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -32,10 +32,6 @@ type plan struct { // Register new versions as current or with ramp UpdateVersionConfig *planner.VersionConfig - // ClaimManagerIdentity indicates the controller should call SetManagerIdentity(Self=true) - // because ManagerIdentity is currently unset on the Worker Deployment. - ClaimManagerIdentity bool - // Start a workflow startTestWorkflows []startWorkflowConfig @@ -84,13 +80,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( ScaleDeployments: make(map[*corev1.ObjectReference]uint32), } - // If ManagerIdentity is unset, the controller will claim it during plan execution - // (once a version config change is ready). If another client owns it, the routing - // config update will fail at the Temporal server level. rolloutStrategy := w.Spec.RolloutStrategy - if temporalState.ManagerIdentity == "" { - plan.ClaimManagerIdentity = true - } // Resolve gate input if gate is configured var gateInput []byte diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 2f9f0158..578980d1 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -27,6 +27,7 @@ type Plan struct { ShouldCreateDeployment bool VersionConfig *VersionConfig TestWorkflows []WorkflowConfig + // Build IDs of versions from which the controller should // remove IgnoreLastModifierKey from the version metadata RemoveIgnoreLastModifierBuilds []string @@ -45,6 +46,10 @@ type VersionConfig struct { SetCurrent bool // Acceptable values [0,100] RampPercentage int32 + + // ClaimManagerIdentity indicates the controller should call SetManagerIdentity(Self=true) + // before applying this version config, because ManagerIdentity is currently unset. + ClaimManagerIdentity bool } // WorkflowConfig defines a workflow to be started @@ -547,8 +552,9 @@ func getVersionConfigDiff( } vcfg := &VersionConfig{ - ConflictToken: conflictToken, - BuildID: status.TargetVersion.BuildID, + ConflictToken: conflictToken, + BuildID: status.TargetVersion.BuildID, + ClaimManagerIdentity: temporalState != nil && temporalState.ManagerIdentity == "", } // If there is no current version and presence of unversioned pollers is not confirmed for all diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 9873c2b5..e5537b41 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -40,6 +40,7 @@ func TestGeneratePlan(t *testing.T) { expectConfig bool expectConfigSetCurrent *bool // pointer so we can test nil expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100) + expectClaimManagerIdentity *bool // pointer so nil means "don't assert" maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 }{ { @@ -390,6 +391,101 @@ func TestGeneratePlan(t *testing.T) { }, expectUpdate: 1, }, + { + // VersionConfig is non-nil because a new healthy target needs to be promoted. + // With empty ManagerIdentity the controller should claim before promoting. + name: "claims manager identity when ManagerIdentity is empty and a version config change is pending", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "", // empty → controller should claim + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectClaimManagerIdentity: func() *bool { b := true; return &b }(), + }, + { + // Same routing change scenario but ManagerIdentity is already set — no claim. + name: "does not claim manager identity when ManagerIdentity is already set", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "some-other-client", + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectClaimManagerIdentity: func() *bool { b := false; return &b }(), + }, } for _, tc := range testCases { @@ -411,6 +507,10 @@ func TestGeneratePlan(t *testing.T) { assert.Equal(t, tc.expectUpdate, len(plan.UpdateDeployments), "unexpected number of updates") assert.Equal(t, tc.expectWorkflow, len(plan.TestWorkflows), "unexpected number of test workflows") assert.Equal(t, tc.expectConfig, plan.VersionConfig != nil, "unexpected version config presence") + if tc.expectClaimManagerIdentity != nil { + require.NotNil(t, plan.VersionConfig, "expected VersionConfig to be non-nil when asserting ClaimManagerIdentity") + assert.Equal(t, *tc.expectClaimManagerIdentity, plan.VersionConfig.ClaimManagerIdentity, "unexpected ClaimManagerIdentity") + } if tc.expectConfig { assert.NotNil(t, plan.VersionConfig, "expected version config") diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index 9c6d399c..78661a72 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -17,13 +17,11 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/controller" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" "github.com/temporalio/temporal-worker-controller/internal/k8s" - "github.com/temporalio/temporal-worker-controller/internal/temporal" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" temporalClient "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" - "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -284,63 +282,68 @@ func getPollers(ctx context.Context, return resp.GetPollers(), nil } -func setUnversionedCurrent(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityToOther sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// using a client with that identity. After this call, the controller (which has a different identity) +// will be blocked from making routing changes to this deployment. +func setManagerIdentityToOther(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - _, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + }) + if err != nil { + t.Errorf("error setting manager identity to other: %v", err) + } + t.Logf("set manager identity to 'some-other-cli-user'") } -func setCurrentAndSetIgnoreModifierMetadata(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityBlockThenUnblock sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// and then immediately clears it. This simulates a transient block: by the time the controller reconciles, +// the ManagerIdentity is empty and the controller can claim it normally. +func setManagerIdentityBlockThenUnblock(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - // change current version arbitrarily so we can be the last modifier - resp, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() - // set it back to what it was so that it's non-nil - _, err = deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: resp.PreviousVersion.BuildID, + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + resp, err := deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, }) if err != nil { - t.Errorf("error restoring current version: %v", err) + t.Errorf("error setting manager identity to other: %v", err) + return } - t.Logf("set current version to build %v with non-controller identity", resp.PreviousVersion.BuildID) + t.Logf("set manager identity to 'some-other-cli-user'") - // set the IgnoreLastModifier metadata - _, err = deploymentHandle.UpdateVersionMetadata(ctx, temporalClient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: workerDeploymentName, - BuildID: resp.PreviousVersion.BuildID, - }, - MetadataUpdate: temporalClient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - temporal.IgnoreLastModifierKey: "true", - }, - }, + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + ManagerIdentity: "", // clear + ConflictToken: resp.ConflictToken, }) if err != nil { - t.Errorf("error updating version metadata: %v", err) + t.Errorf("error clearing manager identity: %v", err) } - t.Log("set current version's metadata to have \"temporal.io/ignore-last-modifier\"=\"true\"") + t.Logf("cleared manager identity") } -func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// validateManagerIdentity checks that the Worker Deployment's ManagerIdentity matches expected. +func validateManagerIdentity(expected string) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { return func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) @@ -348,14 +351,10 @@ func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing desc, err := deploymentHandle.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) if err != nil { t.Errorf("error describing worker deployment: %v", err) + return } - - shouldIgnore, err := temporal.DeploymentShouldIgnoreLastModifier(ctx, deploymentHandle, desc.Info.RoutingConfig) - if err != nil { - t.Errorf("error checking ignore last modifier for worker deployment: %v", err) - } - if shouldIgnore != expectShouldIgnore { - t.Errorf("expected ignore last modifier to be %v, got %v", expectShouldIgnore, shouldIgnore) + if desc.Info.ManagerIdentity != expected { + t.Errorf("expected manager identity to be %q, got %q", expected, desc.Info.ManagerIdentity) } } } diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 4761cf2c..4ed5ea54 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -9,7 +9,6 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/temporal" "go.temporal.io/server/temporaltest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -226,7 +225,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "manual-rollout-blocked-by-modifier", + name: "manual-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -242,20 +241,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "manual-rollout-unblocked-by-modifier-with-ignore", + name: "manual-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -270,16 +268,15 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). - WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current despite being unblocked + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(true)), // Since this is a manual strategy, the current version at the end of the test is v0 which has the ignore last modifier set to true + ), }, } @@ -426,7 +423,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "all-at-once-rollout-blocked-by-modifier", + name: "all-at-once-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -442,20 +439,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "all-at-once-unblocked-by-modifier-with-ignore", + name: "all-at-once-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -470,7 +466,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). @@ -479,8 +475,7 @@ func TestIntegration(t *testing.T) { ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, } @@ -653,7 +648,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "progressive-rollout-blocked-by-modifier", + name: "progressive-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -669,20 +664,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "progressive-rollout-unblocked-by-modifier-with-ignore", + name: "progressive-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -697,7 +691,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusRamping, 5, true, false). @@ -705,8 +699,7 @@ func TestIntegration(t *testing.T) { ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, } From 4bab0d4992eab5d87d76c62124786267b784c555 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Sat, 7 Mar 2026 16:59:16 -0800 Subject: [PATCH 3/5] fmt-imports --- internal/controller/util.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/controller/util.go b/internal/controller/util.go index edc53035..5439be5f 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -18,16 +18,16 @@ import ( // status API and may change between releases. Do not write alerting or automation // that depends on these strings. const ( - ReasonPlanGenerationFailed = "PlanGenerationFailed" - ReasonPlanExecutionFailed = "PlanExecutionFailed" - ReasonDeploymentCreateFailed = "DeploymentCreateFailed" - ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" - ReasonDeploymentScaleFailed = "DeploymentScaleFailed" - ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" - ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" - ReasonVersionPromotionFailed = "VersionPromotionFailed" - ReasonMetadataUpdateFailed = "MetadataUpdateFailed" - ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed" + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionPromotionFailed = "VersionPromotionFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed" ) const ( From e4f52d307bafad41b6dadcdd9abc35196afff03c Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Sat, 7 Mar 2026 22:49:53 -0800 Subject: [PATCH 4/5] use identity in all controller calls --- internal/controller/clientpool/clientpool.go | 4 ++++ internal/controller/worker_controller.go | 1 + 2 files changed, 5 insertions(+) diff --git a/internal/controller/clientpool/clientpool.go b/internal/controller/clientpool/clientpool.go index 154d2666..3fee255c 100644 --- a/internal/controller/clientpool/clientpool.go +++ b/internal/controller/clientpool/clientpool.go @@ -97,6 +97,7 @@ type NewClientOptions struct { TemporalNamespace string K8sNamespace string Spec v1alpha1.TemporalConnectionSpec + Identity string } func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (*sdkclient.Options, *ClientPoolKey, *ClientAuth, error) { @@ -104,6 +105,7 @@ func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewC Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } var pemCert []byte @@ -165,6 +167,7 @@ func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts Ne Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, ConnectionOptions: sdkclient.ConnectionOptions{ TLS: &tls.Config{}, }, @@ -193,6 +196,7 @@ func (cp *ClientPool) fetchClientUsingNoCredentials(opts NewClientOptions) (*sdk Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } key := ClientPoolKey{ diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 1935c3c1..e4fa6d5d 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -184,6 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req K8sNamespace: workerDeploy.Namespace, TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, Spec: temporalConnection.Spec, + Identity: getControllerIdentity(), }) if err != nil { l.Error(err, "invalid Temporal auth secret") From a7be4d10b506a1f8d8ae2973669925be14615997 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Sun, 8 Mar 2026 13:28:59 -0700 Subject: [PATCH 5/5] Fix test race in setHealthyDeploymentStatus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refetch the Deployment immediately before calling Status().Update so the resourceVersion is always current. Previously, the deployment was read at the start of applyDeployment, workers were started (up to 30s), and then the stale object was used for the status update — racing with the controller performing a rolling update in the meantime. Co-Authored-By: Claude Sonnet 4.6 --- .../tests/internal/deployment_controller.go | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go index fa596b3b..05e3c3d8 100644 --- a/internal/tests/internal/deployment_controller.go +++ b/internal/tests/internal/deployment_controller.go @@ -106,12 +106,19 @@ func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, // Set deployment status to `DeploymentAvailable` to simulate a healthy deployment // This is necessary because envtest doesn't actually start pods func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient client.Client, deployment appsv1.Deployment) { + // Refetch to get the latest resourceVersion before updating status, since the + // controller may have modified the Deployment (e.g. rolling update) while workers + // were starting up. + var fresh appsv1.Deployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, &fresh); err != nil { + t.Fatalf("failed to refetch deployment before status update: %v", err) + } now := metav1.Now() - deployment.Status = appsv1.DeploymentStatus{ - Replicas: *deployment.Spec.Replicas, - UpdatedReplicas: *deployment.Spec.Replicas, - ReadyReplicas: *deployment.Spec.Replicas, - AvailableReplicas: *deployment.Spec.Replicas, + fresh.Status = appsv1.DeploymentStatus{ + Replicas: *fresh.Spec.Replicas, + UpdatedReplicas: *fresh.Spec.Replicas, + ReadyReplicas: *fresh.Spec.Replicas, + AvailableReplicas: *fresh.Spec.Replicas, UnavailableReplicas: 0, Conditions: []appsv1.DeploymentCondition{ { @@ -132,8 +139,8 @@ func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient cli }, }, } - t.Logf("started %d healthy workers, updating deployment status", *deployment.Spec.Replicas) - if err := k8sClient.Status().Update(ctx, &deployment); err != nil { + t.Logf("started %d healthy workers, updating deployment status", *fresh.Spec.Replicas) + if err := k8sClient.Status().Update(ctx, &fresh); err != nil { t.Fatalf("failed to update deployment status: %v", err) } }