Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ type TemporalWorkerDeploymentStatus struct {
// +optional
LastModifierIdentity string `json:"lastModifierIdentity,omitempty"`

// ManagerIdentity is the identity that has exclusive rights to modify this Worker Deployment's routing config.
// When set, clients whose identity does not match will be blocked from making routing changes.
// Empty by default. Use `temporal worker deployment manager-identity set/unset` to change.
// +optional
ManagerIdentity string `json:"managerIdentity,omitempty"`

// VersionCount is the total number of versions currently known by the worker deployment.
// This includes current, target, ramping, and deprecated versions.
// +optional
Expand Down
73 changes: 34 additions & 39 deletions docs/ownership.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,52 @@

## Problem

If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker
Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the
new target version from ramping because an issue was detected), the controller should not clobber what the human did.
If the worker controller is managing a Worker Deployment (i.e. updating its routing config), but a user makes a manual
change via the CLI, SDK, or gRPC API instead of via the `TemporalWorkerDeployment` CRD interface, the controller should
not clobber the user's change.

At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is
authorized to resume making changes to the Routing Config of the Worker Deployment.
Once the user has finished their manual intervention, they need a way to hand ownership back to the controller.

## Solution

_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `ManagerIdentity`
field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._
The controller uses the Temporal server's `ManagerIdentity` field on Worker Deployments to coordinate exclusive
ownership of routing changes.

In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether
another user has made a change. If another user made a change to the Worker Deployment, the controller will not make
any more changes to ensure a human's change is not clobbered.
When `ManagerIdentity` is set on a Worker Deployment, only clients whose identity matches `ManagerIdentity` can make
routing changes (set current version, set ramping version). The controller's identity is visible in the
`managerIdentity` field of the `TemporalWorkerDeployment` status.

Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready
for the Worker Controller to take over, you can update the metadata to indicate that.
### How the controller claims ownership

There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on
the Current Version of your Worker Deployment.
The first time the controller plans a routing change for a Worker Deployment (i.e. when `ManagerIdentity` is empty),
it calls `SetManagerIdentity` to claim ownership before applying the change. Subsequent routing changes succeed because
the controller's identity already matches `ManagerIdentity`.

Note: The controller decodes this metadata value as a string. Be sure to set the value to the string "true" (not the boolean true).
### Taking manual control

To take manual control away from the controller, set `ManagerIdentity` to your own identity:

```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $CURRENT_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier="true"'
```
Alternatively, if your CLI supports JSON input:
```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $CURRENT_VERSION_BUILD_ID \
--metadata-json '{"temporal.io/ignore-last-modifier":"true"}'
```
In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version
```bash
temporal worker deployment update-metadata-version \
temporal worker deployment manager-identity set \
--deployment-name $MY_DEPLOYMENT \
--build-id $RAMPING_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier="true"'
--self
```
Or with JSON:

The `--self` flag sets `ManagerIdentity` to the identity of the caller (auto-generated by the CLI if not explicitly
provided via `--identity`; similarly, the SDK uses its own auto-generated or configured identity). After this, the
controller's routing change attempts will fail and it will retry on a backoff until ownership is returned.

You can then make routing changes freely (the server enforces `ManagerIdentity` for all clients, not just the
controller).

### Returning ownership to the controller

When you are done with your manual changes and want the controller to resume, clear `ManagerIdentity`:

```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $RAMPING_VERSION_BUILD_ID \
--metadata-json '{"temporal.io/ignore-last-modifier":"true"}'
temporal worker deployment manager-identity unset \
--deployment-name $MY_DEPLOYMENT
```

In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to
set a Current or Ramping Version and then do as instructed above.
On the next reconcile, the controller will detect that `ManagerIdentity` is empty, claim it for itself, and resume
managing routing changes.
4 changes: 4 additions & 0 deletions internal/controller/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ type NewClientOptions struct {
TemporalNamespace string
K8sNamespace string
Spec v1alpha1.TemporalConnectionSpec
Identity string
}

func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (*sdkclient.Options, *ClientPoolKey, *ClientAuth, error) {
clientOpts := sdkclient.Options{
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
}

var pemCert []byte
Expand Down Expand Up @@ -165,6 +167,7 @@ func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts Ne
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: &tls.Config{},
},
Expand Down Expand Up @@ -193,6 +196,7 @@ func (cp *ClientPool) fetchClientUsingNoCredentials(opts NewClientOptions) (*sdk
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
}

key := ClientPoolKey{
Expand Down
16 changes: 16 additions & 0 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,22 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con
return nil
}

if vcfg.ClaimManagerIdentity {
resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{
Self: true,
ConflictToken: vcfg.ConflictToken,
})
if err != nil {
l.Error(err, "unable to claim manager identity")
r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed,
"Failed to claim manager identity: %v", err)
return fmt.Errorf("unable to claim manager identity: %w", err)
}
l.Info("claimed manager identity", "identity", getControllerIdentity())
// Use the updated conflict token for the subsequent routing config change.
vcfg.ConflictToken = resp.ConflictToken
}

if vcfg.SetCurrent {
l.Info("registering new current version", "buildID", vcfg.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
Expand Down
8 changes: 0 additions & 8 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
ScaleDeployments: make(map[*corev1.ObjectReference]uint32),
}

// Check if we need to force manual strategy due to external modification
rolloutStrategy := w.Spec.RolloutStrategy
if w.Status.LastModifierIdentity != getControllerIdentity() &&
w.Status.LastModifierIdentity != serverDeleteVersionIdentity &&
w.Status.LastModifierIdentity != "" &&
!temporalState.IgnoreLastModifier {
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
}

// Resolve gate input if gate is configured
var gateInput []byte
Expand Down
1 change: 1 addition & 0 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker
}

status.LastModifierIdentity = m.temporalState.LastModifierIdentity
status.ManagerIdentity = m.temporalState.ManagerIdentity

// Get build IDs directly from temporal state
currentBuildID := m.temporalState.CurrentBuildID
Expand Down
19 changes: 10 additions & 9 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
// status API and may change between releases. Do not write alerting or automation
// that depends on these strings.
const (
ReasonPlanGenerationFailed = "PlanGenerationFailed"
ReasonPlanExecutionFailed = "PlanExecutionFailed"
ReasonDeploymentCreateFailed = "DeploymentCreateFailed"
ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed"
ReasonDeploymentScaleFailed = "DeploymentScaleFailed"
ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed"
ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed"
ReasonVersionPromotionFailed = "VersionPromotionFailed"
ReasonMetadataUpdateFailed = "MetadataUpdateFailed"
ReasonPlanGenerationFailed = "PlanGenerationFailed"
ReasonPlanExecutionFailed = "PlanExecutionFailed"
ReasonDeploymentCreateFailed = "DeploymentCreateFailed"
ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed"
ReasonDeploymentScaleFailed = "DeploymentScaleFailed"
ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed"
ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed"
ReasonVersionPromotionFailed = "VersionPromotionFailed"
ReasonMetadataUpdateFailed = "MetadataUpdateFailed"
ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed"
)

const (
Expand Down
1 change: 1 addition & 0 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
K8sNamespace: workerDeploy.Namespace,
TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
Spec: temporalConnection.Spec,
Identity: getControllerIdentity(),
})
if err != nil {
l.Error(err, "invalid Temporal auth secret")
Expand Down
10 changes: 8 additions & 2 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Plan struct {
ShouldCreateDeployment bool
VersionConfig *VersionConfig
TestWorkflows []WorkflowConfig

// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
Expand All @@ -45,6 +46,10 @@ type VersionConfig struct {
SetCurrent bool
// Acceptable values [0,100]
RampPercentage int32

// ClaimManagerIdentity indicates the controller should call SetManagerIdentity(Self=true)
// before applying this version config, because ManagerIdentity is currently unset.
ClaimManagerIdentity bool
Comment on lines +50 to +52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a bool field, how about a string field called ManagerIdentity that matches the definition of the WorkerDeployment.ManagerIdentity message field.. I find mixing imperative/instruction-style fields with declarative-style fields in a struct called VersionConfig to be confusing, personally. Better to have logic and conditionals defined in the code that consumes the VersionConfig struct instead of having those conditionals/logic be encoded in bool fields.

In execplan.go, you could have a separate shouldClaimManagerIdentity() function that examines the VersionConfig.ManagerIdentity field:

func (r *TemporalWorkerDeploymentReconciler) shouldClaimManagerIdentity(
	vcfg *VersionConfig,
) bool {
	return vcfg.ManagerIdentity == ""
}

and a little helper function to perform the claim if necessary:

func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
	ctx context.Context,
	l logr.Logger,
	workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
	deploymentHandler sdkclient.WorkerDeploymentHandle,
	vcfg *VersionConfig,
) error {
	resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{
		Self:          true,
		ConflictToken: vcfg.ConflictToken,
	})
	if err != nil {
		l.Error(err, "unable to claim manager identity")
		r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed,
			"Failed to claim manager identity: %v", err)
		return err
	}
	l.Info("claimed manager identity", "identity", getControllerIdentity())
	// Use the updated conflict token for the subsequent routing config change.
	vcfg.ConflictToken = resp.ConflictToken
	return nil
}

Then updateVersionConfig would be a little easier to follow/read:

func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, deploymentHandler sdkclient.WorkerDeploymentHandle, p *plan) error {
	vcfg := p.UpdateVersionConfig
	if vcfg == nil {
		return nil
	}
	if r.shouldClaimManagerIdentity(vcfg) {
		if err := r.claimManagerIdentity(ctx, l, workerDeploy, vcfg); err != nil {
			return fmt.Errorf("unable to claim manager identity: %w", err)
		}
	}
...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense will do!

}

// WorkflowConfig defines a workflow to be started
Expand Down Expand Up @@ -547,8 +552,9 @@ func getVersionConfigDiff(
}

vcfg := &VersionConfig{
ConflictToken: conflictToken,
BuildID: status.TargetVersion.BuildID,
ConflictToken: conflictToken,
BuildID: status.TargetVersion.BuildID,
ClaimManagerIdentity: temporalState != nil && temporalState.ManagerIdentity == "",
}

// If there is no current version and presence of unversioned pollers is not confirmed for all
Expand Down
100 changes: 100 additions & 0 deletions internal/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestGeneratePlan(t *testing.T) {
expectConfig bool
expectConfigSetCurrent *bool // pointer so we can test nil
expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100)
expectClaimManagerIdentity *bool // pointer so nil means "don't assert"
maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75
}{
{
Expand Down Expand Up @@ -390,6 +391,101 @@ func TestGeneratePlan(t *testing.T) {
},
expectUpdate: 1,
},
{
// VersionConfig is non-nil because a new healthy target needs to be promoted.
// With empty ManagerIdentity the controller should claim before promoting.
name: "claims manager identity when ManagerIdentity is empty and a version config change is pending",
k8sState: &k8s.DeploymentState{
Deployments: map[string]*appsv1.Deployment{
"oldbuild": createDeploymentWithDefaultConnectionSpecHash(1),
"newbuild": createDeploymentWithDefaultConnectionSpecHash(1),
},
DeploymentRefs: map[string]*corev1.ObjectReference{
"oldbuild": {Name: "test-oldbuild"},
"newbuild": {Name: "test-newbuild"},
},
},
status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "newbuild",
Status: temporaliov1alpha1.VersionStatusInactive,
Deployment: &corev1.ObjectReference{Name: "test-newbuild"},
HealthySince: &metav1.Time{
Time: time.Now().Add(-1 * time.Hour),
},
},
},
CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "oldbuild",
Status: temporaliov1alpha1.VersionStatusCurrent,
Deployment: &corev1.ObjectReference{Name: "test-oldbuild"},
},
},
},
spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{
Replicas: func() *int32 { r := int32(1); return &r }(),
},
state: &temporal.TemporalWorkerState{
ManagerIdentity: "", // empty → controller should claim
},
config: &Config{
RolloutStrategy: temporaliov1alpha1.RolloutStrategy{
Strategy: temporaliov1alpha1.UpdateAllAtOnce,
},
},
expectConfig: true,
expectConfigSetCurrent: func() *bool { b := true; return &b }(),
expectClaimManagerIdentity: func() *bool { b := true; return &b }(),
},
{
// Same routing change scenario but ManagerIdentity is already set — no claim.
name: "does not claim manager identity when ManagerIdentity is already set",
k8sState: &k8s.DeploymentState{
Deployments: map[string]*appsv1.Deployment{
"oldbuild": createDeploymentWithDefaultConnectionSpecHash(1),
"newbuild": createDeploymentWithDefaultConnectionSpecHash(1),
},
DeploymentRefs: map[string]*corev1.ObjectReference{
"oldbuild": {Name: "test-oldbuild"},
"newbuild": {Name: "test-newbuild"},
},
},
status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "newbuild",
Status: temporaliov1alpha1.VersionStatusInactive,
Deployment: &corev1.ObjectReference{Name: "test-newbuild"},
HealthySince: &metav1.Time{
Time: time.Now().Add(-1 * time.Hour),
},
},
},
CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "oldbuild",
Status: temporaliov1alpha1.VersionStatusCurrent,
Deployment: &corev1.ObjectReference{Name: "test-oldbuild"},
},
},
},
spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{
Replicas: func() *int32 { r := int32(1); return &r }(),
},
state: &temporal.TemporalWorkerState{
ManagerIdentity: "some-other-client",
},
config: &Config{
RolloutStrategy: temporaliov1alpha1.RolloutStrategy{
Strategy: temporaliov1alpha1.UpdateAllAtOnce,
},
},
expectConfig: true,
expectConfigSetCurrent: func() *bool { b := true; return &b }(),
expectClaimManagerIdentity: func() *bool { b := false; return &b }(),
},
}

for _, tc := range testCases {
Expand All @@ -411,6 +507,10 @@ func TestGeneratePlan(t *testing.T) {
assert.Equal(t, tc.expectUpdate, len(plan.UpdateDeployments), "unexpected number of updates")
assert.Equal(t, tc.expectWorkflow, len(plan.TestWorkflows), "unexpected number of test workflows")
assert.Equal(t, tc.expectConfig, plan.VersionConfig != nil, "unexpected version config presence")
if tc.expectClaimManagerIdentity != nil {
require.NotNil(t, plan.VersionConfig, "expected VersionConfig to be non-nil when asserting ClaimManagerIdentity")
assert.Equal(t, *tc.expectClaimManagerIdentity, plan.VersionConfig.ClaimManagerIdentity, "unexpected ClaimManagerIdentity")
}

if tc.expectConfig {
assert.NotNil(t, plan.VersionConfig, "expected version config")
Expand Down
2 changes: 2 additions & 0 deletions internal/temporal/worker_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type TemporalWorkerState struct {
// Versions indexed by build ID
Versions map[string]*VersionInfo
LastModifierIdentity string
ManagerIdentity string
IgnoreLastModifier bool
}

Expand Down Expand Up @@ -106,6 +107,7 @@ func GetWorkerDeploymentState(
}
state.RampPercentage = routingConfig.RampingVersionPercentage
state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity
state.ManagerIdentity = workerDeploymentInfo.ManagerIdentity
state.VersionConflictToken = resp.ConflictToken

// Decide whether to ignore LastModifierIdentity
Expand Down
Loading
Loading