From e9c66fca36b6e0a11ad0137c14f20a2d2149a97e Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 18 Jun 2026 12:30:54 -0700 Subject: [PATCH] fluence: ensure we cancel the job in the graph on complete we currently clear space on deletion, meaning the pod is terminated I think we can easily cancel when the entire group finishes or a single completion is done, because resources are being used - the object is left as a courtesy to the user to inspect. Signed-off-by: vsoch --- pkg/fluence/fluence.go | 44 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 439e0d1..82384ab 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -441,19 +441,57 @@ func (f *Fluence) patchPodAnnotations(ctx context.Context, ns, name string, ann return err } -// registerCancelHandlers watches PodGroup and Pod deletions and frees the -// corresponding Fluxion allocation. The framework has no deletion extension -// point, so this is informer-driven. +// registerCancelHandlers watches PodGroup and Pod deletions AND pod completion, +// and frees the corresponding Fluxion allocation. The framework has no deletion +// or completion extension point, so this is informer-driven. A finished pod is +// not deleted (it lingers in Succeeded/Failed), so completion must be watched +// separately from deletion or the allocation leaks until the pod is removed. func (f *Fluence) registerCancelHandlers() { sif := f.handle.SharedInformerFactory() _, _ = sif.Scheduling().V1alpha2().PodGroups().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: f.onPodGroupDeleted, }) _, _ = sif.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: f.onPodUpdated, DeleteFunc: f.onPodDeleted, }) } +// podTerminal reports whether a pod has reached a terminal phase (run to +// completion or failed) — at which point its held allocation should be freed. +func podTerminal(p *corev1.Pod) bool { + return p.Status.Phase == corev1.PodSucceeded || p.Status.Phase == corev1.PodFailed +} + +// onPodUpdated frees an ungrouped pod's allocation when the pod finishes +// (transitions into a terminal phase). A completed pod is not deleted, so this +// is the path that releases resources when a job ends normally; the DeleteFunc +// path remains as a backstop for pods removed without completing. Cancel is +// idempotent, so a completion-cancel followed by a later delete-cancel is safe. +// +// Grouped pods are ignored here for the same reason as onPodDeleted: the gang's +// allocation is owned by the PodGroup, and freeing it when one pod finishes would +// release the whole gang's resources while its siblings still run. +func (f *Fluence) onPodUpdated(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + return + } + // Only act on the transition INTO a terminal phase (UpdateFunc fires on every + // pod update; this avoids re-cancelling on each subsequent update). + if podTerminal(oldPod) || !podTerminal(newPod) { + return + } + if placement.PodGroupName(newPod) != "" { + return + } + f.cancelGroup(newPod.Namespace+"/"+newPod.Name, newPod.Annotations) +} + // onPodGroupDeleted frees the gang's allocation when its PodGroup is deleted. func (f *Fluence) onPodGroupDeleted(obj interface{}) { pg, ok := obj.(*schedv1a2.PodGroup)