diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 4606912988f..afb068b40d9 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -19,6 +19,7 @@ package apiserver import ( "context" "fmt" + "math" "sync" "time" @@ -28,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -82,13 +84,20 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er return fmt.Errorf("failed to collect resource matches: %v", err) } - // we have two modes of operation for the ApiServerSource adapter: - // 1. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches + // we have three modes of operation for the ApiServerSource adapter: + // 1. No-Cache Mode: The adapter skips the initial LIST call and only watches for new events. + // This reduces API server load significantly when watching across many namespaces. + // Pre-existing objects will not emit events on startup. + // 2. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches // on resources, making it resilient to transient errors or delayed permission grants. - // 2. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch + // 3. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch // fails to be established on the first attempt, the entire adapter will fail immediately. This provides faster // feedback and a clearer failure state in environments where permissions are expected to be correct at startup. + if a.config.DisableCache { + a.logger.Info("Starting in no-cache mode. Initial LIST is skipped; only new events will be emitted.") + return a.startWatchOnly(ctx, stopCh, delegate, matches) + } if a.config.FailFast { a.logger.Info("Starting in fail-fast mode. Any single watch failure will stop the adapter.") return a.startFailFast(ctx, stopCh, delegate, matches) @@ -97,6 +106,178 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er return a.startResilient(ctx, stopCh, delegate, matches) } +// startWatchOnly starts watches for all matched resources. It performs a lightweight +// LIST (limit=1) per resource interface to obtain the current resourceVersion, then +// issues a Watch from that point. This means pre-existing objects do not produce +// events on startup, and startup API load is O(resources*namespaces) lightweight +// LISTs rather than full object dumps. +// +// When both DisableCache and FailFast are set, DisableCache takes precedence. +func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { + if a.config.FailFast { + a.logger.Warn("disableCache=true takes precedence over failFast=true; running in no-cache mode without fail-fast behavior") + } + + watchCtx, cancelWatchers := context.WithCancel(ctx) + defer cancelWatchers() + + var wg sync.WaitGroup + for _, match := range matches { + if match.apiResource == nil { + a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String()) + continue + } + for _, res := range match.resourceInterfaces { + wg.Add(1) + go func() { + defer wg.Done() + a.watchResourceLoop(watchCtx, res, match.resourceWatch.LabelSelector, delegate) + }() + } + } + + select { + case <-stopCh: + case <-ctx.Done(): + } + cancelWatchers() + wg.Wait() + return nil +} + +func noCacheBackoff() wait.Backoff { + return wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.1, + Steps: math.MaxInt32, + Cap: 60 * time.Second, + } +} + +// watchResourceLoop runs a continuous List+Watch loop for a single resource interface. +// A lightweight LIST (limit=1) is issued before each Watch to obtain the current +// resourceVersion so that Watch does not replay synthetic ADDED events for +// pre-existing objects (the behaviour when resourceVersion="" is passed to Watch). +// On watch.Error with StatusReasonGone (HTTP 410 — resourceVersion expired from the +// watch cache), the loop re-lists to get a fresh resourceVersion. +// All failures back off exponentially with jitter before retrying. +func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.ResourceInterface, labelSelector string, delegate cache.Store) { + backoff := noCacheBackoff() + for ctx.Err() == nil { + list, err := ri.List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + Limit: 1, + }) + if err != nil { + if ctx.Err() != nil { + return + } + a.logger.Errorw("failed to list for resourceVersion, retrying", zap.Error(err)) + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + continue + } + rv := list.GetResourceVersion() + if rv == "" { + a.logger.Warn("LIST returned empty resourceVersion, will retry after backoff") + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + continue + } + + timeout := int64(5 * 60) + w, err := ri.Watch(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + ResourceVersion: rv, + TimeoutSeconds: &timeout, + }) + if err != nil { + if ctx.Err() != nil { + return + } + a.logger.Errorw("watch error, retrying", zap.Error(err)) + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + continue + } + + watchStart := time.Now() + a.drainWatchEvents(ctx, w, delegate) + if time.Since(watchStart) > 30*time.Second { + // Watch was long-lived; treat connection as stable and reset backoff. + backoff = noCacheBackoff() + } else { + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + } + } +} + +// drainWatchEvents reads from a watch until the context is done, the watch +// channel closes, or a watch.Error event is received. On StatusReasonGone +// (HTTP 410), it returns immediately so watchResourceLoop can re-list. +func (a *apiServerAdapter) drainWatchEvents(ctx context.Context, w watch.Interface, delegate cache.Store) { + defer w.Stop() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-w.ResultChan(): + if !ok { + return + } + switch event.Type { + case watch.Added: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + if err := delegate.Add(obj); err != nil { + a.logger.Errorw("failed to add object to delegate store", zap.Error(err)) + } + } + case watch.Modified: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + if err := delegate.Update(obj); err != nil { + a.logger.Errorw("failed to update object in delegate store", zap.Error(err)) + } + } + case watch.Deleted: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + if err := delegate.Delete(obj); err != nil { + a.logger.Errorw("failed to delete object from delegate store", zap.Error(err)) + } + } + case watch.Error: + if status, ok := event.Object.(*metav1.Status); ok && status.Reason == metav1.StatusReasonGone { + a.logger.Info("watch resourceVersion expired (410 Gone), will re-list") + } else { + a.logger.Errorw("received watch error event", zap.Any("object", event.Object)) + } + return + } + } + } +} + func (a *apiServerAdapter) startResilient(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { // Local stop channel. stop := make(chan struct{}) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index dc408a8aae6..a44f524c0c0 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -18,6 +18,7 @@ package apiserver import ( "context" + "sync" "testing" "time" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic" @@ -380,3 +382,240 @@ func TestAdapter_FailFast(t *testing.T) { }) } } + +func TestAdapter_DisableCache(t *testing.T) { + ce := adaptertest.NewTestClient() + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{ + GVR: schema.GroupVersionResource{ + Version: "v1", + Resource: "pods", + }, + }}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + + a := &apiServerAdapter{ + ce: ce, + logger: logging.FromContext(ctx), + config: config, + + discover: makeDiscoveryClient(), + k8s: makeDynamicClient(simplePod("foo", "default")), + source: "unit-test", + name: "unittest", + } + + ctx, cancel := context.WithCancel(ctx) + done := make(chan struct{}) + go func() { + defer close(done) + err := a.Start(ctx) + if err != nil { + t.Logf("Start returned error: %v", err) + } + }() + + cancel() + <-done +} + +// TestAdapter_DisableCacheLightweightList verifies that DisableCache=true uses a +// lightweight LIST (Limit=1) to obtain the current resourceVersion before watching, +// rather than skipping LIST entirely (which would cause rv="" and replay all existing objects). +func TestAdapter_DisableCacheLightweightList(t *testing.T) { + gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + tracker := &listOptionsTracker{} + + dynClient := makeDynamicClient(simplePod("existing-pod", "default")) + trackedClient := &listOptsTrackingClient{Interface: dynClient, gvr: gvr, tracker: tracker} + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{GVR: gvr}}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + + a := &apiServerAdapter{ + ce: adaptertest.NewTestClient(), + logger: logging.FromContext(ctx), + config: config, + discover: makeDiscoveryClient(), + k8s: trackedClient, + source: "unit-test", + name: "unittest", + } + + done := make(chan struct{}) + go func() { + defer close(done) + _ = a.Start(ctx) + }() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && len(tracker.get()) == 0 { + time.Sleep(10 * time.Millisecond) + } + cancel() + <-done + + opts := tracker.get() + if len(opts) == 0 { + t.Fatal("expected at least one LIST call to obtain resourceVersion, got none") + } + for _, o := range opts { + if o.Limit != 1 { + t.Errorf("expected LIST with Limit=1 (lightweight), got Limit=%d", o.Limit) + } + } +} + +// TestAdapter_DisableCacheEventDelivery verifies that watch events are converted +// to CloudEvents and delivered to the sink when DisableCache=true. +func TestAdapter_DisableCacheEventDelivery(t *testing.T) { + testCE := adaptertest.NewTestClient() + gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + fakeWatcher := watch.NewRaceFreeFake() + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + dynClient := dynamicfake.NewSimpleDynamicClient(scheme, simplePod("existing-pod", "default")) + dynClient.PrependReactor("list", "pods", func(_ kubetesting.Action) (bool, runtime.Object, error) { + list := &unstructured.UnstructuredList{} + list.SetResourceVersion("100") + return true, list, nil + }) + dynClient.PrependWatchReactor("pods", func(_ kubetesting.Action) (bool, watch.Interface, error) { + return true, fakeWatcher, nil + }) + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{GVR: gvr}}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + + a := &apiServerAdapter{ + ce: testCE, + logger: logging.FromContext(ctx), + config: config, + discover: makeDiscoveryClient(), + k8s: dynClient, + source: "unit-test", + name: "unittest", + namespace: "default", + } + + // Pre-buffer the event before starting the adapter. The fake watcher has a + // 100-item channel; the adapter will drain it once the Watch() call is made. + newPod := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "new-pod", + "namespace": "default", + "resourceVersion": "12345", + }, + }, + } + fakeWatcher.Add(newPod) + + done := make(chan struct{}) + go func() { + defer close(done) + _ = a.Start(ctx) + }() + + // Poll until at least one CloudEvent is sent (or timeout). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && len(testCE.Sent()) == 0 { + time.Sleep(50 * time.Millisecond) + } + + cancel() + <-done + + if len(testCE.Sent()) == 0 { + t.Error("expected at least one CloudEvent to be sent, but none were received") + } +} + +// listOptionsTracker records ListOptions from List calls for later assertion. +type listOptionsTracker struct { + mu sync.Mutex + options []metav1.ListOptions +} + +func (t *listOptionsTracker) record(opts metav1.ListOptions) { + t.mu.Lock() + defer t.mu.Unlock() + t.options = append(t.options, opts) +} + +func (t *listOptionsTracker) get() []metav1.ListOptions { + t.mu.Lock() + defer t.mu.Unlock() + result := make([]metav1.ListOptions, len(t.options)) + copy(result, t.options) + return result +} + +// listOptsTrackingClient wraps dynamic.Interface to record List calls for a specific GVR. +type listOptsTrackingClient struct { + dynamic.Interface + gvr schema.GroupVersionResource + tracker *listOptionsTracker +} + +func (c *listOptsTrackingClient) Resource(gvr schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return &listOptsTrackingResourceClient{ + NamespaceableResourceInterface: c.Interface.Resource(gvr), + gvr: gvr, + targetGVR: c.gvr, + tracker: c.tracker, + } +} + +type listOptsTrackingResourceClient struct { + dynamic.NamespaceableResourceInterface + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + tracker *listOptionsTracker +} + +func (r *listOptsTrackingResourceClient) Namespace(ns string) dynamic.ResourceInterface { + return &listOptsTrackingNamespacedClient{ + ResourceInterface: r.NamespaceableResourceInterface.Namespace(ns), + gvr: r.gvr, + targetGVR: r.targetGVR, + tracker: r.tracker, + } +} + +type listOptsTrackingNamespacedClient struct { + dynamic.ResourceInterface + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + tracker *listOptionsTracker +} + +func (r *listOptsTrackingNamespacedClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + if r.gvr == r.targetGVR { + r.tracker.record(opts) + } + return r.ResourceInterface.List(ctx, opts) +} diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go index 58831190a53..3d423d246e2 100644 --- a/pkg/adapter/apiserver/config.go +++ b/pkg/adapter/apiserver/config.go @@ -74,4 +74,9 @@ type Config struct { // (via the features.knative.dev/apiserversource-skip-permissions-check annotation), and the ApiServerSource // adapter should not keep trying to establish watches on resources that it perhaps does not have permissions for. FailFast bool `json:"failFast,omitempty"` + + // DisableCache if true, the adapter skips the initial LIST call and only watches for new events. + // This reduces API server load when watching resources across many namespaces. + // Pre-existing objects will not emit events on adapter startup. + DisableCache bool `json:"disableCache,omitempty"` } diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go index 7510771f040..84c10e5fa0e 100644 --- a/pkg/apis/sources/v1/apiserver_types.go +++ b/pkg/apis/sources/v1/apiserver_types.go @@ -25,6 +25,14 @@ import ( "knative.dev/pkg/kmeta" ) +const ( + // DisableCacheAnnotation controls whether the adapter skips the initial LIST + // and only watches for new events, reducing API server load in large clusters. + DisableCacheAnnotation = "features.knative.dev/apiserversource-disable-cache" + // SkipPermissionsAnnotation controls whether the adapter skips permission checks on startup. + SkipPermissionsAnnotation = "features.knative.dev/apiserversource-skip-permissions-check" +) + // +genclient // +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go index 883a04a6e9a..ae44325e766 100644 --- a/pkg/apis/sources/v1/apiserver_validation_test.go +++ b/pkg/apis/sources/v1/apiserver_validation_test.go @@ -269,6 +269,49 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) { assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!") } +func TestAPIServerDisableCacheAnnotationValidation(t *testing.T) { + validSrc := func() *ApiServerSource { + return &ApiServerSource{ + Spec: ApiServerSourceSpec{ + EventMode: "Resource", + Resources: []APIVersionKindSelector{{APIVersion: "v1", Kind: "Pod"}}, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{APIVersion: "v1", Kind: "Service", Name: "svc"}, + }, + }, + }, + } + } + + t.Run("both annotations set is valid", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{ + DisableCacheAnnotation: "true", + SkipPermissionsAnnotation: "true", + } + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error for combined annotations: %v", err) + } + }) + + t.Run("only disable-cache annotation is valid", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{DisableCacheAnnotation: "true"} + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("only skip-permissions annotation is valid", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{SkipPermissionsAnnotation: "true"} + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) +} + func TestAPIServerFiltersValidation(t *testing.T) { tests := []struct { name string diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index 2d87187c4ea..f4ce648b999 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -60,8 +60,7 @@ const ( apiserversourceDeploymentCreated = "ApiServerSourceDeploymentCreated" apiserversourceDeploymentUpdated = "ApiServerSourceDeploymentUpdated" - component = "apiserversource" - skipPermissionsAnnotation = "features.knative.dev/apiserversource-skip-permissions-check" + component = "apiserversource" ) func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { @@ -150,7 +149,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour // We don't check if it really exists because in case it does not exist, the value is an empty string // which also serves our purposes as by default we will check permissions annotations := source.GetAnnotations() - skipPermissions := annotations[skipPermissionsAnnotation] + skipPermissions := annotations[v1.SkipPermissionsAnnotation] if skipPermissions == "true" { // If skip permissions, mark enough permissions directly source.Status.MarkSufficientPermissions() @@ -236,7 +235,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer // } annotations := src.GetAnnotations() - skipPermissions := annotations[skipPermissionsAnnotation] + skipPermissions := annotations[v1.SkipPermissionsAnnotation] featureFlags := feature.FromContext(ctx) @@ -252,6 +251,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer AllNamespaces: allNamespaces, NodeSelector: featureFlags.NodeSelector(), FailFast: skipPermissions == "true", + DisableCache: src.Annotations[v1.DisableCacheAnnotation] == "true", } expected, err := resources.MakeReceiveAdapter(&adapterArgs) diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index b9af0d94d4a..baf012ba41d 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -51,6 +51,7 @@ type ReceiveAdapterArgs struct { AllNamespaces bool NodeSelector map[string]string FailFast bool + DisableCache bool } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -140,6 +141,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) { AllNamespaces: args.AllNamespaces, Filters: args.Source.Spec.Filters, FailFast: args.FailFast, + DisableCache: args.DisableCache, } for _, r := range args.Source.Spec.Resources { diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 70a7f0058de..1923995957a 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -332,3 +332,76 @@ Test certificate content here }) } } + +func TestMakeReceiveAdapterWithDisableCache(t *testing.T) { + name := "source-name" + + tests := []struct { + name string + disableCache bool + expectedConfig string + }{ + { + name: "DisableCache true", + disableCache: true, + expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource","disableCache":true}`, + }, + { + name: "DisableCache false", + disableCache: false, + expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource"}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + src := &v1.ApiServerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "source-namespace", + UID: "1234", + }, + Spec: v1.ApiServerSourceSpec{ + Resources: []v1.APIVersionKindSelector{{ + APIVersion: "", + Kind: "Namespace", + }}, + EventMode: "Resource", + ServiceAccountName: "source-svc-acct", + }, + } + + got, err := MakeReceiveAdapter(&ReceiveAdapterArgs{ + Image: "test-image", + Source: src, + Labels: map[string]string{"test-key": "test-value"}, + SinkURI: "sink-uri", + Configs: &source.EmptyVarsGenerator{}, + Namespaces: []string{"source-namespace"}, + DisableCache: tt.disableCache, + }) + + if err != nil { + t.Fatalf("MakeReceiveAdapter() error = %v", err) + } + + var sourceConfigValue string + for _, container := range got.Spec.Template.Spec.Containers { + for _, env := range container.Env { + if env.Name == "K_SOURCE_CONFIG" { + sourceConfigValue = env.Value + break + } + } + } + + if sourceConfigValue == "" { + t.Fatal("K_SOURCE_CONFIG environment variable not found") + } + + if sourceConfigValue != tt.expectedConfig { + t.Errorf("K_SOURCE_CONFIG value mismatch:\nexpected: %s\ngot: %s", tt.expectedConfig, sourceConfigValue) + } + }) + } +}