187 changes: 184 additions & 3 deletions pkg/adapter/apiserver/adapter.go
@@ -19,6 +19,7 @@ package apiserver
import (
"context"
"fmt"
"math"
"sync"
"time"

@@ -28,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"

"k8s.io/client-go/discovery"
@@ -82,13 +84,20 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
return fmt.Errorf("failed to collect resource matches: %v", err)
}

// we have two modes of operation for the ApiServerSource adapter:
// 1. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches
// we have three modes of operation for the ApiServerSource adapter:
// 1. No-Cache Mode: The adapter skips the initial LIST call and only watches for new events.
// This reduces API server load significantly when watching across many namespaces.
// Pre-existing objects will not emit events on startup.
// 2. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches
// on resources, making it resilient to transient errors or delayed permission grants.
// 2. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch
// 3. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch
// fails to be established on the first attempt, the entire adapter will fail immediately. This provides faster
// feedback and a clearer failure state in environments where permissions are expected to be correct at startup.

if a.config.DisableCache {
a.logger.Info("Starting in no-cache mode. Initial LIST is skipped; only new events will be emitted.")
return a.startWatchOnly(ctx, stopCh, delegate, matches)
}
if a.config.FailFast {
a.logger.Info("Starting in fail-fast mode. Any single watch failure will stop the adapter.")
return a.startFailFast(ctx, stopCh, delegate, matches)
@@ -97,6 +106,178 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
return a.startResilient(ctx, stopCh, delegate, matches)
}

// startWatchOnly starts watches for all matched resources. It performs a lightweight
// LIST (limit=1) per resource interface to obtain the current resourceVersion, then
// issues a Watch from that point. This means pre-existing objects do not produce
// events on startup, and startup API load is O(resources*namespaces) lightweight
// LISTs rather than full object dumps.
//
// When both DisableCache and FailFast are set, DisableCache takes precedence.
func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error {
if a.config.FailFast {
a.logger.Warn("disableCache=true takes precedence over failFast=true; running in no-cache mode without fail-fast behavior")
}

watchCtx, cancelWatchers := context.WithCancel(ctx)
defer cancelWatchers()

var wg sync.WaitGroup
for _, match := range matches {
if match.apiResource == nil {
a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String())
continue
}
for _, res := range match.resourceInterfaces {
wg.Add(1)
go func() {
defer wg.Done()
a.watchResourceLoop(watchCtx, res, match.resourceWatch.LabelSelector, delegate)
}()
}
}

select {
case <-stopCh:
case <-ctx.Done():
}
cancelWatchers()
wg.Wait()
return nil
}

func noCacheBackoff() wait.Backoff {
return wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Jitter: 0.1,
Steps: math.MaxInt32,
Cap: 60 * time.Second,
}
}
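
// As a rough illustration of the schedule noCacheBackoff produces (a
// hypothetical usage, not part of this change): successive Step() calls
// yield ~1s, ~2s, ~4s, ~8s, ~16s, ~32s, then hold near the 60s cap, each
// delay perturbed by up to 10% jitter; Steps of math.MaxInt32 means the
// retry loop never gives up on its own.
//
//	b := noCacheBackoff()
//	for i := 0; i < 8; i++ {
//		fmt.Println(b.Step()) // ~1s, ~2s, ~4s, ..., then near 60s
//	}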

// watchResourceLoop runs a continuous List+Watch loop for a single resource interface.
// A lightweight LIST (limit=1) is issued before each Watch to obtain the current
// resourceVersion so that Watch does not replay synthetic ADDED events for
// pre-existing objects (the behavior when resourceVersion="" is passed to Watch).
// On watch.Error with StatusReasonGone (HTTP 410, meaning the resourceVersion has
// expired from the watch cache), the loop re-lists to get a fresh resourceVersion.
// All failures back off exponentially with jitter before retrying.
func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.ResourceInterface, labelSelector string, delegate cache.Store) {
backoff := noCacheBackoff()
for ctx.Err() == nil {
list, err := ri.List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Limit: 1,
})
if err != nil {
if ctx.Err() != nil {
return
}
a.logger.Errorw("failed to list for resourceVersion, retrying", zap.Error(err))
t := time.NewTimer(backoff.Step())
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
}
continue
}
rv := list.GetResourceVersion()
if rv == "" {
a.logger.Warn("LIST returned empty resourceVersion, will retry after backoff")
t := time.NewTimer(backoff.Step())
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
}
continue
}

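// Bound each watch to five minutes server-side so that a silently dead
// connection is eventually torn down and the loop re-establishes it.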
timeout := int64(5 * 60)
w, err := ri.Watch(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
ResourceVersion: rv,
TimeoutSeconds: &timeout,
})
if err != nil {
if ctx.Err() != nil {
return
}
a.logger.Errorw("watch error, retrying", zap.Error(err))
t := time.NewTimer(backoff.Step())
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
}
continue
}

watchStart := time.Now()
a.drainWatchEvents(ctx, w, delegate)
if time.Since(watchStart) > 30*time.Second {
// Watch was long-lived; treat connection as stable and reset backoff.
backoff = noCacheBackoff()
} else {
t := time.NewTimer(backoff.Step())
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
}
}
}
}
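
// The core pattern above, in isolation: a limit=1 LIST returns the
// collection's current resourceVersion without transferring the full object
// set, and a Watch started from that version sees only subsequent changes.
// A minimal sketch of just that step (watchFromNow is a hypothetical helper,
// not part of this change):
//
//	func watchFromNow(ctx context.Context, ri dynamic.ResourceInterface) (watch.Interface, error) {
//		list, err := ri.List(ctx, metav1.ListOptions{Limit: 1})
//		if err != nil {
//			return nil, err
//		}
//		return ri.Watch(ctx, metav1.ListOptions{ResourceVersion: list.GetResourceVersion()})
//	}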

// drainWatchEvents reads from a watch until the context is done, the watch
// channel closes, or a watch.Error event is received. On StatusReasonGone
// (HTTP 410), it returns immediately so watchResourceLoop can re-list.
func (a *apiServerAdapter) drainWatchEvents(ctx context.Context, w watch.Interface, delegate cache.Store) {
defer w.Stop()
for {
select {
case <-ctx.Done():
return
case event, ok := <-w.ResultChan():
if !ok {
return
}
switch event.Type {
case watch.Added:
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
if err := delegate.Add(obj); err != nil {
a.logger.Errorw("failed to add object to delegate store", zap.Error(err))
}
}
case watch.Modified:
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
if err := delegate.Update(obj); err != nil {
a.logger.Errorw("failed to update object in delegate store", zap.Error(err))
}
}
case watch.Deleted:
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
if err := delegate.Delete(obj); err != nil {
a.logger.Errorw("failed to delete object from delegate store", zap.Error(err))
}
}
case watch.Error:
if status, ok := event.Object.(*metav1.Status); ok && status.Reason == metav1.StatusReasonGone {
a.logger.Info("watch resourceVersion expired (410 Gone), will re-list")
} else {
a.logger.Errorw("received watch error event", zap.Any("object", event.Object))
}
return
}
}
}
}
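
// For reference, an expired resourceVersion surfaces as a watch.Error event
// whose Object is a *metav1.Status shaped roughly like this (illustrative
// values only):
//
//	&metav1.Status{
//		Status:  metav1.StatusFailure,
//		Code:    410,
//		Reason:  metav1.StatusReasonGone,
//		Message: "too old resource version: 12345 (67890)",
//	}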

func (a *apiServerAdapter) startResilient(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error {
// Local stop channel.
stop := make(chan struct{})