From 4cccf8c8a645683373d88a6238aded1714ab24bb Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Thu, 23 Apr 2026 20:34:27 +0000 Subject: [PATCH 1/6] Add mark-and-sweep GC for shared OCI cache The shared OCI cache at data_dir/system/oci-cache grew without bound because neither the pull path nor the registry push path had a cleanup hook. The image retention controller only touches data_dir/images, so manifests and layer blobs that were no longer referenced lived forever. This change adds a new lib/ocicachegc package that walks index.json and every referenced manifest to build the live set of blob digests, then deletes any file under blobs/sha256/ that is not in that set. Blobs whose mtime is within the configured min_blob_age are kept; this grace period is what lets the sweep run safely alongside concurrent pulls (which write layer blobs before updating index.json) and registry pushes. Disabled by default. Enable via: images: oci_cache_gc: enabled: true interval: 1h min_blob_age: 1h --- cmd/api/config/config.go | 19 ++ cmd/api/main.go | 47 +++++ config.example.darwin.yaml | 5 + config.example.yaml | 5 + lib/imageretention/README.md | 2 +- lib/ocicachegc/README.md | 56 ++++++ lib/ocicachegc/gc.go | 326 ++++++++++++++++++++++++++++++++++ lib/ocicachegc/gc_test.go | 335 +++++++++++++++++++++++++++++++++++ lib/ocicachegc/metrics.go | 76 ++++++++ 9 files changed, 870 insertions(+), 1 deletion(-) create mode 100644 lib/ocicachegc/README.md create mode 100644 lib/ocicachegc/gc.go create mode 100644 lib/ocicachegc/gc_test.go create mode 100644 lib/ocicachegc/metrics.go diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 84fc799a..8173f4f4 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -131,9 +131,17 @@ type ImagesAutoDeleteConfig struct { Allowed []string `koanf:"allowed"` } +// OCICacheGCConfig holds settings for the OCI blob cache garbage collector. +type OCICacheGCConfig struct { + Enabled bool `koanf:"enabled"` + Interval string `koanf:"interval"` + MinBlobAge string `koanf:"min_blob_age"` +} + // ImagesConfig holds image-management settings. type ImagesConfig struct { AutoDelete ImagesAutoDeleteConfig `koanf:"auto_delete"` + OCICacheGC OCICacheGCConfig `koanf:"oci_cache_gc"` } // BuildConfig holds source-to-image build system settings. @@ -346,6 +354,11 @@ func defaultConfig() *Config { UnusedFor: "720h", Allowed: []string{}, }, + OCICacheGC: OCICacheGCConfig{ + Enabled: false, + Interval: "1h", + MinBlobAge: "1h", + }, }, Build: BuildConfig{ @@ -563,6 +576,12 @@ func (c *Config) Validate() error { for i, pattern := range c.Images.AutoDelete.Allowed { c.Images.AutoDelete.Allowed[i] = strings.TrimSpace(pattern) } + if err := validateDuration("images.oci_cache_gc.interval", c.Images.OCICacheGC.Interval); err != nil { + return err + } + if err := validateDuration("images.oci_cache_gc.min_blob_age", c.Images.OCICacheGC.MinBlobAge); err != nil { + return err + } algorithm := strings.ToLower(c.Snapshot.CompressionDefault.Algorithm) c.Snapshot.CompressionDefault.Algorithm = algorithm if c.Snapshot.CompressionDefault.Enabled { diff --git a/cmd/api/main.go b/cmd/api/main.go index 61ee9b0b..99ce7ebc 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -32,6 +32,7 @@ import ( loglib "github.com/kernel/hypeman/lib/logger" mw "github.com/kernel/hypeman/lib/middleware" "github.com/kernel/hypeman/lib/oapi" + "github.com/kernel/hypeman/lib/ocicachegc" "github.com/kernel/hypeman/lib/otel" "github.com/kernel/hypeman/lib/paths" "github.com/kernel/hypeman/lib/registry" @@ -98,6 +99,37 @@ func startImageRetentionController(grp *errgroup.Group, ctx context.Context, con return true } +type ociCacheGCRunner interface { + Run(ctx context.Context) error +} + +func configureOCICacheGC(cfg *config.Config, logger *slog.Logger, meter metric.Meter) (ociCacheGCRunner, error) { + if cfg == nil || !cfg.Images.OCICacheGC.Enabled { + return nil, nil + } + + interval, err := time.ParseDuration(cfg.Images.OCICacheGC.Interval) + if err != nil { + return nil, fmt.Errorf("invalid images.oci_cache_gc.interval %q: %w", cfg.Images.OCICacheGC.Interval, err) + } + minBlobAge, err := time.ParseDuration(cfg.Images.OCICacheGC.MinBlobAge) + if err != nil { + return nil, fmt.Errorf("invalid images.oci_cache_gc.min_blob_age %q: %w", cfg.Images.OCICacheGC.MinBlobAge, err) + } + + return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, logger, meter) +} + +func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGCRunner) bool { + if grp == nil || runner == nil { + return false + } + grp.Go(func() error { + return runner.Run(ctx) + }) + return true +} + func run() error { // Load config early for OTel initialization // Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations @@ -491,6 +523,21 @@ func run() error { logger.Info("image auto-delete enabled", "unused_for", app.Config.Images.AutoDelete.UnusedFor) } + ociGC, err := configureOCICacheGC( + app.Config, + logger, + otelProvider.MeterFor(loglib.SubsystemImages), + ) + if err != nil { + return err + } + if startOCICacheGC(grp, gctx, ociGC) { + logger.Info("oci cache gc enabled", + "interval", app.Config.Images.OCICacheGC.Interval, + "min_blob_age", app.Config.Images.OCICacheGC.MinBlobAge, + ) + } + // Start build manager background services (vsock handler for builder VMs) if err := app.BuildManager.Start(gctx); err != nil { logger.Error("failed to start build manager", "error", err) diff --git a/config.example.darwin.yaml b/config.example.darwin.yaml index 5b16e672..29f2f750 100644 --- a/config.example.darwin.yaml +++ b/config.example.darwin.yaml @@ -78,6 +78,11 @@ logging: # - docker.io/library/* # match normalized repository names # - ghcr.io/kernel/* # use ["*"] to allow deletion for every repository # # only affects data_dir/images, not the shared OCI cache +# oci_cache_gc: +# enabled: false # mark-and-sweep GC for data_dir/system/oci-cache +# interval: 1h # how often to run a sweep +# min_blob_age: 1h # grace period; blobs written more recently are kept +# # to avoid racing with concurrent pulls # ============================================================================= # Caddy / Ingress Configuration diff --git a/config.example.yaml b/config.example.yaml index 34e59d14..fd88ac17 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -71,6 +71,11 @@ data_dir: /var/lib/hypeman # - docker.io/library/* # match normalized repository names # - ghcr.io/kernel/* # use ["*"] to allow deletion for every repository # # only affects data_dir/images, not the shared OCI cache +# oci_cache_gc: +# enabled: false # mark-and-sweep GC for data_dir/system/oci-cache +# interval: 1h # how often to run a sweep +# min_blob_age: 1h # grace period; blobs written more recently are kept +# # to avoid racing with concurrent pulls # ============================================================================= # Caddy / Ingress Configuration diff --git a/lib/imageretention/README.md b/lib/imageretention/README.md index c70db06f..9ced4fc7 100644 --- a/lib/imageretention/README.md +++ b/lib/imageretention/README.md @@ -16,7 +16,7 @@ When auto-delete is enabled: - The server runs a retention sweep on startup and then every minute. - Only converted cached images under `data_dir/images` are eligible for deletion. -- Shared OCI cache data under `data_dir/system/oci-cache` is not modified. +- Shared OCI cache data under `data_dir/system/oci-cache` is not modified by this feature; see `lib/ocicachegc` for a separate mark-and-sweep collector that reclaims orphaned blobs from that directory. - An image repository must also match at least one `allowed` pattern before any retention state is recorded or deletion is attempted. An image is considered in use if any persisted instance metadata or snapshot record still references it. As long as at least one such reference exists, the image is protected from deletion. diff --git a/lib/ocicachegc/README.md b/lib/ocicachegc/README.md new file mode 100644 index 00000000..b209416b --- /dev/null +++ b/lib/ocicachegc/README.md @@ -0,0 +1,56 @@ +# OCI Cache GC + +Mark-and-sweep garbage collector for the shared OCI cache at +`data_dir/system/oci-cache`. + +The cache is populated every time an image is pulled or pushed and was +previously write-only: nothing ever removed layer, config, or manifest +blobs, so the cache grew unbounded. This collector reclaims the space +used by manifests and layers that are no longer referenced from +`index.json`. + +## Configuration + +```yaml +images: + oci_cache_gc: + enabled: false + interval: 1h + min_blob_age: 1h +``` + +When enabled, the server runs one pass immediately and then every +`interval` until shutdown. + +## Algorithm + +1. **Mark.** Read `index.json` and walk every referenced descriptor. For + each manifest or manifest-index blob we descend into its `config`, + `layers`, `manifests`, and `subject` references. The set of visited + digests is the live set. +2. **Sweep.** List `blobs/sha256/`. Delete every file whose name is a + valid 64-char hex digest, is absent from the live set, and whose + `mtime` is older than `min_blob_age`. + +Blobs that are referenced but unparseable are kept as opaque leaves; the +collector never deletes a blob it cannot prove is dead. + +## Concurrency + +Pulls (`layout.AppendImage`) and pushes (`BlobStore.Put`) write blobs +before updating `index.json`. During that window a blob exists on disk +but is not yet in the live set. `min_blob_age` is the grace period that +protects these in-flight writes — it should comfortably exceed the time +it takes to pull the largest image in your environment. + +Temporary files (`.tmp` used by `BlobStore.Put`) are ignored +entirely because they do not match the blob filename pattern. + +## Metrics + +| Metric | Type | Description | +| ------ | ---- | ----------- | +| `hypeman_oci_cache_gc_sweeps_total` | counter | Sweeps, tagged by status | +| `hypeman_oci_cache_gc_sweep_duration_seconds` | histogram | Sweep duration | +| `hypeman_oci_cache_gc_deleted_blobs_total` | counter | Blobs deleted | +| `hypeman_oci_cache_gc_deleted_bytes_total` | counter | Bytes reclaimed | diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go new file mode 100644 index 00000000..05c7ec86 --- /dev/null +++ b/lib/ocicachegc/gc.go @@ -0,0 +1,326 @@ +// Package ocicachegc performs mark-and-sweep garbage collection of the +// shared OCI cache at data_dir/system/oci-cache. +// +// The cache stores content-addressed blobs under blobs/sha256/ and an +// index.json that references one or more manifests. Blobs are written +// whenever an image is pulled or pushed and are never removed by the +// normal code paths, so without a GC the cache grows unbounded. +// +// The collector walks index.json and every manifest (or manifest index) +// reachable from it to build the set of "live" blob digests, then +// deletes any blob that is not live. Blobs whose mtime is within +// MinBlobAge are always kept; this is the grace period used to avoid +// racing with concurrent pulls, which write layer blobs before updating +// index.json. +package ocicachegc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log/slog" + "os" + "path/filepath" + "regexp" + "sync" + "time" + + "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel/metric" +) + +// Collector garbage-collects the shared OCI cache. +type Collector struct { + paths *paths.Paths + interval time.Duration + minBlobAge time.Duration + logger *slog.Logger + metrics *Metrics + now func() time.Time + mu sync.Mutex +} + +// Stats summarises the outcome of one Collect pass. +type Stats struct { + LiveBlobs int + ScannedBlobs int + DeletedBlobs int + DeletedBytes int64 + SkippedRecent int +} + +// NewCollector creates a collector. minBlobAge is the minimum age a blob +// must have before it becomes eligible for deletion; it protects blobs +// that are currently being written by a concurrent pull or push. +func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, logger *slog.Logger, meter metric.Meter) (*Collector, error) { + if p == nil { + return nil, errors.New("paths is required") + } + if interval <= 0 { + return nil, fmt.Errorf("interval must be positive, got %s", interval) + } + if minBlobAge < 0 { + return nil, fmt.Errorf("min_blob_age cannot be negative, got %s", minBlobAge) + } + if logger == nil { + logger = slog.Default() + } + c := &Collector{ + paths: p, + interval: interval, + minBlobAge: minBlobAge, + logger: logger.With("component", "oci_cache_gc"), + now: time.Now, + } + if meter != nil { + m, err := newMetrics(meter) + if err != nil { + return nil, fmt.Errorf("create oci cache gc metrics: %w", err) + } + c.metrics = m + } + return c, nil +} + +// Run performs one Collect pass immediately and then every interval until +// ctx is cancelled. It never returns an error; individual sweep failures +// are logged and metrics are recorded, but the loop keeps running. +func (c *Collector) Run(ctx context.Context) error { + c.logger.InfoContext(ctx, "oci cache gc started", "interval", c.interval, "min_blob_age", c.minBlobAge) + if _, err := c.Collect(ctx); err != nil { + c.logger.ErrorContext(ctx, "oci cache gc sweep failed", "error", err) + } + + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if _, err := c.Collect(ctx); err != nil { + c.logger.ErrorContext(ctx, "oci cache gc sweep failed", "error", err) + } + } + } +} + +// Collect performs one full mark-and-sweep pass over the OCI cache. +func (c *Collector) Collect(ctx context.Context) (Stats, error) { + c.mu.Lock() + defer c.mu.Unlock() + + start := c.now() + stats, err := c.collect(ctx) + status := "success" + if err != nil { + status = "error" + } + if c.metrics != nil { + c.metrics.RecordSweep(ctx, status, c.now().Sub(start), stats) + } + if err != nil { + return stats, err + } + c.logger.DebugContext(ctx, "oci cache gc sweep completed", + "live_blobs", stats.LiveBlobs, + "scanned_blobs", stats.ScannedBlobs, + "deleted_blobs", stats.DeletedBlobs, + "deleted_bytes", stats.DeletedBytes, + "skipped_recent", stats.SkippedRecent, + ) + return stats, nil +} + +func (c *Collector) collect(ctx context.Context) (Stats, error) { + var stats Stats + + blobDir := c.paths.OCICacheBlobDir() + if _, err := os.Stat(blobDir); errors.Is(err, fs.ErrNotExist) { + return stats, nil + } else if err != nil { + return stats, fmt.Errorf("stat blob dir: %w", err) + } + + live, err := liveBlobs(c.paths) + if err != nil { + return stats, fmt.Errorf("compute live set: %w", err) + } + stats.LiveBlobs = len(live) + + cutoff := c.now().Add(-c.minBlobAge) + + entries, err := os.ReadDir(blobDir) + if err != nil { + return stats, fmt.Errorf("read blob dir: %w", err) + } + + for _, entry := range entries { + if err := ctx.Err(); err != nil { + return stats, err + } + if entry.IsDir() { + continue + } + name := entry.Name() + if !isBlobName(name) { + continue + } + stats.ScannedBlobs++ + if _, ok := live[name]; ok { + continue + } + info, err := entry.Info() + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue + } + return stats, fmt.Errorf("stat blob %s: %w", name, err) + } + if info.ModTime().After(cutoff) { + stats.SkippedRecent++ + continue + } + path := filepath.Join(blobDir, name) + size := info.Size() + if err := os.Remove(path); err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue + } + return stats, fmt.Errorf("remove blob %s: %w", name, err) + } + stats.DeletedBlobs++ + stats.DeletedBytes += size + c.logger.InfoContext(ctx, "deleted unreferenced oci blob", "digest", name, "size", size) + } + + return stats, nil +} + +// blobNamePattern matches a valid sha256 blob filename (64 hex chars). +// Temporary files (e.g. ".tmp" written by the blob store) are +// intentionally excluded. +var blobNamePattern = regexp.MustCompile(`^[0-9a-f]{64}$`) + +func isBlobName(name string) bool { + return blobNamePattern.MatchString(name) +} + +// descriptor captures the subset of OCI/Docker descriptor fields we care +// about when walking manifests. Fields we do not use (urls, platform, +// size, annotations, mediaType) are omitted. +type descriptor struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` +} + +// manifestDoc is a union of the fields found on OCI image manifests, +// OCI image indexes, Docker v2 manifests, and Docker v2 manifest lists. +// Decoding into one shape lets us traverse either format without +// branching on the mediaType, which is often unreliable in practice. +type manifestDoc struct { + Config descriptor `json:"config"` + Layers []descriptor `json:"layers"` + Manifests []descriptor `json:"manifests"` + Subject *descriptor `json:"subject,omitempty"` +} + +// liveBlobs returns the set of blob hex digests reachable from the OCI +// cache index.json. Keys are bare hex (no "sha256:" prefix), matching +// the filenames under blobs/sha256/. +func liveBlobs(p *paths.Paths) (map[string]struct{}, error) { + live := make(map[string]struct{}) + + indexPath := p.OCICacheIndex() + data, err := os.ReadFile(indexPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return live, nil + } + return nil, fmt.Errorf("read index.json: %w", err) + } + + var index manifestDoc + if err := json.Unmarshal(data, &index); err != nil { + return nil, fmt.Errorf("parse index.json: %w", err) + } + + visited := make(map[string]struct{}) + for _, m := range index.Manifests { + if err := walkDescriptor(p, m, live, visited); err != nil { + return nil, err + } + } + return live, nil +} + +// walkDescriptor records desc.Digest in live, then if the blob is a +// manifest (or manifest index) recurses into its referenced blobs. +// Unparseable blobs are treated as opaque leaves (recorded but not +// descended into) so the sweep errs on the side of keeping data when +// disk contents are malformed. +func walkDescriptor(p *paths.Paths, desc descriptor, live, visited map[string]struct{}) error { + hex, ok := digestHex(desc.Digest) + if !ok { + return nil + } + if _, seen := visited[hex]; seen { + return nil + } + visited[hex] = struct{}{} + live[hex] = struct{}{} + + blobPath := p.OCICacheBlob(hex) + data, err := os.ReadFile(blobPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil + } + return fmt.Errorf("read blob %s: %w", hex, err) + } + + var doc manifestDoc + if err := json.Unmarshal(data, &doc); err != nil { + return nil + } + + if h, ok := digestHex(doc.Config.Digest); ok { + live[h] = struct{}{} + } + for _, layer := range doc.Layers { + if h, ok := digestHex(layer.Digest); ok { + live[h] = struct{}{} + } + } + if doc.Subject != nil { + if h, ok := digestHex(doc.Subject.Digest); ok { + live[h] = struct{}{} + } + } + for _, m := range doc.Manifests { + if err := walkDescriptor(p, m, live, visited); err != nil { + return err + } + } + return nil +} + +// digestHex extracts the hex portion of a "sha256:" digest. It +// returns false for empty strings, unsupported algorithms, or malformed +// hex so callers can skip bad data without erroring the sweep. +func digestHex(d string) (string, bool) { + if len(d) != len("sha256:")+64 { + return "", false + } + if d[:7] != "sha256:" { + return "", false + } + hex := d[7:] + if !blobNamePattern.MatchString(hex) { + return "", false + } + return hex, true +} diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go new file mode 100644 index 00000000..499872df --- /dev/null +++ b/lib/ocicachegc/gc_test.go @@ -0,0 +1,335 @@ +package ocicachegc + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/kernel/hypeman/lib/paths" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// layoutBuilder incrementally writes OCI layout blobs and assembles an +// index.json so tests can set up realistic cache contents. +type layoutBuilder struct { + t *testing.T + paths *paths.Paths + blobsDir string + cacheDir string + entries []indexEntry +} + +type indexEntry struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` + Size int `json:"size"` + Annotations map[string]string `json:"annotations,omitempty"` +} + +func newLayoutBuilder(t *testing.T, dataDir string) *layoutBuilder { + t.Helper() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + return &layoutBuilder{ + t: t, + paths: p, + blobsDir: blobsDir, + cacheDir: p.SystemOCICache(), + } +} + +// writeBlob stores content at blobs/sha256/ and returns the +// digest string in canonical "sha256:" form. +func (b *layoutBuilder) writeBlob(content []byte) string { + b.t.Helper() + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(b.t, os.WriteFile(filepath.Join(b.blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest +} + +// writeOrphan stores a blob that won't be referenced by any manifest. +// Returns the filename (hex digest) so tests can assert on it. +func (b *layoutBuilder) writeOrphan(content []byte, mtime time.Time) string { + b.t.Helper() + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + path := filepath.Join(b.blobsDir, hexDigest) + require.NoError(b.t, os.WriteFile(path, content, 0o644)) + if !mtime.IsZero() { + require.NoError(b.t, os.Chtimes(path, mtime, mtime)) + } + return hexDigest +} + +// addImage appends an image manifest to the layout. Config and layer +// blobs are written first, then the manifest itself, then an index entry +// is recorded so writeIndex will include it. +func (b *layoutBuilder) addImage(tag string, configContent []byte, layerContents [][]byte) { + b.t.Helper() + + configDigest := b.writeBlob(configContent) + + type desc struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` + Size int `json:"size"` + } + layers := make([]desc, len(layerContents)) + for i, content := range layerContents { + digest := b.writeBlob(content) + layers[i] = desc{ + MediaType: "application/vnd.oci.image.layer.v1.tar+gzip", + Digest: digest, + Size: len(content), + } + } + + manifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": desc{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: configDigest, + Size: len(configContent), + }, + "layers": layers, + } + manifestBytes, err := json.Marshal(manifest) + require.NoError(b.t, err) + manifestDigest := b.writeBlob(manifestBytes) + + b.entries = append(b.entries, indexEntry{ + MediaType: "application/vnd.oci.image.manifest.v1+json", + Digest: manifestDigest, + Size: len(manifestBytes), + Annotations: map[string]string{ + "org.opencontainers.image.ref.name": tag, + }, + }) +} + +func (b *layoutBuilder) writeIndex() { + b.t.Helper() + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": b.entries, + } + data, err := json.Marshal(index) + require.NoError(b.t, err) + require.NoError(b.t, os.WriteFile(b.paths.OCICacheIndex(), data, 0o644)) + + layout := map[string]string{"imageLayoutVersion": "1.0.0"} + layoutBytes, err := json.Marshal(layout) + require.NoError(b.t, err) + require.NoError(b.t, os.WriteFile(b.paths.OCICacheLayout(), layoutBytes, 0o644)) +} + +func newCollectorForTest(t *testing.T, dataDir string, minBlobAge time.Duration, now time.Time) *Collector { + t.Helper() + c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil) + require.NoError(t, err) + c.now = func() time.Time { return now } + return c +} + +func TestCollectNoCacheDirIsNoop(t *testing.T) { + dataDir := t.TempDir() + c := newCollectorForTest(t, dataDir, time.Hour, time.Now()) + + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + assert.Equal(t, 0, stats.ScannedBlobs) + assert.Equal(t, 0, stats.DeletedBlobs) +} + +func TestCollectKeepsLiveBlobs(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a-1"), []byte("layer-a-2")}) + b.addImage("img-b", []byte(`{"config":"b"}`), [][]byte{[]byte("layer-b-1")}) + b.writeIndex() + + now := time.Now() + c := newCollectorForTest(t, dataDir, time.Minute, now) + + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // 2 manifests + 2 configs + 3 layers = 7 live blobs, all present. + assert.Equal(t, 7, stats.LiveBlobs) + assert.Equal(t, 7, stats.ScannedBlobs) + assert.Equal(t, 0, stats.DeletedBlobs) + + // All blob files should still exist. + entries, err := os.ReadDir(b.blobsDir) + require.NoError(t, err) + assert.Len(t, entries, 7) +} + +func TestCollectDeletesOrphans(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a")}) + b.writeIndex() + + now := time.Now() + // Old orphan: well outside the grace period. + orphan := b.writeOrphan([]byte("orphaned-layer"), now.Add(-2*time.Hour)) + + c := newCollectorForTest(t, dataDir, time.Minute, now) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + assert.Equal(t, 3, stats.LiveBlobs) + assert.Equal(t, 4, stats.ScannedBlobs) + assert.Equal(t, 1, stats.DeletedBlobs) + assert.Equal(t, int64(len("orphaned-layer")), stats.DeletedBytes) + + _, err = os.Stat(filepath.Join(b.blobsDir, orphan)) + assert.True(t, os.IsNotExist(err), "orphan should be deleted") +} + +func TestCollectSkipsRecentBlobs(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a")}) + b.writeIndex() + + now := time.Now() + // Orphan written recently — within grace period. + orphan := b.writeOrphan([]byte("still-being-pulled"), now.Add(-30*time.Second)) + + c := newCollectorForTest(t, dataDir, time.Minute, now) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + assert.Equal(t, 1, stats.SkippedRecent) + assert.Equal(t, 0, stats.DeletedBlobs) + + _, err = os.Stat(filepath.Join(b.blobsDir, orphan)) + assert.NoError(t, err, "recent orphan should be preserved") +} + +func TestCollectIgnoresTempAndNonBlobFiles(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"c":1}`), [][]byte{[]byte("layer")}) + b.writeIndex() + + // Simulate an in-progress BlobStore.Put: .tmp file. + tmpName := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.tmp" + require.NoError(t, os.WriteFile(filepath.Join(b.blobsDir, tmpName), []byte("partial"), 0o644)) + + // Also something unexpected with a wrong-length name. + require.NoError(t, os.WriteFile(filepath.Join(b.blobsDir, "not-a-blob"), []byte("x"), 0o644)) + + // Make both files "old" so the grace period doesn't protect them. + past := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(b.blobsDir, tmpName), past, past)) + require.NoError(t, os.Chtimes(filepath.Join(b.blobsDir, "not-a-blob"), past, past)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Neither the .tmp file nor the non-hex name should be touched. + assert.Equal(t, 0, stats.DeletedBlobs) + _, err = os.Stat(filepath.Join(b.blobsDir, tmpName)) + assert.NoError(t, err) + _, err = os.Stat(filepath.Join(b.blobsDir, "not-a-blob")) + assert.NoError(t, err) +} + +func TestCollectFollowsManifestIndex(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + // Build an inner image: config + layer + manifest. + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + configContent := []byte(`{"inner-config":true}`) + layerContent := []byte("inner-layer") + configDigest := writeBlob(configContent) + layerDigest := writeBlob(layerContent) + + innerManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": configDigest, "size": len(configContent)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": layerDigest, "size": len(layerContent)}}, + } + innerBytes, err := json.Marshal(innerManifest) + require.NoError(t, err) + innerDigest := writeBlob(innerBytes) + + // Build an outer manifest index that references the inner manifest. + outerIndex := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": innerDigest, "size": len(innerBytes)}}, + } + outerBytes, err := json.Marshal(outerIndex) + require.NoError(t, err) + outerDigest := writeBlob(outerBytes) + + // Cache index.json points at the outer manifest index. + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.index.v1+json", "digest": outerDigest, "size": len(outerBytes)}}, + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + // Drop an unrelated orphan to verify it still gets swept. + orphan := writeBlob([]byte("orphan-bytes")) + // writeBlob returns a sha256: prefix; we need the hex for os.Stat. + orphanHex := orphan[7:] + // Force past the grace period. + past := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(blobsDir, orphanHex), past, past)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Live: outer index + inner manifest + config + layer = 4. + assert.Equal(t, 4, stats.LiveBlobs) + assert.Equal(t, 1, stats.DeletedBlobs, "only the orphan should be deleted") + _, err = os.Stat(filepath.Join(blobsDir, orphanHex)) + assert.True(t, os.IsNotExist(err)) +} + +func TestNewCollectorValidatesArgs(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + + _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, 0, time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, time.Hour, time.Minute, nil, nil) + assert.NoError(t, err) +} diff --git a/lib/ocicachegc/metrics.go b/lib/ocicachegc/metrics.go new file mode 100644 index 00000000..9ba9843e --- /dev/null +++ b/lib/ocicachegc/metrics.go @@ -0,0 +1,76 @@ +package ocicachegc + +import ( + "context" + "time" + + hypotel "github.com/kernel/hypeman/lib/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Metrics holds the OTel instruments for the OCI cache collector. +type Metrics struct { + sweepsTotal metric.Int64Counter + sweepDuration metric.Float64Histogram + deletedBlobs metric.Int64Counter + deletedBytes metric.Int64Counter +} + +func newMetrics(meter metric.Meter) (*Metrics, error) { + sweepsTotal, err := meter.Int64Counter( + "hypeman_oci_cache_gc_sweeps_total", + metric.WithDescription("Total number of OCI cache GC sweeps"), + ) + if err != nil { + return nil, err + } + + sweepDuration, err := meter.Float64Histogram( + "hypeman_oci_cache_gc_sweep_duration_seconds", + metric.WithDescription("Duration of OCI cache GC sweeps"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(hypotel.CommonDurationHistogramBuckets()...), + ) + if err != nil { + return nil, err + } + + deletedBlobs, err := meter.Int64Counter( + "hypeman_oci_cache_gc_deleted_blobs_total", + metric.WithDescription("Total number of blobs deleted by the OCI cache GC"), + ) + if err != nil { + return nil, err + } + + deletedBytes, err := meter.Int64Counter( + "hypeman_oci_cache_gc_deleted_bytes_total", + metric.WithDescription("Total bytes freed by the OCI cache GC"), + metric.WithUnit("By"), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + sweepsTotal: sweepsTotal, + sweepDuration: sweepDuration, + deletedBlobs: deletedBlobs, + deletedBytes: deletedBytes, + }, nil +} + +// RecordSweep records the outcome of one sweep. +func (m *Metrics) RecordSweep(ctx context.Context, status string, duration time.Duration, stats Stats) { + if m == nil { + return + } + attrs := metric.WithAttributes(attribute.String("status", status)) + m.sweepsTotal.Add(ctx, 1, attrs) + m.sweepDuration.Record(ctx, duration.Seconds(), attrs) + if stats.DeletedBlobs > 0 { + m.deletedBlobs.Add(ctx, int64(stats.DeletedBlobs)) + m.deletedBytes.Add(ctx, stats.DeletedBytes) + } +} From 8c46b4dcb786a2675276ae72d4ac5bafd24ef2cd Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Thu, 23 Apr 2026 17:22:33 -0400 Subject: [PATCH 2/6] test: cover oci cache gc config validation --- cmd/api/config/config.go | 16 +++-- cmd/api/config/config_test.go | 60 ++++++++++++++++++ cmd/api/oci_cache_gc_test.go | 112 ++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 4 deletions(-) create mode 100644 cmd/api/oci_cache_gc_test.go diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 8173f4f4..a39e801d 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -576,11 +576,19 @@ func (c *Config) Validate() error { for i, pattern := range c.Images.AutoDelete.Allowed { c.Images.AutoDelete.Allowed[i] = strings.TrimSpace(pattern) } - if err := validateDuration("images.oci_cache_gc.interval", c.Images.OCICacheGC.Interval); err != nil { - return err + ociCacheGCInterval, err := time.ParseDuration(c.Images.OCICacheGC.Interval) + if err != nil { + return fmt.Errorf("images.oci_cache_gc.interval must be a valid duration, got %q: %w", c.Images.OCICacheGC.Interval, err) } - if err := validateDuration("images.oci_cache_gc.min_blob_age", c.Images.OCICacheGC.MinBlobAge); err != nil { - return err + if ociCacheGCInterval <= 0 { + return fmt.Errorf("images.oci_cache_gc.interval must be positive, got %q", c.Images.OCICacheGC.Interval) + } + ociCacheGCMinBlobAge, err := time.ParseDuration(c.Images.OCICacheGC.MinBlobAge) + if err != nil { + return fmt.Errorf("images.oci_cache_gc.min_blob_age must be a valid duration, got %q: %w", c.Images.OCICacheGC.MinBlobAge, err) + } + if ociCacheGCMinBlobAge < 0 { + return fmt.Errorf("images.oci_cache_gc.min_blob_age cannot be negative, got %q", c.Images.OCICacheGC.MinBlobAge) } algorithm := strings.ToLower(c.Snapshot.CompressionDefault.Algorithm) c.Snapshot.CompressionDefault.Algorithm = algorithm diff --git a/cmd/api/config/config_test.go b/cmd/api/config/config_test.go index b63bda18..e7183504 100644 --- a/cmd/api/config/config_test.go +++ b/cmd/api/config/config_test.go @@ -43,6 +43,15 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) { if len(cfg.Images.AutoDelete.Allowed) != 0 { t.Fatalf("expected default images.auto_delete.allowed to be empty, got %v", cfg.Images.AutoDelete.Allowed) } + if cfg.Images.OCICacheGC.Enabled { + t.Fatalf("expected default images.oci_cache_gc.enabled to be false") + } + if cfg.Images.OCICacheGC.Interval != "1h" { + t.Fatalf("expected default images.oci_cache_gc.interval to be 1h, got %q", cfg.Images.OCICacheGC.Interval) + } + if cfg.Images.OCICacheGC.MinBlobAge != "1h" { + t.Fatalf("expected default images.oci_cache_gc.min_blob_age to be 1h, got %q", cfg.Images.OCICacheGC.MinBlobAge) + } if cfg.Instances.LifecycleEventBufferSize != 256 { t.Fatalf("expected default instances.lifecycle_event_buffer_size to be 256, got %d", cfg.Instances.LifecycleEventBufferSize) } @@ -247,6 +256,57 @@ func TestValidateRejectsInvalidImageAutoDeleteUnusedFor(t *testing.T) { } } +func TestLoadUsesDefaultOCICacheGCSettingsWhenEnabledOnly(t *testing.T) { + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.yaml") + if err := os.WriteFile(cfgPath, []byte("images:\n oci_cache_gc:\n enabled: true\n"), 0600); err != nil { + t.Fatalf("write temp config: %v", err) + } + + cfg, err := Load(cfgPath) + if err != nil { + t.Fatalf("load config: %v", err) + } + + if !cfg.Images.OCICacheGC.Enabled { + t.Fatalf("expected images.oci_cache_gc.enabled override to be true") + } + if cfg.Images.OCICacheGC.Interval != "1h" { + t.Fatalf("expected default images.oci_cache_gc.interval to remain 1h, got %q", cfg.Images.OCICacheGC.Interval) + } + if cfg.Images.OCICacheGC.MinBlobAge != "1h" { + t.Fatalf("expected default images.oci_cache_gc.min_blob_age to remain 1h, got %q", cfg.Images.OCICacheGC.MinBlobAge) + } +} + +func TestValidateRejectsInvalidOCICacheGCInterval(t *testing.T) { + cfg := defaultConfig() + cfg.Images.OCICacheGC.Interval = "not-a-duration" + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for invalid images.oci_cache_gc.interval") + } + + cfg = defaultConfig() + cfg.Images.OCICacheGC.Interval = "0s" + + err = cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "must be positive") { + t.Fatalf("expected positive validation error for zero images.oci_cache_gc.interval, got %v", err) + } +} + +func TestValidateRejectsNegativeOCICacheGCMinBlobAge(t *testing.T) { + cfg := defaultConfig() + cfg.Images.OCICacheGC.MinBlobAge = "-1s" + + err := cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "cannot be negative") { + t.Fatalf("expected non-negative validation error for images.oci_cache_gc.min_blob_age, got %v", err) + } +} + func TestValidateTrimsImageAutoDeleteAllowedPatterns(t *testing.T) { cfg := defaultConfig() cfg.Images.AutoDelete.Allowed = []string{" docker.io/library/* ", " ", "ghcr.io/kernel/*"} diff --git a/cmd/api/oci_cache_gc_test.go b/cmd/api/oci_cache_gc_test.go new file mode 100644 index 00000000..9887bf48 --- /dev/null +++ b/cmd/api/oci_cache_gc_test.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "io" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/hypeman/cmd/api/config" + "golang.org/x/sync/errgroup" +) + +type stubOCICacheGCRunner struct { + runCount atomic.Int32 +} + +func (s *stubOCICacheGCRunner) Run(ctx context.Context) error { + s.runCount.Add(1) + <-ctx.Done() + return nil +} + +func loadTestConfig(t *testing.T) *config.Config { + t.Helper() + + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.yaml") + if err := os.WriteFile(cfgPath, []byte("{}\n"), 0o600); err != nil { + t.Fatalf("write temp config: %v", err) + } + + cfg, err := config.Load(cfgPath) + if err != nil { + t.Fatalf("load temp config: %v", err) + } + return cfg +} + +func TestConfigureOCICacheGCSkipsDisabledConfig(t *testing.T) { + cfg := loadTestConfig(t) + + runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + if err != nil { + t.Fatalf("configure disabled oci cache gc: %v", err) + } + if runner != nil { + t.Fatalf("expected disabled oci cache gc to return nil runner") + } +} + +func TestConfigureOCICacheGCBuildsCollectorWhenEnabled(t *testing.T) { + cfg := loadTestConfig(t) + cfg.Images.OCICacheGC.Enabled = true + cfg.Images.OCICacheGC.Interval = "2m" + cfg.Images.OCICacheGC.MinBlobAge = "30s" + + runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + if err != nil { + t.Fatalf("configure enabled oci cache gc: %v", err) + } + if runner == nil { + t.Fatalf("expected enabled oci cache gc to return runner") + } +} + +func TestConfigureOCICacheGCRejectsInvalidInterval(t *testing.T) { + cfg := loadTestConfig(t) + cfg.Images.OCICacheGC.Enabled = true + cfg.Images.OCICacheGC.Interval = "0s" + + if _, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil); err == nil { + t.Fatalf("expected invalid oci cache gc interval to fail") + } +} + +func TestStartOCICacheGCSkipsNilRunner(t *testing.T) { + grp, ctx := errgroup.WithContext(context.Background()) + + started := startOCICacheGC(grp, ctx, nil) + if started { + t.Fatalf("expected nil oci cache gc runner not to start") + } +} + +func TestStartOCICacheGCStartsRunner(t *testing.T) { + grp, ctx := errgroup.WithContext(context.Background()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + runner := &stubOCICacheGCRunner{} + started := startOCICacheGC(grp, ctx, runner) + if !started { + t.Fatalf("expected oci cache gc runner to start") + } + + deadline := time.Now().Add(time.Second) + for runner.runCount.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if runner.runCount.Load() != 1 { + t.Fatalf("expected runner to be started once, got %d", runner.runCount.Load()) + } + + cancel() + if err := grp.Wait(); err != nil { + t.Fatalf("wait for oci cache gc runner: %v", err) + } +} From a4a40fff3fbd29e28beef6f06e88060145237dd4 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Fri, 24 Apr 2026 19:30:28 +0000 Subject: [PATCH 3/6] Recurse into subject descriptor in OCI GC mark phase Previously walkDescriptor added subject.Digest to the live set as a leaf without descending, so the subject manifest's own config and layers could be swept. Recurse like manifests[] so the full referrer chain stays marked. Co-Authored-By: Claude Opus 4.7 --- lib/ocicachegc/gc.go | 4 +-- lib/ocicachegc/gc_test.go | 73 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go index 05c7ec86..1a1a6904 100644 --- a/lib/ocicachegc/gc.go +++ b/lib/ocicachegc/gc.go @@ -296,8 +296,8 @@ func walkDescriptor(p *paths.Paths, desc descriptor, live, visited map[string]st } } if doc.Subject != nil { - if h, ok := digestHex(doc.Subject.Digest); ok { - live[h] = struct{}{} + if err := walkDescriptor(p, *doc.Subject, live, visited); err != nil { + return err } } for _, m := range doc.Manifests { diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go index 499872df..b385e5fd 100644 --- a/lib/ocicachegc/gc_test.go +++ b/lib/ocicachegc/gc_test.go @@ -317,6 +317,79 @@ func TestCollectFollowsManifestIndex(t *testing.T) { assert.True(t, os.IsNotExist(err)) } +func TestCollectRecursesIntoSubject(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + // Subject image: config + layer + manifest. + subjectConfig := []byte(`{"subject-config":true}`) + subjectLayer := []byte("subject-layer") + subjectConfigDigest := writeBlob(subjectConfig) + subjectLayerDigest := writeBlob(subjectLayer) + + subjectManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": subjectConfigDigest, "size": len(subjectConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": subjectLayerDigest, "size": len(subjectLayer)}}, + } + subjectBytes, err := json.Marshal(subjectManifest) + require.NoError(t, err) + subjectDigest := writeBlob(subjectBytes) + + // Referrer manifest: has its own config + layer, points at subject. + referrerConfig := []byte(`{"referrer-config":true}`) + referrerLayer := []byte("referrer-layer") + referrerConfigDigest := writeBlob(referrerConfig) + referrerLayerDigest := writeBlob(referrerLayer) + + referrerManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": referrerConfigDigest, "size": len(referrerConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": referrerLayerDigest, "size": len(referrerLayer)}}, + "subject": map[string]any{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": subjectDigest, "size": len(subjectBytes)}, + } + referrerBytes, err := json.Marshal(referrerManifest) + require.NoError(t, err) + referrerDigest := writeBlob(referrerBytes) + + // Cache index.json references only the referrer; the subject should + // stay live via the subject link. + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": referrerDigest, "size": len(referrerBytes)}}, + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Live: referrer manifest + referrer config + referrer layer + // + subject manifest + subject config + subject layer = 6. + assert.Equal(t, 6, stats.LiveBlobs) + assert.Equal(t, 0, stats.DeletedBlobs, "subject's config and layers must not be swept") + + // Double-check subject's transitive blobs still exist. + for _, d := range []string{subjectConfigDigest, subjectLayerDigest, subjectDigest} { + _, err := os.Stat(filepath.Join(blobsDir, d[7:])) + assert.NoError(t, err, "subject-reachable blob %s should remain", d) + } +} + func TestNewCollectorValidatesArgs(t *testing.T) { dataDir := t.TempDir() p := paths.New(dataDir) From 8507ca05694015bee5f651945059de69af9f1fa5 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Thu, 7 May 2026 22:12:50 +0000 Subject: [PATCH 4/6] Mark BuildKit cache manifests live in OCI cache GC The registry stores manifest and layer blobs for cache/* pushes in the shared OCI blob dir but skips triggerConversion, so those blobs are never rooted in index.json. With GC enabled this caused the sweep to delete cache blobs the registry was still serving from its in-memory tag map, breaking BuildKit cache exports. Track cache/* tag -> manifest digest in the registry and expose the set via LiveCacheManifestDigests. The GC takes a RootsProvider; on every sweep it walks those manifests' configs and layers as additional roots alongside index.json. Co-Authored-By: Claude Opus 4.7 --- cmd/api/main.go | 5 +- cmd/api/oci_cache_gc_test.go | 6 +-- lib/ocicachegc/README.md | 7 ++- lib/ocicachegc/gc.go | 69 +++++++++++++++++++------- lib/ocicachegc/gc_test.go | 88 +++++++++++++++++++++++++++++++-- lib/registry/cache_tags_test.go | 41 +++++++++++++++ lib/registry/registry.go | 54 +++++++++++++++++++- 7 files changed, 239 insertions(+), 31 deletions(-) create mode 100644 lib/registry/cache_tags_test.go diff --git a/cmd/api/main.go b/cmd/api/main.go index 99ce7ebc..b6917040 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -103,7 +103,7 @@ type ociCacheGCRunner interface { Run(ctx context.Context) error } -func configureOCICacheGC(cfg *config.Config, logger *slog.Logger, meter metric.Meter) (ociCacheGCRunner, error) { +func configureOCICacheGC(cfg *config.Config, roots ocicachegc.RootsProvider, logger *slog.Logger, meter metric.Meter) (ociCacheGCRunner, error) { if cfg == nil || !cfg.Images.OCICacheGC.Enabled { return nil, nil } @@ -117,7 +117,7 @@ func configureOCICacheGC(cfg *config.Config, logger *slog.Logger, meter metric.M return nil, fmt.Errorf("invalid images.oci_cache_gc.min_blob_age %q: %w", cfg.Images.OCICacheGC.MinBlobAge, err) } - return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, logger, meter) + return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, roots, logger, meter) } func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGCRunner) bool { @@ -525,6 +525,7 @@ func run() error { ociGC, err := configureOCICacheGC( app.Config, + app.Registry, logger, otelProvider.MeterFor(loglib.SubsystemImages), ) diff --git a/cmd/api/oci_cache_gc_test.go b/cmd/api/oci_cache_gc_test.go index 9887bf48..c7ac5d86 100644 --- a/cmd/api/oci_cache_gc_test.go +++ b/cmd/api/oci_cache_gc_test.go @@ -43,7 +43,7 @@ func loadTestConfig(t *testing.T) *config.Config { func TestConfigureOCICacheGCSkipsDisabledConfig(t *testing.T) { cfg := loadTestConfig(t) - runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) if err != nil { t.Fatalf("configure disabled oci cache gc: %v", err) } @@ -58,7 +58,7 @@ func TestConfigureOCICacheGCBuildsCollectorWhenEnabled(t *testing.T) { cfg.Images.OCICacheGC.Interval = "2m" cfg.Images.OCICacheGC.MinBlobAge = "30s" - runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) if err != nil { t.Fatalf("configure enabled oci cache gc: %v", err) } @@ -72,7 +72,7 @@ func TestConfigureOCICacheGCRejectsInvalidInterval(t *testing.T) { cfg.Images.OCICacheGC.Enabled = true cfg.Images.OCICacheGC.Interval = "0s" - if _, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil); err == nil { + if _, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil); err == nil { t.Fatalf("expected invalid oci cache gc interval to fail") } } diff --git a/lib/ocicachegc/README.md b/lib/ocicachegc/README.md index b209416b..96341ec0 100644 --- a/lib/ocicachegc/README.md +++ b/lib/ocicachegc/README.md @@ -26,8 +26,11 @@ When enabled, the server runs one pass immediately and then every 1. **Mark.** Read `index.json` and walk every referenced descriptor. For each manifest or manifest-index blob we descend into its `config`, - `layers`, `manifests`, and `subject` references. The set of visited - digests is the live set. + `layers`, `manifests`, and `subject` references. Any extra digests + returned by the configured `RootsProvider` are walked the same way; + this is how BuildKit cache exports under `cache/*` (which the + registry tracks in memory but does not root in `index.json`) stay + marked. The set of visited digests is the live set. 2. **Sweep.** List `blobs/sha256/`. Delete every file whose name is a valid 64-char hex digest, is absent from the live set, and whose `mtime` is older than `min_blob_age`. diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go index 1a1a6904..85da3899 100644 --- a/lib/ocicachegc/gc.go +++ b/lib/ocicachegc/gc.go @@ -31,6 +31,14 @@ import ( "go.opentelemetry.io/otel/metric" ) +// RootsProvider returns extra manifest digests (in "sha256:" form) +// that should be treated as live alongside everything reachable from +// index.json. Used for blobs the registry tracks in memory but does not +// root in the OCI layout (e.g. BuildKit cache exports under cache/*). +type RootsProvider interface { + LiveCacheManifestDigests() []string +} + // Collector garbage-collects the shared OCI cache. type Collector struct { paths *paths.Paths @@ -38,6 +46,7 @@ type Collector struct { minBlobAge time.Duration logger *slog.Logger metrics *Metrics + roots RootsProvider now func() time.Time mu sync.Mutex } @@ -53,8 +62,10 @@ type Stats struct { // NewCollector creates a collector. minBlobAge is the minimum age a blob // must have before it becomes eligible for deletion; it protects blobs -// that are currently being written by a concurrent pull or push. -func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, logger *slog.Logger, meter metric.Meter) (*Collector, error) { +// that are currently being written by a concurrent pull or push. roots +// may be nil; when non-nil it is consulted on every sweep for additional +// manifest digests to mark live. +func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, roots RootsProvider, logger *slog.Logger, meter metric.Meter) (*Collector, error) { if p == nil { return nil, errors.New("paths is required") } @@ -72,6 +83,7 @@ func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, logger *sl interval: interval, minBlobAge: minBlobAge, logger: logger.With("component", "oci_cache_gc"), + roots: roots, now: time.Now, } if meter != nil { @@ -145,7 +157,11 @@ func (c *Collector) collect(ctx context.Context) (Stats, error) { return stats, fmt.Errorf("stat blob dir: %w", err) } - live, err := liveBlobs(c.paths) + var extraRoots []string + if c.roots != nil { + extraRoots = c.roots.LiveCacheManifestDigests() + } + live, err := liveBlobs(c.paths, extraRoots) if err != nil { return stats, fmt.Errorf("compute live set: %w", err) } @@ -229,28 +245,35 @@ type manifestDoc struct { } // liveBlobs returns the set of blob hex digests reachable from the OCI -// cache index.json. Keys are bare hex (no "sha256:" prefix), matching -// the filenames under blobs/sha256/. -func liveBlobs(p *paths.Paths) (map[string]struct{}, error) { +// cache index.json plus any extraRoots provided. Keys are bare hex +// (no "sha256:" prefix), matching the filenames under blobs/sha256/. +// extraRoots entries are accepted in either "sha256:" or bare hex +// form and silently skipped if malformed. +func liveBlobs(p *paths.Paths, extraRoots []string) (map[string]struct{}, error) { live := make(map[string]struct{}) + visited := make(map[string]struct{}) indexPath := p.OCICacheIndex() data, err := os.ReadFile(indexPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - return live, nil - } + switch { + case errors.Is(err, fs.ErrNotExist): + // Cache hasn't been initialised yet; only extraRoots are live. + case err != nil: return nil, fmt.Errorf("read index.json: %w", err) + default: + var index manifestDoc + if err := json.Unmarshal(data, &index); err != nil { + return nil, fmt.Errorf("parse index.json: %w", err) + } + for _, m := range index.Manifests { + if err := walkDescriptor(p, m, live, visited); err != nil { + return nil, err + } + } } - var index manifestDoc - if err := json.Unmarshal(data, &index); err != nil { - return nil, fmt.Errorf("parse index.json: %w", err) - } - - visited := make(map[string]struct{}) - for _, m := range index.Manifests { - if err := walkDescriptor(p, m, live, visited); err != nil { + for _, digest := range extraRoots { + if err := walkDescriptor(p, descriptor{Digest: normaliseDigest(digest)}, live, visited); err != nil { return nil, err } } @@ -308,6 +331,16 @@ func walkDescriptor(p *paths.Paths, desc descriptor, live, visited map[string]st return nil } +// normaliseDigest accepts either "sha256:" or a bare 64-char hex +// string and returns the "sha256:" form expected by walkDescriptor. +// Anything else is returned unchanged so digestHex can reject it. +func normaliseDigest(d string) string { + if len(d) == 64 && blobNamePattern.MatchString(d) { + return "sha256:" + d + } + return d +} + // digestHex extracts the hex portion of a "sha256:" digest. It // returns false for empty strings, unsupported algorithms, or malformed // hex so callers can skip bad data without erroring the sweep. diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go index b385e5fd..48796676 100644 --- a/lib/ocicachegc/gc_test.go +++ b/lib/ocicachegc/gc_test.go @@ -135,7 +135,7 @@ func (b *layoutBuilder) writeIndex() { func newCollectorForTest(t *testing.T, dataDir string, minBlobAge time.Duration, now time.Time) *Collector { t.Helper() - c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil) + c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil, nil) require.NoError(t, err) c.now = func() time.Time { return now } return c @@ -390,19 +390,97 @@ func TestCollectRecursesIntoSubject(t *testing.T) { } } +// stubRoots is a RootsProvider that returns a fixed list of digests. +type stubRoots struct{ digests []string } + +func (s stubRoots) LiveCacheManifestDigests() []string { return s.digests } + +func TestCollectKeepsBlobsReachableFromExtraRoots(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + // Write a registry-only cache manifest (e.g. a BuildKit cache export): + // it has a config + layer, but no entry in index.json. + cacheConfig := []byte(`{"cache-config":true}`) + cacheLayer := []byte("cache-layer-bytes") + cacheConfigDigest := writeBlob(cacheConfig) + cacheLayerDigest := writeBlob(cacheLayer) + + cacheManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": cacheConfigDigest, "size": len(cacheConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.buildkit.cacheconfig.v0", "digest": cacheLayerDigest, "size": len(cacheLayer)}}, + } + cacheBytes, err := json.Marshal(cacheManifest) + require.NoError(t, err) + cacheDigest := writeBlob(cacheBytes) + + // Empty index.json: nothing rooted there at all. + emptyIndex := map[string]any{"schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests": []any{}} + indexBytes, err := json.Marshal(emptyIndex) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + // Force every blob's mtime past the grace period. + past := time.Now().Add(-2 * time.Hour) + for _, d := range []string{cacheConfigDigest, cacheLayerDigest, cacheDigest} { + require.NoError(t, os.Chtimes(filepath.Join(blobsDir, d[7:]), past, past)) + } + + // Without roots, every blob is unreachable and should be swept. + cWithout, err := NewCollector(p, time.Hour, time.Minute, nil, nil, nil) + require.NoError(t, err) + cWithout.now = func() time.Time { return time.Now() } + statsWithout, err := cWithout.Collect(context.Background()) + require.NoError(t, err) + assert.Equal(t, 3, statsWithout.DeletedBlobs) + + // Recreate the blobs and re-run with the registry as a RootsProvider. + writeBlob(cacheConfig) + writeBlob(cacheLayer) + writeBlob(cacheBytes) + for _, d := range []string{cacheConfigDigest, cacheLayerDigest, cacheDigest} { + require.NoError(t, os.Chtimes(filepath.Join(blobsDir, d[7:]), past, past)) + } + + roots := stubRoots{digests: []string{cacheDigest}} + cWith, err := NewCollector(p, time.Hour, time.Minute, roots, nil, nil) + require.NoError(t, err) + cWith.now = func() time.Time { return time.Now() } + statsWith, err := cWith.Collect(context.Background()) + require.NoError(t, err) + assert.Equal(t, 3, statsWith.LiveBlobs, "manifest + config + layer all marked live via root") + assert.Equal(t, 0, statsWith.DeletedBlobs, "nothing should be deleted when registry holds the manifest") + + for _, d := range []string{cacheConfigDigest, cacheLayerDigest, cacheDigest} { + _, err := os.Stat(filepath.Join(blobsDir, d[7:])) + assert.NoError(t, err, "blob %s must remain", d) + } +} + func TestNewCollectorValidatesArgs(t *testing.T) { dataDir := t.TempDir() p := paths.New(dataDir) - _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil) + _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, 0, time.Minute, nil, nil) + _, err = NewCollector(p, 0, time.Minute, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil) + _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, time.Hour, time.Minute, nil, nil) + _, err = NewCollector(p, time.Hour, time.Minute, nil, nil, nil) assert.NoError(t, err) } diff --git a/lib/registry/cache_tags_test.go b/lib/registry/cache_tags_test.go new file mode 100644 index 00000000..ad2a9f0b --- /dev/null +++ b/lib/registry/cache_tags_test.go @@ -0,0 +1,41 @@ +package registry + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsCacheRepo(t *testing.T) { + cases := map[string]bool{ + "cache/global/node": true, + "cache/tenant-x": true, + "10.102.0.1:8083/cache/global/node": true, + "builds/abc123": false, + "docker.io/library/alpine": false, + "": false, + "prefixcache/foo": false, + } + for repo, want := range cases { + assert.Equal(t, want, isCacheRepo(repo), "repo=%q", repo) + } +} + +func TestLiveCacheManifestDigestsTracksAndDeduplicates(t *testing.T) { + r := &Registry{cacheTags: map[string]string{}} + + assert.Empty(t, r.LiveCacheManifestDigests(), "empty registry has no cache roots") + + r.recordCacheTag("cache/global/node", "v1", "sha256:aaa") + r.recordCacheTag("cache/global/node", "v2", "sha256:bbb") + // Two tags pointing at the same digest should yield one entry. + r.recordCacheTag("cache/global/python", "v1", "sha256:aaa") + + got := r.LiveCacheManifestDigests() + assert.ElementsMatch(t, []string{"sha256:aaa", "sha256:bbb"}, got) + + // Overwriting a tag replaces the old digest for that tag. + r.recordCacheTag("cache/global/node", "v1", "sha256:ccc") + got = r.LiveCacheManifestDigests() + assert.ElementsMatch(t, []string{"sha256:aaa", "sha256:bbb", "sha256:ccc"}, got) +} diff --git a/lib/registry/registry.go b/lib/registry/registry.go index ea81d32a..bece275e 100644 --- a/lib/registry/registry.go +++ b/lib/registry/registry.go @@ -14,6 +14,7 @@ import ( "os" "regexp" "strings" + "sync" "github.com/google/go-containerregistry/pkg/registry" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -31,6 +32,15 @@ type Registry struct { imageManager images.Manager blobStore *BlobStore handler http.Handler + + // cacheTags tracks BuildKit cache repo tags ("cache/<...>:") + // to their manifest digest. The underlying go-containerregistry registry + // keeps tags in memory and they are not rooted in the OCI cache + // index.json, so without this map the OCI cache GC has no way to mark + // the manifest, config, and layer blobs that BuildKit cache exports + // rely on. Cleared with the process; tags do not survive restart. + cacheTagsMu sync.RWMutex + cacheTags map[string]string } // manifestPutPattern matches PUT requests to /v2/{name}/manifests/{reference} @@ -54,11 +64,50 @@ func New(p *paths.Paths, imgManager images.Manager) (*Registry, error) { imageManager: imgManager, blobStore: blobStore, handler: regHandler, + cacheTags: make(map[string]string), } return r, nil } +// LiveCacheManifestDigests returns the manifest digests of every BuildKit +// cache tag the registry has accepted since startup. Used by the OCI cache +// GC as additional roots: the in-memory registry never adds these to +// index.json, so without these roots the GC would sweep cache blobs that +// are still being served to BuildKit clients. +func (r *Registry) LiveCacheManifestDigests() []string { + r.cacheTagsMu.RLock() + defer r.cacheTagsMu.RUnlock() + if len(r.cacheTags) == 0 { + return nil + } + seen := make(map[string]struct{}, len(r.cacheTags)) + out := make([]string, 0, len(r.cacheTags)) + for _, digest := range r.cacheTags { + if _, ok := seen[digest]; ok { + continue + } + seen[digest] = struct{}{} + out = append(out, digest) + } + return out +} + +// recordCacheTag stores the (repo, reference) -> digest mapping for a +// BuildKit cache push. Replaces any prior digest for the same tag. +func (r *Registry) recordCacheTag(repo, reference, digest string) { + r.cacheTagsMu.Lock() + defer r.cacheTagsMu.Unlock() + r.cacheTags[repo+":"+reference] = digest +} + +// isCacheRepo reports whether a registry repo path is a BuildKit cache +// repo. The repo may include a host prefix (e.g. +// "10.0.0.1:8083/cache/global/node"). +func isCacheRepo(repo string) bool { + return strings.HasPrefix(repo, "cache/") || strings.Contains(repo, "/cache/") +} + // Handler returns the http.Handler for the registry endpoints. // This wraps the underlying registry to intercept manifest PUTs and trigger conversion. func (r *Registry) Handler() http.Handler { @@ -94,6 +143,9 @@ func (r *Registry) Handler() http.Handler { r.handler.ServeHTTP(wrapper, req) if wrapper.statusCode == http.StatusCreated { + if isCacheRepo(pathRepo) { + r.recordCacheTag(pathRepo, reference, digest) + } // Use pathRepo (without registry host prefix) so pushed images // are stored under their short name. This ensures consistency: // `hypeman push myapp` stores as "docker.io/library/myapp:latest" @@ -140,7 +192,7 @@ func (r *Registry) triggerConversion(repo, reference, dockerDigest string) { // unpacked as a standard OCI image. BuildKit imports them directly from // the registry without needing local conversion. // Note: repo may include host prefix (e.g., "10.102.0.1:8083/cache/global/node") - if strings.HasPrefix(repo, "cache/") || strings.Contains(repo, "/cache/") { + if isCacheRepo(repo) { return } From 395b8b79f5e6981d50ec437f7e87c70c4f32c7a6 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Thu, 7 May 2026 22:25:43 +0000 Subject: [PATCH 5/6] Walk index.json's top-level subject in OCI GC mark phase OCI v1.1 lets the index itself carry a subject descriptor. liveBlobs only iterated index.Manifests, so a blob reachable solely via the index-level subject was never marked and could be swept. Co-Authored-By: Claude Opus 4.7 --- lib/ocicachegc/gc.go | 7 ++++++ lib/ocicachegc/gc_test.go | 49 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go index 85da3899..2cfc110c 100644 --- a/lib/ocicachegc/gc.go +++ b/lib/ocicachegc/gc.go @@ -270,6 +270,13 @@ func liveBlobs(p *paths.Paths, extraRoots []string) (map[string]struct{}, error) return nil, err } } + // OCI v1.1 allows index.json itself to carry a subject descriptor; + // recurse into it so anything reachable that way stays marked. + if index.Subject != nil { + if err := walkDescriptor(p, *index.Subject, live, visited); err != nil { + return nil, err + } + } } for _, digest := range extraRoots { diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go index 48796676..55efbb8a 100644 --- a/lib/ocicachegc/gc_test.go +++ b/lib/ocicachegc/gc_test.go @@ -390,6 +390,55 @@ func TestCollectRecursesIntoSubject(t *testing.T) { } } +func TestCollectFollowsIndexSubject(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + // Subject manifest: config + layer. + subjectConfig := []byte(`{"index-subject":true}`) + subjectLayer := []byte("index-subject-layer") + subjectConfigDigest := writeBlob(subjectConfig) + subjectLayerDigest := writeBlob(subjectLayer) + + subjectManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": subjectConfigDigest, "size": len(subjectConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": subjectLayerDigest, "size": len(subjectLayer)}}, + } + subjectBytes, err := json.Marshal(subjectManifest) + require.NoError(t, err) + subjectDigest := writeBlob(subjectBytes) + + // index.json with no manifests[] entries but a top-level subject. + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []any{}, + "subject": map[string]any{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": subjectDigest, "size": len(subjectBytes)}, + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // subject manifest + its config + layer = 3 live blobs. + assert.Equal(t, 3, stats.LiveBlobs) + assert.Equal(t, 0, stats.DeletedBlobs) +} + // stubRoots is a RootsProvider that returns a fixed list of digests. type stubRoots struct{ digests []string } From 5554c9d61b8db2d6bc647741407c7b50adfda071 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Thu, 7 May 2026 23:38:35 +0000 Subject: [PATCH 6/6] Add tracing and tune OCI cache GC observability Adds a tracer to the collector with spans around the sweep, the mark phase, and the blob sweep loop, plus span attributes capturing live, scanned, deleted, and skipped-recent counts. Records live blob count per successful sweep as a histogram metric so cache size is observable from metrics alone. Demotes the per-blob delete log to DEBUG (an ongoing maintenance event) and promotes the sweep summary to INFO only when blobs were actually deleted, leaving idle sweeps at DEBUG. --- cmd/api/main.go | 6 ++-- cmd/api/oci_cache_gc_test.go | 6 ++-- lib/ocicachegc/gc.go | 53 ++++++++++++++++++++++++++++++++++-- lib/ocicachegc/gc_test.go | 14 +++++----- lib/ocicachegc/metrics.go | 13 +++++++++ 5 files changed, 77 insertions(+), 15 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index b6917040..446d9c93 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -41,6 +41,7 @@ import ( nethttpmiddleware "github.com/oapi-codegen/nethttp-middleware" "github.com/riandyrn/otelchi" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) @@ -103,7 +104,7 @@ type ociCacheGCRunner interface { Run(ctx context.Context) error } -func configureOCICacheGC(cfg *config.Config, roots ocicachegc.RootsProvider, logger *slog.Logger, meter metric.Meter) (ociCacheGCRunner, error) { +func configureOCICacheGC(cfg *config.Config, roots ocicachegc.RootsProvider, logger *slog.Logger, meter metric.Meter, tracer trace.Tracer) (ociCacheGCRunner, error) { if cfg == nil || !cfg.Images.OCICacheGC.Enabled { return nil, nil } @@ -117,7 +118,7 @@ func configureOCICacheGC(cfg *config.Config, roots ocicachegc.RootsProvider, log return nil, fmt.Errorf("invalid images.oci_cache_gc.min_blob_age %q: %w", cfg.Images.OCICacheGC.MinBlobAge, err) } - return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, roots, logger, meter) + return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, roots, logger, meter, tracer) } func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGCRunner) bool { @@ -528,6 +529,7 @@ func run() error { app.Registry, logger, otelProvider.MeterFor(loglib.SubsystemImages), + otelProvider.TracerFor(loglib.SubsystemImages), ) if err != nil { return err diff --git a/cmd/api/oci_cache_gc_test.go b/cmd/api/oci_cache_gc_test.go index c7ac5d86..b68d9f7f 100644 --- a/cmd/api/oci_cache_gc_test.go +++ b/cmd/api/oci_cache_gc_test.go @@ -43,7 +43,7 @@ func loadTestConfig(t *testing.T) *config.Config { func TestConfigureOCICacheGCSkipsDisabledConfig(t *testing.T) { cfg := loadTestConfig(t) - runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil, nil) if err != nil { t.Fatalf("configure disabled oci cache gc: %v", err) } @@ -58,7 +58,7 @@ func TestConfigureOCICacheGCBuildsCollectorWhenEnabled(t *testing.T) { cfg.Images.OCICacheGC.Interval = "2m" cfg.Images.OCICacheGC.MinBlobAge = "30s" - runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + runner, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil, nil) if err != nil { t.Fatalf("configure enabled oci cache gc: %v", err) } @@ -72,7 +72,7 @@ func TestConfigureOCICacheGCRejectsInvalidInterval(t *testing.T) { cfg.Images.OCICacheGC.Enabled = true cfg.Images.OCICacheGC.Interval = "0s" - if _, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil); err == nil { + if _, err := configureOCICacheGC(cfg, nil, slog.New(slog.NewTextHandler(io.Discard, nil)), nil, nil); err == nil { t.Fatalf("expected invalid oci cache gc interval to fail") } } diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go index 2cfc110c..6e3baf93 100644 --- a/lib/ocicachegc/gc.go +++ b/lib/ocicachegc/gc.go @@ -28,7 +28,11 @@ import ( "time" "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) // RootsProvider returns extra manifest digests (in "sha256:" form) @@ -46,6 +50,7 @@ type Collector struct { minBlobAge time.Duration logger *slog.Logger metrics *Metrics + tracer trace.Tracer roots RootsProvider now func() time.Time mu sync.Mutex @@ -65,7 +70,7 @@ type Stats struct { // that are currently being written by a concurrent pull or push. roots // may be nil; when non-nil it is consulted on every sweep for additional // manifest digests to mark live. -func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, roots RootsProvider, logger *slog.Logger, meter metric.Meter) (*Collector, error) { +func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, roots RootsProvider, logger *slog.Logger, meter metric.Meter, tracer trace.Tracer) (*Collector, error) { if p == nil { return nil, errors.New("paths is required") } @@ -78,11 +83,15 @@ func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, roots Root if logger == nil { logger = slog.Default() } + if tracer == nil { + tracer = otel.Tracer("hypeman/oci_cache_gc") + } c := &Collector{ paths: p, interval: interval, minBlobAge: minBlobAge, logger: logger.With("component", "oci_cache_gc"), + tracer: tracer, roots: roots, now: time.Now, } @@ -125,6 +134,9 @@ func (c *Collector) Collect(ctx context.Context) (Stats, error) { c.mu.Lock() defer c.mu.Unlock() + ctx, span := c.tracer.Start(ctx, "oci_cache_gc.sweep") + defer span.End() + start := c.now() stats, err := c.collect(ctx) status := "success" @@ -134,10 +146,24 @@ func (c *Collector) Collect(ctx context.Context) (Stats, error) { if c.metrics != nil { c.metrics.RecordSweep(ctx, status, c.now().Sub(start), stats) } + span.SetAttributes( + attribute.Int("live_blobs", stats.LiveBlobs), + attribute.Int("scanned_blobs", stats.ScannedBlobs), + attribute.Int("deleted_blobs", stats.DeletedBlobs), + attribute.Int64("deleted_bytes", stats.DeletedBytes), + attribute.Int("skipped_recent", stats.SkippedRecent), + ) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return stats, err } - c.logger.DebugContext(ctx, "oci cache gc sweep completed", + span.SetStatus(codes.Ok, "") + logFn := c.logger.DebugContext + if stats.DeletedBlobs > 0 { + logFn = c.logger.InfoContext + } + logFn(ctx, "oci cache gc sweep completed", "live_blobs", stats.LiveBlobs, "scanned_blobs", stats.ScannedBlobs, "deleted_blobs", stats.DeletedBlobs, @@ -161,11 +187,20 @@ func (c *Collector) collect(ctx context.Context) (Stats, error) { if c.roots != nil { extraRoots = c.roots.LiveCacheManifestDigests() } + _, markSpan := c.tracer.Start(ctx, "oci_cache_gc.mark") live, err := liveBlobs(c.paths, extraRoots) if err != nil { + markSpan.RecordError(err) + markSpan.SetStatus(codes.Error, err.Error()) + markSpan.End() return stats, fmt.Errorf("compute live set: %w", err) } stats.LiveBlobs = len(live) + markSpan.SetAttributes( + attribute.Int("live_blobs", stats.LiveBlobs), + attribute.Int("extra_roots", len(extraRoots)), + ) + markSpan.End() cutoff := c.now().Add(-c.minBlobAge) @@ -174,6 +209,18 @@ func (c *Collector) collect(ctx context.Context) (Stats, error) { return stats, fmt.Errorf("read blob dir: %w", err) } + ctx, sweepSpan := c.tracer.Start(ctx, "oci_cache_gc.sweep_blobs", + trace.WithAttributes(attribute.Int("blob_dir_entries", len(entries))), + ) + defer func() { + sweepSpan.SetAttributes( + attribute.Int("scanned_blobs", stats.ScannedBlobs), + attribute.Int("deleted_blobs", stats.DeletedBlobs), + attribute.Int("skipped_recent", stats.SkippedRecent), + ) + sweepSpan.End() + }() + for _, entry := range entries { if err := ctx.Err(); err != nil { return stats, err @@ -210,7 +257,7 @@ func (c *Collector) collect(ctx context.Context) (Stats, error) { } stats.DeletedBlobs++ stats.DeletedBytes += size - c.logger.InfoContext(ctx, "deleted unreferenced oci blob", "digest", name, "size", size) + c.logger.DebugContext(ctx, "deleted unreferenced oci blob", "digest", name, "size", size) } return stats, nil diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go index 55efbb8a..906a4e4f 100644 --- a/lib/ocicachegc/gc_test.go +++ b/lib/ocicachegc/gc_test.go @@ -135,7 +135,7 @@ func (b *layoutBuilder) writeIndex() { func newCollectorForTest(t *testing.T, dataDir string, minBlobAge time.Duration, now time.Time) *Collector { t.Helper() - c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil, nil) + c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil, nil, nil) require.NoError(t, err) c.now = func() time.Time { return now } return c @@ -487,7 +487,7 @@ func TestCollectKeepsBlobsReachableFromExtraRoots(t *testing.T) { } // Without roots, every blob is unreachable and should be swept. - cWithout, err := NewCollector(p, time.Hour, time.Minute, nil, nil, nil) + cWithout, err := NewCollector(p, time.Hour, time.Minute, nil, nil, nil, nil) require.NoError(t, err) cWithout.now = func() time.Time { return time.Now() } statsWithout, err := cWithout.Collect(context.Background()) @@ -503,7 +503,7 @@ func TestCollectKeepsBlobsReachableFromExtraRoots(t *testing.T) { } roots := stubRoots{digests: []string{cacheDigest}} - cWith, err := NewCollector(p, time.Hour, time.Minute, roots, nil, nil) + cWith, err := NewCollector(p, time.Hour, time.Minute, roots, nil, nil, nil) require.NoError(t, err) cWith.now = func() time.Time { return time.Now() } statsWith, err := cWith.Collect(context.Background()) @@ -521,15 +521,15 @@ func TestNewCollectorValidatesArgs(t *testing.T) { dataDir := t.TempDir() p := paths.New(dataDir) - _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil, nil) + _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, 0, time.Minute, nil, nil, nil) + _, err = NewCollector(p, 0, time.Minute, nil, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil, nil) + _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil, nil, nil) assert.Error(t, err) - _, err = NewCollector(p, time.Hour, time.Minute, nil, nil, nil) + _, err = NewCollector(p, time.Hour, time.Minute, nil, nil, nil, nil) assert.NoError(t, err) } diff --git a/lib/ocicachegc/metrics.go b/lib/ocicachegc/metrics.go index 9ba9843e..3e600079 100644 --- a/lib/ocicachegc/metrics.go +++ b/lib/ocicachegc/metrics.go @@ -15,6 +15,7 @@ type Metrics struct { sweepDuration metric.Float64Histogram deletedBlobs metric.Int64Counter deletedBytes metric.Int64Counter + liveBlobs metric.Int64Histogram } func newMetrics(meter metric.Meter) (*Metrics, error) { @@ -53,11 +54,20 @@ func newMetrics(meter metric.Meter) (*Metrics, error) { return nil, err } + liveBlobs, err := meter.Int64Histogram( + "hypeman_oci_cache_gc_live_blobs", + metric.WithDescription("Number of live blobs observed in the OCI cache per sweep"), + ) + if err != nil { + return nil, err + } + return &Metrics{ sweepsTotal: sweepsTotal, sweepDuration: sweepDuration, deletedBlobs: deletedBlobs, deletedBytes: deletedBytes, + liveBlobs: liveBlobs, }, nil } @@ -69,6 +79,9 @@ func (m *Metrics) RecordSweep(ctx context.Context, status string, duration time. attrs := metric.WithAttributes(attribute.String("status", status)) m.sweepsTotal.Add(ctx, 1, attrs) m.sweepDuration.Record(ctx, duration.Seconds(), attrs) + if status == "success" { + m.liveBlobs.Record(ctx, int64(stats.LiveBlobs)) + } if stats.DeletedBlobs > 0 { m.deletedBlobs.Add(ctx, int64(stats.DeletedBlobs)) m.deletedBytes.Add(ctx, stats.DeletedBytes)