feat: add io weight control on image operator#26
feat: add io weight control on image operator#26zijiren233 wants to merge 10 commits intolabring:v1.7from
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds IO weight control functionality to containerd's image operations (pull, unpack, commit) using cgroups v2. The feature allows prioritizing or deprioritizing IO operations through BFQ scheduler weights or io.weight configuration, enabling better resource management in multi-tenant or resource-constrained environments.
Key Changes:
- New
sys/blkiorunpackage implementing cgroups v2-based IO weight control with systemd slice management - Integration of IO weight control into image pull, unpack, and snapshot commit operations
- Configuration support for IO weight settings in containerd's config file
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| sys/blkiorun/config.go | Defines Config struct for IO weight configuration (10-1000 range) |
| sys/blkiorun/blkiorun_linux.go | Core Linux implementation with cgroup management, BFQ/io.weight conversion, and goroutine execution wrappers |
| sys/blkiorun/blkiorun_other.go | No-op implementation for non-Linux platforms |
| sys/blkiorun/blkiorun_test.go | Comprehensive test coverage for weight conversions and cgroup operations |
| services/server/config/config.go | Adds BlkioConfig struct to CgroupConfig for weight, slice path, and slice name configuration |
| cmd/containerd/command/main.go | Initializes blkiorun on containerd startup with configured settings |
| pull.go | Wraps pull operation with blkiorun.Go to execute in IO-weighted goroutine |
| pkg/unpack/unpacker.go | Wraps layer apply and fetch operations with blkiorun.Local for IO weight control |
| services/snapshots/service.go | Wraps snapshot commit with blkiorun.Go for IO weight control |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| bfqSupportedOnce.Do(func() { | ||
| bfqPath := filepath.Join(cgroupPath, "io.bfq.weight") | ||
| if _, err := os.Stat(bfqPath); err == nil { | ||
| bfqSupported = true | ||
| } | ||
| }) | ||
| return bfqSupported | ||
| } | ||
|
|
||
| // readIOWeight reads the current IO weight from cgroups. | ||
| // Returns BFQ weight if available, otherwise io.weight converted to BFQ range. |
There was a problem hiding this comment.
The isBFQSupported function uses sync.Once to set the bfqSupported global variable, but the cgroupPath parameter is passed in each call. If this function is called with different cgroupPath values, only the first path will be checked, and subsequent calls with different paths will return the cached result from the first call. This could lead to incorrect BFQ support detection if different cgroup paths have different BFQ availability.
| bfqSupportedOnce.Do(func() { | |
| bfqPath := filepath.Join(cgroupPath, "io.bfq.weight") | |
| if _, err := os.Stat(bfqPath); err == nil { | |
| bfqSupported = true | |
| } | |
| }) | |
| return bfqSupported | |
| } | |
| // readIOWeight reads the current IO weight from cgroups. | |
| // Returns BFQ weight if available, otherwise io.weight converted to BFQ range. | |
| bfqPath := filepath.Join(cgroupPath, "io.bfq.weight") | |
| if _, err := os.Stat(bfqPath); err == nil { | |
| return true | |
| } | |
| return false | |
| } | |
| // readIOWeight reads the current IO weight from cgroups. | |
| // Returns BFQ weight if available, otherwise io.weight converted to BFQ range. | |
| // Returns BFQ weight if available, otherwise io.weight converted to BFQ range. |
| _, err = blkiorun.Go(func() (struct{}, error) { | ||
| return struct{}{}, sn.Commit(ctx, cr.Name, cr.Key, opts...) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return nil, errdefs.ToGRPC(err) |
There was a problem hiding this comment.
The error from blkiorun.Go is assigned to err, but this shadows the original err variable. If the Commit operation succeeds but there's an error in the blkiorun.Go wrapper itself (which shouldn't happen since it's returning a struct{} and the actual error), this could lead to confusion. Consider using a different variable name or ensuring the error handling is clear.
| _, err = blkiorun.Go(func() (struct{}, error) { | |
| return struct{}{}, sn.Commit(ctx, cr.Name, cr.Key, opts...) | |
| }) | |
| if err != nil { | |
| return nil, errdefs.ToGRPC(err) | |
| _, commitErr := blkiorun.Go(func() (struct{}, error) { | |
| return struct{}{}, sn.Commit(ctx, cr.Name, cr.Key, opts...) | |
| }) | |
| if commitErr != nil { | |
| return nil, errdefs.ToGRPC(commitErr) |
| func Init(cfg Config, slicePath, sliceName string) error { | ||
| var initErr error | ||
|
|
||
| stateOnce.Do(func() { | ||
| s := &globalState{} | ||
| defer func() { state = s }() | ||
|
|
||
| if cfg.Weight == 0 { | ||
| log.L.Debug("blkiorun: disabled (weight=0)") | ||
| return | ||
| } | ||
|
|
||
| if cfg.Weight < BFQWeightMin || cfg.Weight > BFQWeightMax { | ||
| log.L.Warnf("blkiorun: weight %d out of range [%d, %d], disabled", cfg.Weight, BFQWeightMin, BFQWeightMax) | ||
| return | ||
| } | ||
|
|
||
| s.config = cfg | ||
| log.L.Infof("blkiorun: weight configured: %d", cfg.Weight) | ||
|
|
||
| if !isCgroupV2() { | ||
| log.L.Warn("blkiorun: cgroups v2 not available, disabled") | ||
| return | ||
| } | ||
|
|
||
| // Get containerd's cgroup path | ||
| var err error | ||
| s.containerdPath, err = getCurrentCgroupPath() | ||
| if err != nil { | ||
| initErr = fmt.Errorf("failed to get cgroup path: %w", err) | ||
| return | ||
| } | ||
| log.L.Debugf("blkiorun: containerd cgroup: %s", s.containerdPath) | ||
|
|
||
| var cgroupPath string | ||
| if slicePath != "" { | ||
| cgroupPath = slicePath | ||
| log.L.Debugf("blkiorun: using configured path: %s", cgroupPath) | ||
| } else { | ||
| // Create systemd slice | ||
| if sliceName == "" { | ||
| sliceName = DefaultSliceName | ||
| } | ||
| if !strings.HasSuffix(sliceName, ".slice") { | ||
| sliceName += ".slice" | ||
| } | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), SystemdTimeout) | ||
| defer cancel() | ||
|
|
||
| if err := createSlice(ctx, sliceName); err != nil { | ||
| log.L.WithError(err).Warnf("blkiorun: failed to create slice %s", sliceName) | ||
| return | ||
| } | ||
|
|
||
| cgroupPath = sliceCgroupPath(sliceName) | ||
| for i := 0; i < sliceWaitRetries; i++ { | ||
| if _, err := os.Stat(cgroupPath); err == nil { | ||
| break | ||
| } | ||
| time.Sleep(sliceWaitInterval) | ||
| } | ||
| } | ||
|
|
||
| // Verify io.weight is available | ||
| if _, err := os.Stat(filepath.Join(cgroupPath, "io.weight")); os.IsNotExist(err) { | ||
| log.L.Warn("blkiorun: io.weight not available") | ||
| return | ||
| } | ||
|
|
||
| // Enable io controller for children | ||
| if err := enableIOController(cgroupPath); err != nil { | ||
| log.L.WithError(err).Warn("blkiorun: failed to enable io controller") | ||
| return | ||
| } | ||
|
|
||
| // Apply default IO weight to slice | ||
| if err := applyConfig(cgroupPath, cfg); err != nil { | ||
| log.L.WithError(err).Warn("blkiorun: failed to apply config") | ||
| return | ||
| } | ||
|
|
||
| s.slicePath = cgroupPath | ||
| s.initialized = true | ||
| log.L.Infof("blkiorun: initialized at %s with weight %d", cgroupPath, cfg.Weight) | ||
| }) | ||
|
|
||
| return initErr | ||
| } |
There was a problem hiding this comment.
The Init function has comprehensive logic for initialization, error handling, and validation, but there's no test coverage for it. Consider adding tests for Init with various configurations (valid weight, invalid weight, missing cgroups v2, etc.) to ensure proper initialization behavior and error handling.
2ce31a4 to
fcd4181
Compare
Signed-off-by: zijiren233 <pyh1670605849@gmail.com>
3735eab to
66e1f4d
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Fallback to io.weight | ||
| data, err := os.ReadFile(filepath.Join(cgroupPath, "io.weight")) | ||
| if err != nil { | ||
| return BFQWeightDefault, nil // Return default if reading fails | ||
| } | ||
|
|
||
| fields := strings.Fields(string(bytes.TrimSpace(data))) | ||
| if len(fields) > 0 { | ||
| ioWeight, err := strconv.ParseUint(fields[len(fields)-1], 10, 64) | ||
| if err == nil { | ||
| return ConvertIOWeightToBFQ(ioWeight), nil | ||
| } | ||
| } | ||
|
|
||
| return BFQWeightDefault, nil |
There was a problem hiding this comment.
readIOWeight() silently returns BFQWeightDefault with a nil error when reads/parses fail. Since the function exposes an error return, it should propagate the underlying read/parse error (or drop the error return entirely) so callers can distinguish “default” from “couldn’t read current weight”.
| defer func(){ | ||
| err := cg.destroy() | ||
| if err != nil { | ||
| log.L.WithError(err).Error("blkiorun: failed to destroy cgroup") | ||
| } | ||
| }() |
There was a problem hiding this comment.
These deferred anonymous functions are not gofmt-formatted (e.g., defer func(){). Please run gofmt on this file; CI/lint typically expects canonical formatting (defer func() { ... }).
| } | ||
| return struct{}{}, nil | ||
| }) |
There was a problem hiding this comment.
There is trailing whitespace on the return struct{}{}, nil line. Please run gofmt (it will also remove trailing tabs/spaces).
| func TestWriteAndReadIOWeight(t *testing.T) { | ||
| if !isCgroupV2() { | ||
| t.Skip("Test requires cgroups v2") | ||
| } | ||
|
|
||
| cgroupPath, err := getCurrentCgroupPath() | ||
| if err != nil { | ||
| t.Skipf("Cannot get cgroup path: %v", err) | ||
| } | ||
|
|
||
| // Read original weight | ||
| originalWeight, err := readIOWeight(cgroupPath) | ||
| if err != nil { | ||
| t.Skipf("Cannot read IO weight: %v", err) | ||
| } | ||
| t.Logf("Original IO weight: %d", originalWeight) | ||
|
|
||
| // Try to write a different weight | ||
| testWeight := uint16(200) | ||
| if originalWeight == testWeight { | ||
| testWeight = 300 | ||
| } | ||
|
|
||
| err = writeIOWeight(cgroupPath, testWeight) | ||
| if err != nil { | ||
| t.Skipf("Cannot write IO weight (may need permissions): %v", err) | ||
| } |
There was a problem hiding this comment.
TestWriteAndReadIOWeight writes to the process’ current cgroup weight, which can have side effects on the test runner (and potentially other processes in the same cgroup) if it has permission. Prefer creating a dedicated temporary child cgroup for the test and cleaning it up, or mark this as an integration test that is skipped unless an explicit env var is set.
| // DoWithConfig executes fn in current goroutine with specified config. | ||
| func DoWithConfig[T any](cfg Config, fn func() (T, error)) (T, error) { | ||
| if cfg.Weight == 0 || !IsInitialized() { | ||
| return fn() | ||
| } |
There was a problem hiding this comment.
DoWithConfig accepts an arbitrary cfg.Weight but does not validate it (unlike Init). Passing a value outside 10-1000 can lead to invalid writes (and for 1-9 currently triggers underflow in ConvertBFQToIOWeight). Consider validating/clamping cfg.Weight in DoWithConfig/applyConfig and returning an error (or falling back to fn() without attempting cgroup changes) when the value is out of range.
| if !isCgroupV2() { | ||
| log.L.Warn("blkiorun: cgroups v2 not available") | ||
| } | ||
|
|
||
| // Get containerd's cgroup path | ||
| var err error | ||
| s.containerdPath, err = getCurrentCgroupPath() | ||
| if err != nil { | ||
| initErr = fmt.Errorf("failed to get cgroup path: %w", err) | ||
| return |
There was a problem hiding this comment.
On cgroup v1 systems, Init currently logs that cgroups v2 are unavailable but then continues and calls getCurrentCgroupPath(), which will return an error (it only parses the cgroup v2 “0::” entry). Since this feature is documented as “cgroups v2”, Init should return early (treat as disabled) when !isCgroupV2() to avoid surfacing an init error on v1 hosts.
| select { | ||
| case <-ch: | ||
| case <-time.After(SystemdTimeout): | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| return nil |
There was a problem hiding this comment.
createSlice() returns nil even if it times out waiting for the systemd job result (the time.After(SystemdTimeout) case falls through without error). This can cause Init to proceed as if the slice was created when it wasn’t. Return a timeout error when the timeout branch fires (or rely solely on ctx’s deadline and treat ctx.Done() as failure).
| func ConvertBFQToIOWeight(bfqWeight uint16) uint64 { | ||
| if bfqWeight == 0 { | ||
| return 0 | ||
| } |
There was a problem hiding this comment.
ConvertBFQToIOWeight() will underflow when bfqWeight is between 1 and 9: (uint64(bfqWeight)-uint64(BFQWeightMin)) wraps to a huge value, producing an invalid io.weight. Clamp inputs below BFQWeightMin to BFQWeightMin (or return 0/error) and consider clamping above BFQWeightMax as well.
| } | |
| } | |
| if bfqWeight <= BFQWeightMin { | |
| return uint64(IOWeightMin) | |
| } | |
| if bfqWeight >= BFQWeightMax { | |
| return uint64(IOWeightMax) | |
| } |
No description provided.