Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions apps/workspace-engine/pkg/db/computed_resources.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions apps/workspace-engine/pkg/db/deployments.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions apps/workspace-engine/pkg/db/queries/computed_resources.sql
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@ WHERE cdr.deployment_id = @deployment_id
AND cdr.resource_id = @resource_id
LIMIT 1;

-- name: GetReleaseTargetsForDeploymentAndResource :many
-- Returns every release target for a (deployment, resource) pair across all
-- environments where the deployment-resource combination is valid. Used by
-- the dependency-downstream dispatcher to fan out reconciliation events to
-- every downstream release target on the same resource.
SELECT DISTINCT
cdr.deployment_id,
cer.environment_id,
cdr.resource_id
FROM computed_deployment_resource cdr
JOIN computed_environment_resource cer
ON cer.resource_id = cdr.resource_id
JOIN system_deployment sd
ON sd.deployment_id = cdr.deployment_id
JOIN system_environment se
ON se.environment_id = cer.environment_id
AND se.system_id = sd.system_id
WHERE cdr.deployment_id = @deployment_id
AND cdr.resource_id = @resource_id;

-- name: GetReleaseTargetsForEnvironment :many
-- Returns all valid release targets for an environment by joining computed
-- resource tables through the system link tables.
Expand Down
5 changes: 2 additions & 3 deletions apps/workspace-engine/pkg/db/queries/deployments.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ SELECT deployment_id FROM system_deployment WHERE system_id = $1;
DELETE FROM deployment WHERE id = $1;

-- name: GetDeploymentDependenciesByVersionID :many
-- Dependencies are pinned per deployment_version, so this returns the dep
-- edges that belong to a single deployment_version row.
SELECT dependency_deployment_id, version_selector
FROM deployment_version_dependency
WHERE deployment_version_id = $1
Expand All @@ -67,5 +65,6 @@ ORDER BY dependency_deployment_id;
SELECT DISTINCT dv.deployment_id
FROM deployment_version_dependency dvd
JOIN deployment_version dv ON dv.id = dvd.deployment_version_id
WHERE dvd.dependency_deployment_id = $1;
WHERE dvd.dependency_deployment_id = $1
ORDER BY dv.deployment_id;

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package jobdispatch

import (
"context"
"fmt"

"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"workspace-engine/pkg/db"
"workspace-engine/pkg/reconcile"
"workspace-engine/pkg/reconcile/events"
)

// dispatchDependencyDownstreamTargets enqueues desired-release evaluations
// for release targets belonging to deployments whose versions declare a
// dependency on the deployment whose job just succeeded. The fan-out is
// scoped to the same resource as the upstream job, since deployment-version
// dependency gates are evaluated per (version, resource).
//
// Only callers that have observed a JobStatusSuccessful transition should
// invoke this — that is the only state change that flips a deployment's
// "current release" on a resource and therefore the only thing a downstream
// gate could newly accept.
func dispatchDependencyDownstreamTargets(
ctx context.Context,
queue reconcile.Queue,
jobID uuid.UUID,
) error {
ctx, span := tracer.Start(ctx, "DispatchDependencyDownstreamTargets",
trace.WithAttributes(attribute.String("job.id", jobID.String())),
)
defer span.End()

queries := db.GetQueries(ctx)

release, err := queries.GetReleaseByJobID(ctx, jobID)
if err != nil {
return fmt.Errorf("get release by job id: %w", err)
}

wsID, err := queries.GetWorkspaceIDByJobID(ctx, jobID)
if err != nil {
return fmt.Errorf("get workspace id: %w", err)
}

span.SetAttributes(
attribute.String("workspace.id", wsID.String()),
attribute.String("upstream.deployment.id", release.DeploymentID.String()),
attribute.String("resource.id", release.ResourceID.String()),
)

downstreamDeps, err := queries.GetDeploymentsWithVersionsDependingOn(ctx, release.DeploymentID)
if err != nil {
return fmt.Errorf("get deployments with versions depending on: %w", err)
}
Comment on lines +53 to +56
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetDeploymentsWithVersionsDependingOn is not scoped to the upstream job's workspace, but you enqueue downstream desired-release evals using the upstream wsID for every item. If a deployment version dependency can reference deployments across workspaces (schema doesn't appear to prevent it), this could enqueue work items into the wrong workspace / leak IDs. Consider scoping the query by workspace (e.g., add workspace_id param and filter on deployment_version.workspace_id or join deployment) or fetching each downstream deployment's workspace ID and using that when enqueueing.

Copilot uses AI. Check for mistakes.

span.SetAttributes(attribute.Int("downstream_deployments.count", len(downstreamDeps)))

if len(downstreamDeps) == 0 {
return nil
}

wsIDStr := wsID.String()
var params []events.DesiredReleaseEvalParams

for _, downstreamDepID := range downstreamDeps {
rts, err := queries.GetReleaseTargetsForDeploymentAndResource(
ctx,
db.GetReleaseTargetsForDeploymentAndResourceParams{
DeploymentID: downstreamDepID,
ResourceID: release.ResourceID,
},
)
if err != nil {
return fmt.Errorf(
"get release targets for downstream deployment %s: %w",
downstreamDepID,
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error format string uses %s with downstreamDepID (a uuid.UUID), which will produce a malformed %!s(uuid.UUID=...) message. Use downstreamDepID.String() (or %v) so the deployment ID is correctly rendered in errors/logs.

Suggested change
downstreamDepID,
downstreamDepID.String(),

Copilot uses AI. Check for mistakes.
err,
)
}
for _, rt := range rts {
params = append(params, events.DesiredReleaseEvalParams{
WorkspaceID: wsIDStr,
ResourceID: rt.ResourceID.String(),
EnvironmentID: rt.EnvironmentID.String(),
DeploymentID: rt.DeploymentID.String(),
})
}
}

Comment on lines +65 to +91
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does one DB query per downstream deployment (GetReleaseTargetsForDeploymentAndResource inside the loop). If a deployment has many downstream dependents, this becomes an N+1 query pattern. Consider adding a single sqlc query that accepts downstream_deployment_ids (uuid[]) + resource_id and returns all (deployment, environment, resource) targets in one round-trip, then build params from that result.

Suggested change
var params []events.DesiredReleaseEvalParams
for _, downstreamDepID := range downstreamDeps {
rts, err := queries.GetReleaseTargetsForDeploymentAndResource(
ctx,
db.GetReleaseTargetsForDeploymentAndResourceParams{
DeploymentID: downstreamDepID,
ResourceID: release.ResourceID,
},
)
if err != nil {
return fmt.Errorf(
"get release targets for downstream deployment %s: %w",
downstreamDepID,
err,
)
}
for _, rt := range rts {
params = append(params, events.DesiredReleaseEvalParams{
WorkspaceID: wsIDStr,
ResourceID: rt.ResourceID.String(),
EnvironmentID: rt.EnvironmentID.String(),
DeploymentID: rt.DeploymentID.String(),
})
}
}
rts, err := queries.GetReleaseTargetsForDeploymentsAndResource(
ctx,
db.GetReleaseTargetsForDeploymentsAndResourceParams{
DeploymentIds: downstreamDeps,
ResourceID: release.ResourceID,
},
)
if err != nil {
return fmt.Errorf("get release targets for downstream deployments: %w", err)
}
params := make([]events.DesiredReleaseEvalParams, 0, len(rts))
for _, rt := range rts {
params = append(params, events.DesiredReleaseEvalParams{
WorkspaceID: wsIDStr,
ResourceID: rt.ResourceID.String(),
EnvironmentID: rt.EnvironmentID.String(),
DeploymentID: rt.DeploymentID.String(),
})
}

Copilot uses AI. Check for mistakes.
span.SetAttributes(attribute.Int("release_targets.count", len(params)))

if len(params) == 0 {
return nil
}

if err := events.EnqueueManyDesiredRelease(queue, ctx, params); err != nil {
return fmt.Errorf("enqueue desired releases: %w", err)
}

span.AddEvent("desired releases enqueued", trace.WithAttributes(
attribute.Int("enqueued.count", len(params)),
))

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func (s *PostgresSetter) UpdateJob(
return fmt.Errorf("dispatch progression targets: %w", err)
}

// Deployment-version dependency gates only react to "current release"
// changes, which only happen when a job becomes successful.
if status == oapi.JobStatusSuccessful {
if err := dispatchDependencyDownstreamTargets(ctx, s.Queue, jobIDUUID); err != nil {
return fmt.Errorf("dispatch dependency downstream targets: %w", err)
}
}
Comment on lines +110 to +116
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New behavior is introduced here (enqueueing desired-release reconciliations for downstream deployment-version dependencies when a job becomes successful), but there doesn't appear to be any test coverage asserting the downstream fan-out and enqueue behavior. Please add a unit/integration test that exercises a job status transition to successful and verifies the expected desired-release work items are enqueued for downstream deployments on the same resource.

Copilot uses AI. Check for mistakes.

return nil
}

Expand Down
Loading