Skip to content

Commit ab08e9a

Browse files
authored
Merge pull request kubernetes#5131 from x13n/scaledown2
Allow simulator to persist changes in cluster snapshot
2 parents b042aae + 2854870 commit ab08e9a

File tree

6 files changed

+387
-291
lines changed

6 files changed

+387
-291
lines changed

cluster-autoscaler/core/scaledown/legacy/legacy.go

Lines changed: 16 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@ import (
2424
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2525
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
2626
"k8s.io/autoscaler/cluster-autoscaler/context"
27-
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
2827
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
2928
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
29+
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
3030
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
31-
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
3231
"k8s.io/autoscaler/cluster-autoscaler/metrics"
3332
"k8s.io/autoscaler/cluster-autoscaler/processors"
34-
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
3533
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
3634
"k8s.io/autoscaler/cluster-autoscaler/simulator"
3735
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -47,187 +45,6 @@ import (
4745
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
4846
)
4947

50-
type scaleDownResourcesLimits map[string]int64
51-
type scaleDownResourcesDelta map[string]int64
52-
53-
// used as a value in scaleDownResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
54-
const scaleDownLimitUnknown = math.MinInt64
55-
56-
func (sd *ScaleDown) computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
57-
totalCores, totalMem := calculateScaleDownCoresMemoryTotal(nodes, timestamp)
58-
59-
var totalResources map[string]int64
60-
var totalResourcesErr error
61-
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
62-
totalResources, totalResourcesErr = sd.calculateScaleDownCustomResourcesTotal(nodes, cp, timestamp)
63-
}
64-
65-
resultScaleDownLimits := make(scaleDownResourcesLimits)
66-
for _, resource := range resourceLimiter.GetResources() {
67-
min := resourceLimiter.GetMin(resource)
68-
69-
// we put only actual limits into final map. No entry means no limit.
70-
if min > 0 {
71-
switch {
72-
case resource == cloudprovider.ResourceNameCores:
73-
resultScaleDownLimits[resource] = computeAboveMin(totalCores, min)
74-
case resource == cloudprovider.ResourceNameMemory:
75-
resultScaleDownLimits[resource] = computeAboveMin(totalMem, min)
76-
case cloudprovider.IsCustomResource(resource):
77-
if totalResourcesErr != nil {
78-
resultScaleDownLimits[resource] = scaleDownLimitUnknown
79-
} else {
80-
resultScaleDownLimits[resource] = computeAboveMin(totalResources[resource], min)
81-
}
82-
default:
83-
klog.Errorf("Scale down limits defined for unsupported resource '%s'", resource)
84-
}
85-
}
86-
}
87-
return resultScaleDownLimits
88-
}
89-
90-
func computeAboveMin(total int64, min int64) int64 {
91-
if total > min {
92-
return total - min
93-
}
94-
return 0
95-
96-
}
97-
98-
func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
99-
var coresTotal, memoryTotal int64
100-
for _, node := range nodes {
101-
if actuation.IsNodeBeingDeleted(node, timestamp) {
102-
// Nodes being deleted do not count towards total cluster resources
103-
continue
104-
}
105-
cores, memory := core_utils.GetNodeCoresAndMemory(node)
106-
107-
coresTotal += cores
108-
memoryTotal += memory
109-
}
110-
111-
return coresTotal, memoryTotal
112-
}
113-
114-
func (sd *ScaleDown) calculateScaleDownCustomResourcesTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
115-
result := make(map[string]int64)
116-
ngCache := make(map[string][]customresources.CustomResourceTarget)
117-
for _, node := range nodes {
118-
if actuation.IsNodeBeingDeleted(node, timestamp) {
119-
// Nodes being deleted do not count towards total cluster resources
120-
continue
121-
}
122-
nodeGroup, err := cp.NodeGroupForNode(node)
123-
if err != nil {
124-
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get node group for node %v when calculating cluster gpu usage", node.Name)
125-
}
126-
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
127-
// We do not trust cloud providers to return properly constructed nil for interface type - hence the reflection check.
128-
// See https://golang.org/doc/faq#nil_error
129-
// TODO[lukaszos] consider creating cloud_provider sanitizer which will wrap cloud provider and ensure sane behaviour.
130-
nodeGroup = nil
131-
}
132-
133-
var resourceTargets []customresources.CustomResourceTarget
134-
var cacheHit bool
135-
136-
if nodeGroup != nil {
137-
resourceTargets, cacheHit = ngCache[nodeGroup.Id()]
138-
}
139-
if !cacheHit {
140-
resourceTargets, err = sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
141-
if err != nil {
142-
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get gpu count for node %v when calculating cluster gpu usage")
143-
}
144-
if nodeGroup != nil {
145-
ngCache[nodeGroup.Id()] = resourceTargets
146-
}
147-
}
148-
149-
for _, resourceTarget := range resourceTargets {
150-
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
151-
continue
152-
}
153-
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
154-
}
155-
}
156-
157-
return result, nil
158-
}
159-
160-
func noScaleDownLimitsOnResources() scaleDownResourcesLimits {
161-
return nil
162-
}
163-
164-
func copyScaleDownResourcesLimits(source scaleDownResourcesLimits) scaleDownResourcesLimits {
165-
copy := scaleDownResourcesLimits{}
166-
for k, v := range source {
167-
copy[k] = v
168-
}
169-
return copy
170-
}
171-
172-
func (sd *ScaleDown) computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
173-
resultScaleDownDelta := make(scaleDownResourcesDelta)
174-
175-
nodeCPU, nodeMemory := core_utils.GetNodeCoresAndMemory(node)
176-
resultScaleDownDelta[cloudprovider.ResourceNameCores] = nodeCPU
177-
resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory
178-
179-
if cloudprovider.ContainsCustomResources(resourcesWithLimits) {
180-
resourceTargets, err := sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
181-
if err != nil {
182-
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v custom resources: %v", node.Name)
183-
}
184-
for _, resourceTarget := range resourceTargets {
185-
resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
186-
}
187-
}
188-
return resultScaleDownDelta, nil
189-
}
190-
191-
type scaleDownLimitsCheckResult struct {
192-
exceeded bool
193-
exceededResources []string
194-
}
195-
196-
func scaleDownLimitsNotExceeded() scaleDownLimitsCheckResult {
197-
return scaleDownLimitsCheckResult{false, []string{}}
198-
}
199-
200-
func (limits *scaleDownResourcesLimits) checkScaleDownDeltaWithinLimits(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
201-
exceededResources := sets.NewString()
202-
for resource, resourceDelta := range delta {
203-
resourceLeft, found := (*limits)[resource]
204-
if found {
205-
if (resourceDelta > 0) && (resourceLeft == scaleDownLimitUnknown || resourceDelta > resourceLeft) {
206-
exceededResources.Insert(resource)
207-
}
208-
}
209-
}
210-
if len(exceededResources) > 0 {
211-
return scaleDownLimitsCheckResult{true, exceededResources.List()}
212-
}
213-
214-
return scaleDownLimitsNotExceeded()
215-
}
216-
217-
func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
218-
result := limits.checkScaleDownDeltaWithinLimits(delta)
219-
if result.exceeded {
220-
return result
221-
}
222-
for resource, resourceDelta := range delta {
223-
resourceLeft, found := (*limits)[resource]
224-
if found {
225-
(*limits)[resource] = resourceLeft - resourceDelta
226-
}
227-
}
228-
return scaleDownLimitsNotExceeded()
229-
}
230-
23148
// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
23249
type ScaleDown struct {
23350
context *context.AutoscalingContext
@@ -242,12 +59,13 @@ type ScaleDown struct {
24259
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
24360
removalSimulator *simulator.RemovalSimulator
24461
eligibilityChecker *eligibility.Checker
62+
resourceLimitsFinder *resource.LimitsFinder
24563
}
24664

24765
// NewScaleDown builds new ScaleDown object.
24866
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown {
24967
usageTracker := simulator.NewUsageTracker()
250-
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
68+
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false)
25169
unremovableNodes := unremovable.NewNodes()
25270
return &ScaleDown{
25371
context: context,
@@ -262,6 +80,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
26280
nodeDeletionTracker: ndt,
26381
removalSimulator: removalSimulator,
26482
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
83+
resourceLimitsFinder: resource.NewLimitsFinder(processors.CustomResourcesProcessor),
26584
}
26685
}
26786

@@ -513,7 +332,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
513332
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
514333
}
515334

516-
scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
335+
scaleDownResourcesLeft := sd.resourceLimitsFinder.LimitsLeft(sd.context, nodesWithoutMaster, resourceLimiter, currentTime)
517336

518337
nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
519338
resourcesWithLimits := resourceLimiter.GetResources()
@@ -588,18 +407,18 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
588407
continue
589408
}
590409

591-
scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
410+
scaleDownResourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesWithLimits)
592411
if err != nil {
593412
klog.Errorf("Error getting node resources: %v", err)
594413
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
595414
continue
596415
}
597416

598-
checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
599-
if checkResult.exceeded {
600-
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
417+
checkResult := scaleDownResourcesLeft.CheckDeltaWithinLimits(scaleDownResourcesDelta)
418+
if checkResult.Exceeded() {
419+
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
601420
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
602-
for _, resource := range checkResult.exceededResources {
421+
for _, resource := range checkResult.ExceededResources {
603422
switch resource {
604423
case cloudprovider.ResourceNameCores:
605424
metrics.RegisterSkippedScaleDownCPU()
@@ -674,18 +493,18 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
674493
}
675494

676495
func (sd *ScaleDown) getEmptyNodesToRemoveNoResourceLimits(candidates []string, timestamp time.Time) []simulator.NodeToBeRemoved {
677-
return sd.getEmptyNodesToRemove(candidates, noScaleDownLimitsOnResources(), timestamp)
496+
return sd.getEmptyNodesToRemove(candidates, resource.NoLimits(), timestamp)
678497
}
679498

680499
// This functions finds empty nodes among passed candidates and returns a list of empty nodes
681500
// that can be deleted at the same time.
682-
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits scaleDownResourcesLimits,
501+
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits resource.Limits,
683502
timestamp time.Time) []simulator.NodeToBeRemoved {
684503

685504
emptyNodes := sd.removalSimulator.FindEmptyNodesToRemove(candidates, timestamp)
686505
availabilityMap := make(map[string]int)
687506
nodesToRemove := make([]simulator.NodeToBeRemoved, 0)
688-
resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
507+
resourcesLimitsCopy := resourcesLimits.DeepCopy() // we do not want to modify input parameter
689508
resourcesNames := sets.StringKeySet(resourcesLimits).List()
690509
for _, nodeName := range emptyNodes {
691510
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
@@ -719,13 +538,13 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
719538
availabilityMap[nodeGroup.Id()] = available
720539
}
721540
if available > 0 {
722-
resourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
541+
resourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesNames)
723542
if err != nil {
724543
klog.Errorf("Error: %v", err)
725544
continue
726545
}
727-
checkResult := resourcesLimitsCopy.tryDecrementLimitsByDelta(resourcesDelta)
728-
if checkResult.exceeded {
546+
checkResult := resourcesLimitsCopy.TryDecrementBy(resourcesDelta)
547+
if checkResult.Exceeded() {
729548
continue
730549
}
731550
available--

cluster-autoscaler/core/scaledown/legacy/legacy_test.go

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,37 +1249,6 @@ func getCountOfChan(c chan string) int {
12491249
}
12501250
}
12511251

1252-
func TestCalculateCoresAndMemoryTotal(t *testing.T) {
1253-
nodeConfigs := []NodeConfig{
1254-
{"n1", 2000, 7500 * utils.MiB, 0, true, "ng1"},
1255-
{"n2", 2000, 7500 * utils.MiB, 0, true, "ng1"},
1256-
{"n3", 2000, 7500 * utils.MiB, 0, true, "ng1"},
1257-
{"n4", 12000, 8000 * utils.MiB, 0, true, "ng1"},
1258-
{"n5", 16000, 7500 * utils.MiB, 0, true, "ng1"},
1259-
{"n6", 8000, 6000 * utils.MiB, 0, true, "ng1"},
1260-
{"n7", 6000, 16000 * utils.MiB, 0, true, "ng1"},
1261-
}
1262-
nodes := make([]*apiv1.Node, len(nodeConfigs))
1263-
for i, n := range nodeConfigs {
1264-
node := BuildTestNode(n.Name, n.Cpu, n.Memory)
1265-
SetNodeReadyState(node, n.Ready, time.Now())
1266-
nodes[i] = node
1267-
}
1268-
1269-
nodes[6].Spec.Taints = []apiv1.Taint{
1270-
{
1271-
Key: deletetaint.ToBeDeletedTaint,
1272-
Value: fmt.Sprint(time.Now().Unix()),
1273-
Effect: apiv1.TaintEffectNoSchedule,
1274-
},
1275-
}
1276-
1277-
coresTotal, memoryTotal := calculateScaleDownCoresMemoryTotal(nodes, time.Now())
1278-
1279-
assert.Equal(t, int64(42), coresTotal)
1280-
assert.Equal(t, int64(44000*utils.MiB), memoryTotal)
1281-
}
1282-
12831252
func TestFilterOutMasters(t *testing.T) {
12841253
nodeConfigs := []NodeConfig{
12851254
{"n1", 2000, 4000, 0, false, "ng1"},
@@ -1333,55 +1302,6 @@ func TestFilterOutMasters(t *testing.T) {
13331302
assertEqualSet(t, []string{"n1", "n2", "n4", "n5", "n6"}, withoutMastersNames)
13341303
}
13351304

1336-
func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
1337-
type testcase struct {
1338-
limits scaleDownResourcesLimits
1339-
delta scaleDownResourcesDelta
1340-
exceededResources []string
1341-
}
1342-
tests := []testcase{
1343-
{
1344-
limits: scaleDownResourcesLimits{"a": 10},
1345-
delta: scaleDownResourcesDelta{"a": 10},
1346-
exceededResources: []string{},
1347-
},
1348-
{
1349-
limits: scaleDownResourcesLimits{"a": 10},
1350-
delta: scaleDownResourcesDelta{"a": 11},
1351-
exceededResources: []string{"a"},
1352-
},
1353-
{
1354-
limits: scaleDownResourcesLimits{"a": 10},
1355-
delta: scaleDownResourcesDelta{"b": 10},
1356-
exceededResources: []string{},
1357-
},
1358-
{
1359-
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
1360-
delta: scaleDownResourcesDelta{"a": 0},
1361-
exceededResources: []string{},
1362-
},
1363-
{
1364-
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
1365-
delta: scaleDownResourcesDelta{"a": 1},
1366-
exceededResources: []string{"a"},
1367-
},
1368-
{
1369-
limits: scaleDownResourcesLimits{"a": 10, "b": 20, "c": 30},
1370-
delta: scaleDownResourcesDelta{"a": 11, "b": 20, "c": 31},
1371-
exceededResources: []string{"a", "c"},
1372-
},
1373-
}
1374-
1375-
for _, test := range tests {
1376-
checkResult := test.limits.checkScaleDownDeltaWithinLimits(test.delta)
1377-
if len(test.exceededResources) == 0 {
1378-
assert.Equal(t, scaleDownLimitsNotExceeded(), checkResult)
1379-
} else {
1380-
assert.Equal(t, scaleDownLimitsCheckResult{true, test.exceededResources}, checkResult)
1381-
}
1382-
}
1383-
}
1384-
13851305
func generateReplicaSets() []*appsv1.ReplicaSet {
13861306
replicas := int32(5)
13871307
return []*appsv1.ReplicaSet{

0 commit comments

Comments (0)