Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions pkg/fluence/fluence.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,57 @@ func (f *Fluence) patchPodAnnotations(ctx context.Context, ns, name string, ann
return err
}

// registerCancelHandlers hooks informer event handlers that release Fluxion
// allocations. Three events are watched: PodGroup deletion, Pod deletion, and
// Pod updates (used to detect completion).
//
// The scheduling framework offers no extension point for deletion or
// completion, which is why this is driven by informers. Completion needs its
// own watch because a finished pod is not deleted — it stays around in the
// Succeeded/Failed phase — so relying on deletion alone would leak the
// allocation until the pod object is eventually removed.
func (f *Fluence) registerCancelHandlers() {
	factory := f.handle.SharedInformerFactory()

	// PodGroup deletions.
	podGroupHandlers := cache.ResourceEventHandlerFuncs{
		DeleteFunc: f.onPodGroupDeleted,
	}
	podGroupInformer := factory.Scheduling().V1alpha2().PodGroups().Informer()
	// The registration handle and error are intentionally discarded.
	_, _ = podGroupInformer.AddEventHandler(podGroupHandlers)

	// Pod updates (completion detection) and deletions.
	podHandlers := cache.ResourceEventHandlerFuncs{
		UpdateFunc: f.onPodUpdated,
		DeleteFunc: f.onPodDeleted,
	}
	podInformer := factory.Core().V1().Pods().Informer()
	// The registration handle and error are intentionally discarded.
	_, _ = podInformer.AddEventHandler(podHandlers)
}

// podTerminal returns true when the pod's phase is Succeeded or Failed, i.e.
// the pod has finished (successfully or not). Callers use this to decide when
// the allocation held for the pod should be released.
func podTerminal(p *corev1.Pod) bool {
	switch p.Status.Phase {
	case corev1.PodSucceeded, corev1.PodFailed:
		return true
	default:
		return false
	}
}

// onPodUpdated is the informer UpdateFunc for Pods. It releases the allocation
// of an ungrouped pod at the moment the pod moves from a non-terminal phase
// into a terminal one (see podTerminal).
//
// Finished pods are not deleted, so this handler is what frees resources when
// a job ends normally; the DeleteFunc handler stays registered as a backstop
// for pods that are removed without ever completing. A pod that completes and
// is later deleted may therefore be cancelled twice.
// NOTE(review): this relies on cancel being idempotent — confirm in cancelGroup.
//
// Pods that belong to a PodGroup are skipped: the gang's allocation is tied to
// the PodGroup, and cancelling when a single member finishes would free the
// resources of siblings that are still running.
func (f *Fluence) onPodUpdated(oldObj, newObj interface{}) {
	before, isPod := oldObj.(*corev1.Pod)
	if !isPod {
		return
	}
	after, isPod := newObj.(*corev1.Pod)
	if !isPod {
		return
	}

	// UpdateFunc fires for every pod update, so react only to the single
	// update that crosses into a terminal phase; later updates of an
	// already-terminal pod must not trigger another cancel.
	justFinished := !podTerminal(before) && podTerminal(after)
	if !justFinished {
		return
	}

	// Grouped pods are handled through their PodGroup, not here.
	if placement.PodGroupName(after) != "" {
		return
	}

	key := after.Namespace + "/" + after.Name
	f.cancelGroup(key, after.Annotations)
}

// onPodGroupDeleted frees the gang's allocation when its PodGroup is deleted.
func (f *Fluence) onPodGroupDeleted(obj interface{}) {
pg, ok := obj.(*schedv1a2.PodGroup)
Expand Down
Loading