Commit 4433dbd

Integrate resource quotas with scale up
# Conflicts:
#	cluster-autoscaler/core/utils/utils.go
#	cluster-autoscaler/resourcequotas/tracker.go
1 parent: 4177783

24 files changed: +578 −166 lines

cluster-autoscaler/core/autoscaler.go
Lines changed: 6 additions & 0 deletions

@@ -28,6 +28,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+	"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -76,6 +77,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
 		opts.DeleteOptions,
 		opts.DrainabilityRules,
 		opts.DraProvider,
+		opts.QuotasProvider,
 	), nil
 }

@@ -142,6 +144,10 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
 		}
 		opts.ExpanderStrategy = expanderStrategy
 	}
+	if opts.QuotasProvider == nil {
+		cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
+		opts.QuotasProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider})
+	}

 	return nil
 }
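The resourcequotas.Provider contract itself is not shown in this commit's hunks. A rough sketch of what the default wiring above implies; the interface shape, method name, and Quota fields are assumptions inferred from the call sites, not the package's actual API:

// Sketch only: assumed shapes behind the provider wiring above.
package resourcequotas

// Quota describes one enforced limit (field names assumed).
type Quota struct {
	Name   string
	Limits map[string]int64 // resource name -> maximum, e.g. "cpu" -> 1000
}

// Provider supplies the quotas to enforce during scale-up (assumed interface).
type Provider interface {
	Quotas() ([]Quota, error)
}

// A combined provider plausibly just concatenates its children's quotas, so
// cloud-provider limits and any custom limits are enforced together.
type combinedProvider struct {
	providers []Provider
}

func (c *combinedProvider) Quotas() ([]Quota, error) {
	var all []Quota
	for _, p := range c.providers {
		quotas, err := p.Quotas()
		if err != nil {
			return nil, err
		}
		all = append(all, quotas...)
	}
	return all, nil
}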

cluster-autoscaler/core/options/autoscaler.go
Lines changed: 2 additions & 0 deletions

@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+	"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
 	DeleteOptions     options.NodeDeleteOptions
 	DrainabilityRules rules.Rules
 	DraProvider       *draprovider.Provider
+	QuotasProvider    resourcequotas.Provider
 }
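Since initializeDefaultOptions installs the cloud-quotas default only when this field is nil, an embedder can pre-populate it to override quota sourcing entirely. A minimal sketch, assuming a caller-supplied customProvider and the NewAutoscaler entry point from the first file:

// Sketch: injecting a custom quotas provider; customProvider is hypothetical.
opts := coreoptions.AutoscalerOptions{
	QuotasProvider: customProvider, // non-nil, so the cloud-quotas default is skipped
	// ... remaining options elided ...
}
autoscaler, err := NewAutoscaler(opts, informerFactory)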

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Lines changed: 57 additions & 33 deletions

@@ -27,13 +27,15 @@ import (
 	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource"
+	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -47,6 +49,7 @@ type ScaleUpOrchestrator struct {
 	autoscalingCtx       *ca_context.AutoscalingContext
 	processors           *ca_processors.AutoscalingProcessors
 	resourceManager      *resource.Manager
+	quotasTrackerFactory *resourcequotas.TrackerFactory
 	clusterStateRegistry *clusterstate.ClusterStateRegistry
 	scaleUpExecutor      *scaleUpExecutor
 	estimatorBuilder     estimator.EstimatorBuilder
@@ -68,6 +71,7 @@ func (o *ScaleUpOrchestrator) Initialize(
 	clusterStateRegistry *clusterstate.ClusterStateRegistry,
 	estimatorBuilder estimator.EstimatorBuilder,
 	taintConfig taints.TaintConfig,
+	quotasTrackerFactory *resourcequotas.TrackerFactory,
 ) {
 	o.autoscalingCtx = autoscalingCtx
 	o.processors = processors
@@ -76,6 +80,7 @@
 	o.taintConfig = taintConfig
 	o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
 	o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker)
+	o.quotasTrackerFactory = quotasTrackerFactory
 	o.initialized = true
 }

@@ -121,15 +126,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	// Initialise binpacking limiter.
 	o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups)

-	resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
-	if aErr != nil {
-		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+	tracker, err := o.newQuotasTracker()
+	if err != nil {
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
 	}

 	now := time.Now()

 	// Filter out invalid node groups
-	validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now)
+	validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes)+len(upcomingNodes), now)

 	// Mark skipped node groups as processed.
 	for nodegroupID := range skippedNodeGroups {
@@ -194,7 +199,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 		return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
 	}

-	newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos)
+	newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos)
 	if aErr != nil {
 		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
@@ -283,14 +288,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	}, nil
 }

-func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
+func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
 	nodeInfo, found := nodeInfos[nodeGroup.Id()]
 	if !found {
 		// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
 		klog.Errorf("No node info for: %s", nodeGroup.Id())
 		return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
 	}
-	return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup)
+	checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes)
+	if err != nil {
+		return 0, errors.ToAutoscalerError(errors.InternalError, err)
+	}
+	return checkResult.AllowedDelta, nil
 }

 // ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
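Tracker.CheckDelta's result type is defined elsewhere in the commit (resourcequotas/tracker.go appears in the conflicts note). A plausible reconstruction from its usage in applyLimits above and IsNodeGroupResourceExceeded below; field and method names are inferred, not confirmed:

// Sketch inferred from the call sites: CheckDelta(ctx, nodeGroup, node, delta)
// simulates adding `delta` copies of `node` against every tracked quota.
type CheckResult struct {
	// AllowedDelta is the largest node count (at most the requested delta)
	// that still fits all quotas; applyLimits returns it unchanged.
	AllowedDelta int
	// ExceededQuotas lists the quotas that capped the request, each naming
	// the limiting resources (e.g. cloudprovider.ResourceNameCores).
	ExceededQuotas []ExceededQuota
}

type ExceededQuota struct {
	ExceededResources []string
}

// Exceeded reports whether any quota constrained the requested delta.
func (r CheckResult) Exceeded() bool { return len(r.ExceededQuotas) > 0 }

One behavioral shift worth noting: the old resource.CheckDeltaWithinLimits surfaced a single cluster-wide ExceededResources list, while the tracker attributes the cap to individual quotas, which is why NewMaxResourceLimitReached now takes ExceededQuotas.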
@@ -309,9 +318,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 	nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
 	scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)

-	resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
-	if aErr != nil {
-		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+	tracker, err := o.newQuotasTracker()
+	if err != nil {
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
 	}

 	for _, ng := range nodeGroups {
@@ -342,17 +351,18 @@
 			continue
 		}

-		if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil {
+		if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil {
 			klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource exceeded: %v", skipReason)
 			continue
 		}

 		newNodeCount := ng.MinSize() - targetSize
-		newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng)
+		checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount)
 		if err != nil {
 			klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
 			continue
 		}
+		newNodeCount = checkResult.AllowedDelta

 		newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize)
 		if err != nil {
@@ -397,7 +407,7 @@
 func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
 	nodeGroups []cloudprovider.NodeGroup,
 	nodeInfos map[string]*framework.NodeInfo,
-	resourcesLeft resource.Limits,
+	tracker *resourcequotas.Tracker,
 	currentNodeCount int,
 	now time.Time,
 ) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
@@ -441,7 +451,7 @@
 			skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
 			continue
 		}
-		if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil {
+		if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil {
 			skippedNodeGroups[nodeGroup.Id()] = skipReason
 			continue
 		}
@@ -664,37 +674,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.
 }

 // IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
-func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
-	resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup)
+func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
+	checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes)
 	if err != nil {
-		klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
+		klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err)
 		return NotReadyReason
 	}

-	for resource, delta := range resourcesDelta {
-		resourcesDelta[resource] = delta * int64(numNodes)
-	}
-
-	checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
-	if checkResult.Exceeded {
-		klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
-		for _, resource := range checkResult.ExceededResources {
-			switch resource {
-			case cloudprovider.ResourceNameCores:
-				metrics.RegisterSkippedScaleUpCPU()
-			case cloudprovider.ResourceNameMemory:
-				metrics.RegisterSkippedScaleUpMemory()
-			default:
-				continue
+	if checkResult.Exceeded() {
+		for _, quota := range checkResult.ExceededQuotas {
+			klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), quota.ExceededResources)
+			for _, resource := range quota.ExceededResources {
+				switch resource {
+				case cloudprovider.ResourceNameCores:
+					metrics.RegisterSkippedScaleUpCPU()
+				case cloudprovider.ResourceNameMemory:
+					metrics.RegisterSkippedScaleUpMemory()
+				default:
+					continue
+				}
 			}
 		}
-		return NewMaxResourceLimitReached(checkResult.ExceededResources)
+		return NewMaxResourceLimitReached(checkResult.ExceededQuotas)
 	}
 	return nil
 }

 // GetCappedNewNodeCount caps resize according to cluster wide node count limit.
 func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
+	// TODO: port MaxNodesTotal to a resource quota
 	if o.autoscalingCtx.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > o.autoscalingCtx.MaxNodesTotal {
 		klog.V(1).Infof("Capping size to max cluster total size (%d)", o.autoscalingCtx.MaxNodesTotal)
 		newNodeCount = o.autoscalingCtx.MaxNodesTotal - currentNodeCount
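The new TODO above points at folding MaxNodesTotal into the quota machinery instead of special-casing it in GetCappedNewNodeCount. Under the assumed Provider and Quota shapes from the earlier sketch, that could look roughly like this (the "nodes" resource name is purely illustrative):

// Illustrative only: MaxNodesTotal expressed as a quota, per the TODO.
type maxNodesQuotaProvider struct {
	maxNodesTotal int64
}

func (p *maxNodesQuotaProvider) Quotas() ([]Quota, error) {
	// If every node costs 1 "nodes" unit, the tracker would cap any delta
	// that pushes the cluster past maxNodesTotal.
	return []Quota{{
		Name:   "max-nodes-total",
		Limits: map[string]int64{"nodes": p.maxNodesTotal},
	}}, nil
}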
@@ -787,6 +795,22 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
 	return validSimilarNodeGroups
 }

+func (o *ScaleUpOrchestrator) newQuotasTracker() (*resourcequotas.Tracker, error) {
+	var nodes []*apiv1.Node
+	nodeInfos, err := o.autoscalingCtx.ClusterSnapshot.ListNodeInfos()
+	if err != nil {
+		return nil, err
+	}
+	for _, nodeInfo := range nodeInfos {
+		node := nodeInfo.Node()
+		if utils.IsVirtualKubeletNode(node) {
+			continue
+		}
+		nodes = append(nodes, nodeInfo.Node())
+	}
+	return o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
+}
+
 func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
 	schedulableSamplePods := make(map[*apiv1.Pod]bool)
 	for _, podGroup := range similarPodGroups {

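newQuotasTracker seeds the tracker with every node in the cluster snapshot, skipping virtual kubelet nodes, presumably because virtual nodes do not draw on cloud capacity and should not count against quota usage. The per-quota capping rule the tracker is then expected to apply reduces to headroom division; a small sketch under the same assumptions as above:

// Sketch of the assumed per-quota capping rule: given a quota's limit, the
// usage already charged for existing nodes, and the per-node cost of the
// node group's template node, cap the requested delta to what still fits.
func allowedDelta(limit, used, perNodeCost int64, requested int) int {
	if perNodeCost == 0 {
		return requested // this quota does not constrain the node type
	}
	headroom := limit - used
	if headroom < 0 {
		headroom = 0
	}
	if fits := int(headroom / perNodeCost); fits < requested {
		return fits
	}
	return requested
}

For example, with a 100-core quota, 88 cores already charged for existing nodes, and 4-core template nodes, a request for 5 nodes would come back with an AllowedDelta of 3.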