diff --git a/apps/workspace-engine/pkg/db/computed_resources.sql.go b/apps/workspace-engine/pkg/db/computed_resources.sql.go index 0776c2671..accb1daaa 100644 --- a/apps/workspace-engine/pkg/db/computed_resources.sql.go +++ b/apps/workspace-engine/pkg/db/computed_resources.sql.go @@ -143,6 +143,58 @@ func (q *Queries) GetReleaseTargetsForDeploymentAndEnvironment(ctx context.Conte return items, nil } +const getReleaseTargetsForDeploymentAndResource = `-- name: GetReleaseTargetsForDeploymentAndResource :many +SELECT DISTINCT + cdr.deployment_id, + cer.environment_id, + cdr.resource_id +FROM computed_deployment_resource cdr +JOIN computed_environment_resource cer + ON cer.resource_id = cdr.resource_id +JOIN system_deployment sd + ON sd.deployment_id = cdr.deployment_id +JOIN system_environment se + ON se.environment_id = cer.environment_id + AND se.system_id = sd.system_id +WHERE cdr.deployment_id = $1 + AND cdr.resource_id = $2 +` + +type GetReleaseTargetsForDeploymentAndResourceParams struct { + DeploymentID uuid.UUID + ResourceID uuid.UUID +} + +type GetReleaseTargetsForDeploymentAndResourceRow struct { + DeploymentID uuid.UUID + EnvironmentID uuid.UUID + ResourceID uuid.UUID +} + +// Returns every release target for a (deployment, resource) pair across all +// environments where the deployment-resource combination is valid. Used by +// the dependency-downstream dispatcher to fan out reconciliation events to +// every downstream release target on the same resource. +func (q *Queries) GetReleaseTargetsForDeploymentAndResource(ctx context.Context, arg GetReleaseTargetsForDeploymentAndResourceParams) ([]GetReleaseTargetsForDeploymentAndResourceRow, error) { + rows, err := q.db.Query(ctx, getReleaseTargetsForDeploymentAndResource, arg.DeploymentID, arg.ResourceID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetReleaseTargetsForDeploymentAndResourceRow + for rows.Next() { + var i GetReleaseTargetsForDeploymentAndResourceRow + if err := rows.Scan(&i.DeploymentID, &i.EnvironmentID, &i.ResourceID); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getReleaseTargetsForEnvironment = `-- name: GetReleaseTargetsForEnvironment :many SELECT DISTINCT cdr.deployment_id, diff --git a/apps/workspace-engine/pkg/db/deployments.sql.go b/apps/workspace-engine/pkg/db/deployments.sql.go index 5b9bdb1f8..dc6e9551f 100644 --- a/apps/workspace-engine/pkg/db/deployments.sql.go +++ b/apps/workspace-engine/pkg/db/deployments.sql.go @@ -78,8 +78,6 @@ type GetDeploymentDependenciesByVersionIDRow struct { VersionSelector string } -// Dependencies are pinned per deployment_version, so this returns the dep -// edges that belong to a single deployment_version row. func (q *Queries) GetDeploymentDependenciesByVersionID(ctx context.Context, dollar_1 uuid.UUID) ([]GetDeploymentDependenciesByVersionIDRow, error) { rows, err := q.db.Query(ctx, getDeploymentDependenciesByVersionID, dollar_1) if err != nil { @@ -129,6 +127,7 @@ SELECT DISTINCT dv.deployment_id FROM deployment_version_dependency dvd JOIN deployment_version dv ON dv.id = dvd.deployment_version_id WHERE dvd.dependency_deployment_id = $1 +ORDER BY dv.deployment_id ` // Returns the distinct set of deployment IDs that have at least one version diff --git a/apps/workspace-engine/pkg/db/queries/computed_resources.sql b/apps/workspace-engine/pkg/db/queries/computed_resources.sql index cf680e0cc..25f2c556d 100644 --- a/apps/workspace-engine/pkg/db/queries/computed_resources.sql +++ b/apps/workspace-engine/pkg/db/queries/computed_resources.sql @@ -92,6 +92,26 @@ WHERE cdr.deployment_id = @deployment_id AND cdr.resource_id = @resource_id LIMIT 1; +-- name: GetReleaseTargetsForDeploymentAndResource :many +-- Returns every release target for a (deployment, resource) pair across all +-- environments where the deployment-resource combination is valid. Used by +-- the dependency-downstream dispatcher to fan out reconciliation events to +-- every downstream release target on the same resource. +SELECT DISTINCT + cdr.deployment_id, + cer.environment_id, + cdr.resource_id +FROM computed_deployment_resource cdr +JOIN computed_environment_resource cer + ON cer.resource_id = cdr.resource_id +JOIN system_deployment sd + ON sd.deployment_id = cdr.deployment_id +JOIN system_environment se + ON se.environment_id = cer.environment_id + AND se.system_id = sd.system_id +WHERE cdr.deployment_id = @deployment_id + AND cdr.resource_id = @resource_id; + -- name: GetReleaseTargetsForEnvironment :many -- Returns all valid release targets for an environment by joining computed -- resource tables through the system link tables. diff --git a/apps/workspace-engine/pkg/db/queries/deployments.sql b/apps/workspace-engine/pkg/db/queries/deployments.sql index dc5f908ca..3eba72990 100644 --- a/apps/workspace-engine/pkg/db/queries/deployments.sql +++ b/apps/workspace-engine/pkg/db/queries/deployments.sql @@ -52,8 +52,6 @@ SELECT deployment_id FROM system_deployment WHERE system_id = $1; DELETE FROM deployment WHERE id = $1; -- name: GetDeploymentDependenciesByVersionID :many --- Dependencies are pinned per deployment_version, so this returns the dep --- edges that belong to a single deployment_version row. SELECT dependency_deployment_id, version_selector FROM deployment_version_dependency WHERE deployment_version_id = $1 @@ -67,5 +65,6 @@ ORDER BY dependency_deployment_id; SELECT DISTINCT dv.deployment_id FROM deployment_version_dependency dvd JOIN deployment_version dv ON dv.id = dvd.deployment_version_id -WHERE dvd.dependency_deployment_id = $1; +WHERE dvd.dependency_deployment_id = $1 +ORDER BY dv.deployment_id; diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/dispatch_dependency_downstream.go b/apps/workspace-engine/svc/controllers/jobdispatch/dispatch_dependency_downstream.go new file mode 100644 index 000000000..46cc9be3b --- /dev/null +++ b/apps/workspace-engine/svc/controllers/jobdispatch/dispatch_dependency_downstream.go @@ -0,0 +1,107 @@ +package jobdispatch + +import ( + "context" + "fmt" + + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "workspace-engine/pkg/db" + "workspace-engine/pkg/reconcile" + "workspace-engine/pkg/reconcile/events" +) + +// dispatchDependencyDownstreamTargets enqueues desired-release evaluations +// for release targets belonging to deployments whose versions declare a +// dependency on the deployment whose job just succeeded. The fan-out is +// scoped to the same resource as the upstream job, since deployment-version +// dependency gates are evaluated per (version, resource). +// +// Only callers that have observed a JobStatusSuccessful transition should +// invoke this — that is the only state change that flips a deployment's +// "current release" on a resource and therefore the only thing a downstream +// gate could newly accept. +func dispatchDependencyDownstreamTargets( + ctx context.Context, + queue reconcile.Queue, + jobID uuid.UUID, +) error { + ctx, span := tracer.Start(ctx, "DispatchDependencyDownstreamTargets", + trace.WithAttributes(attribute.String("job.id", jobID.String())), + ) + defer span.End() + + queries := db.GetQueries(ctx) + + release, err := queries.GetReleaseByJobID(ctx, jobID) + if err != nil { + return fmt.Errorf("get release by job id: %w", err) + } + + wsID, err := queries.GetWorkspaceIDByJobID(ctx, jobID) + if err != nil { + return fmt.Errorf("get workspace id: %w", err) + } + + span.SetAttributes( + attribute.String("workspace.id", wsID.String()), + attribute.String("upstream.deployment.id", release.DeploymentID.String()), + attribute.String("resource.id", release.ResourceID.String()), + ) + + downstreamDeps, err := queries.GetDeploymentsWithVersionsDependingOn(ctx, release.DeploymentID) + if err != nil { + return fmt.Errorf("get deployments with versions depending on: %w", err) + } + + span.SetAttributes(attribute.Int("downstream_deployments.count", len(downstreamDeps))) + + if len(downstreamDeps) == 0 { + return nil + } + + wsIDStr := wsID.String() + var params []events.DesiredReleaseEvalParams + + for _, downstreamDepID := range downstreamDeps { + rts, err := queries.GetReleaseTargetsForDeploymentAndResource( + ctx, + db.GetReleaseTargetsForDeploymentAndResourceParams{ + DeploymentID: downstreamDepID, + ResourceID: release.ResourceID, + }, + ) + if err != nil { + return fmt.Errorf( + "get release targets for downstream deployment %s: %w", + downstreamDepID, + err, + ) + } + for _, rt := range rts { + params = append(params, events.DesiredReleaseEvalParams{ + WorkspaceID: wsIDStr, + ResourceID: rt.ResourceID.String(), + EnvironmentID: rt.EnvironmentID.String(), + DeploymentID: rt.DeploymentID.String(), + }) + } + } + + span.SetAttributes(attribute.Int("release_targets.count", len(params))) + + if len(params) == 0 { + return nil + } + + if err := events.EnqueueManyDesiredRelease(queue, ctx, params); err != nil { + return fmt.Errorf("enqueue desired releases: %w", err) + } + + span.AddEvent("desired releases enqueued", trace.WithAttributes( + attribute.Int("enqueued.count", len(params)), + )) + + return nil +} diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go b/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go index 190cd9471..331b24365 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go @@ -107,6 +107,14 @@ func (s *PostgresSetter) UpdateJob( return fmt.Errorf("dispatch progression targets: %w", err) } + // Deployment-version dependency gates only react to "current release" + // changes, which only happen when a job becomes successful. + if status == oapi.JobStatusSuccessful { + if err := dispatchDependencyDownstreamTargets(ctx, s.Queue, jobIDUUID); err != nil { + return fmt.Errorf("dispatch dependency downstream targets: %w", err) + } + } + return nil }