Skip to content

Commit cc7b511

Browse files
authored
Re-visit LPT evaluation as it was violating etcd size limits (#6)
* Re-visit LPT evaluation as it was violating etcd size limits
1 parent cf74510 commit cc7b511

6 files changed

+82
-147
lines changed

api/autoscaler/v1alpha1/mostwantedtwophasehysteresisevaluation_types.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,20 @@ type MostWantedTwoPhaseHysteresisEvaluationSpec struct {
3333
// Older historical records are always pruned.
3434
// +kubebuilder:validation:Required
3535
StabilizationPeriod metav1.Duration `json:"stabilizationPeriod,omitempty"`
36-
37-
// MinimumSampleSize is the minimum number of samples to consider before first evaluating.
38-
// +kubebuilder:validation:Required
39-
MinimumSampleSize int32 `json:"minimumSampleSize,omitempty"`
4036
}
4137

4238
type MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord struct {
43-
// Timestamp is the time at which the record was created.
39+
// Timestamp is the time at which the record was last seen.
4440
// +kubebuilder:validation:Required
4541
Timestamp metav1.Time `json:"timestamp,omitempty"`
4642

4743
// Replicas is the partition as it was seen at this moment in time.
4844
// +kubebuilder:validation:Required
4945
Replicas common.ReplicaList `json:"replicas,omitempty"`
46+
47+
// SeenTimes is the counter of how many times have this record been seen.
48+
// +kubebuilder:validation:Required
49+
SeenTimes int32 `json:"seenTimes,omitempty"`
5050
}
5151

5252
// MostWantedTwoPhaseHysteresisEvaluationStatus defines the observed state of MostWantedTwoPhaseHysteresisEvaluation.
@@ -57,7 +57,6 @@ type MostWantedTwoPhaseHysteresisEvaluationStatus struct {
5757
History []MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord `json:"history,omitempty"`
5858

5959
// LastEvaluationTimestamp is the time at which the last evaluation was performed.
60-
6160
LastEvaluationTimestamp *metav1.Time `json:"lastEvaluationTimestamp,omitempty"`
6261

6362
// Projection shows what the partitioning choice would have been if evaluation was performed during last poll.

config/crd/bases/autoscaler.argoproj.io_mostwantedtwophasehysteresisevaluations.yaml

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ spec:
5454
description: MostWantedTwoPhaseHysteresisEvaluationSpec defines the desired
5555
state of MostWantedTwoPhaseHysteresisEvaluation.
5656
properties:
57-
minimumSampleSize:
58-
description: MinimumSampleSize is the minimum number of samples to
59-
consider before first evaluating.
60-
format: int32
61-
type: integer
6257
partitionProviderRef:
6358
description: PartitionProviderRef is the reference to the partition
6459
provider.
@@ -89,7 +84,6 @@ spec:
8984
Older historical records are always pruned.
9085
type: string
9186
required:
92-
- minimumSampleSize
9387
- partitionProviderRef
9488
- pollingPeriod
9589
- stabilizationPeriod
@@ -257,16 +251,25 @@ spec:
257251
- totalLoadDisplayValue
258252
type: object
259253
type: array
254+
seenTimes:
255+
description: SeenTimes is the counter of how many times have
256+
this record been seen.
257+
format: int32
258+
type: integer
260259
timestamp:
261-
description: Timestamp is the time at which the record was created.
260+
description: Timestamp is the time at which the record was last
261+
seen.
262262
format: date-time
263263
type: string
264264
required:
265265
- replicas
266+
- seenTimes
266267
- timestamp
267268
type: object
268269
type: array
269270
lastEvaluationTimestamp:
271+
description: LastEvaluationTimestamp is the time at which the last
272+
evaluation was performed.
270273
format: date-time
271274
type: string
272275
projection:

config/default-scaling-strategy/autoscaler_v1alpha1_mostwantedtwophasehysteresisevaluation.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,3 @@ spec:
1111
name: default
1212
pollingPeriod: 1h
1313
stabilizationPeriod: 24h
14-
minimumSampleSize: 5

config/e2e/samples/autoscaler_v1alpha1_mostwantedtwophasehysteresisevaluation.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,3 @@ spec:
1111
name: sample
1212
pollingPeriod: 5s
1313
stabilizationPeriod: 1m
14-
minimumSampleSize: 3

internal/controller/autoscaler/mostwantedtwophasehysteresisevaluation_controller.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -204,56 +204,58 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
204204
replicas := partitionProvider.GetPartitionProviderStatus().Replicas
205205
log.V(2).Info("Currently reported replicas by partition provider", "count", len(replicas))
206206

207+
seenBefore := false
208+
for i, record := range evaluation.Status.History {
209+
if record.Replicas.SerializeToString() == replicas.SerializeToString() {
210+
seenBefore = true
211+
evaluation.Status.History[i].SeenTimes++
212+
evaluation.Status.History[i].Timestamp = metav1.Now()
213+
break
214+
}
215+
}
216+
if !seenBefore {
217+
evaluation.Status.History = append(evaluation.Status.History,
218+
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
219+
Timestamp: metav1.Now(),
220+
Replicas: replicas,
221+
SeenTimes: 1,
222+
},
223+
)
224+
}
225+
207226
// Maintain the history
208-
evaluation.Status.History = append(evaluation.Status.History,
209-
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
210-
Timestamp: metav1.Now(),
211-
Replicas: replicas,
212-
})
213227
cleanHistory := []autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
214228
for _, record := range evaluation.Status.History {
215229
if record.Timestamp.Add(evaluation.Spec.StabilizationPeriod.Duration).After(time.Now()) {
216230
cleanHistory = append(cleanHistory, record)
217231
}
218232
}
219-
log.V(1).Info("Trim the history", "from", len(evaluation.Status.History), "to", len(cleanHistory))
220-
evaluation.Status.History = cleanHistory
221-
if len(evaluation.Status.History) < int(evaluation.Spec.MinimumSampleSize) {
222-
err := fmt.Errorf("Minimum sample size not reached")
223-
log.Info(err.Error())
224-
meta.SetStatusCondition(&evaluation.Status.Conditions, metav1.Condition{
225-
Type: StatusTypeReady,
226-
Status: metav1.ConditionFalse,
227-
Reason: "MinimumSampleSizeNotReached",
228-
Message: err.Error(),
229-
})
230-
if err := r.Status().Update(ctx, evaluation); err != nil {
231-
log.V(1).Info("Failed to update resource status", "err", err)
232-
return ctrl.Result{}, err
233-
}
234-
// We need to re-queue it for the next poll, but this is NOT an error
235-
return ctrl.Result{RequeueAfter: evaluation.Spec.PollingPeriod.Duration}, nil
233+
historyWasTrimmed := len(cleanHistory) < len(evaluation.Status.History)
234+
if historyWasTrimmed {
235+
log.V(1).Info("Trim the history", "from", len(evaluation.Status.History), "to", len(cleanHistory))
236+
evaluation.Status.History = cleanHistory
236237
}
237-
log.V(2).Info("Minimum sample size reached, proceeding to evaluate",
238-
"minimumSampleSize", evaluation.Spec.MinimumSampleSize,
239-
"currentSampleSize", len(evaluation.Status.History))
240238

241239
historyRecords := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
242240
historyRecordsLastSeen := map[string]metav1.Time{}
243-
historyRecorsSeenTimes := map[string]int{}
241+
historyRecorsSeenTimes := map[string]int32{}
244242
for _, record := range evaluation.Status.History {
245243
serializedRecord := record.Replicas.SerializeToString()
246244
log.V(2).Info("Noticing record", "record", serializedRecord)
247-
if _, ok := historyRecordsLastSeen[serializedRecord]; !ok ||
248-
record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
245+
if _, ok := historyRecordsLastSeen[serializedRecord]; !ok {
249246
historyRecords[serializedRecord] = record
250247
historyRecordsLastSeen[serializedRecord] = record.Timestamp
251-
historyRecorsSeenTimes[serializedRecord] = 0
248+
historyRecorsSeenTimes[serializedRecord] = record.SeenTimes
249+
} else if record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
250+
historyRecordsLastSeen[serializedRecord] = record.Timestamp
251+
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
252+
} else {
253+
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
252254
}
253-
historyRecorsSeenTimes[serializedRecord]++
254255
}
256+
255257
topSeenRecord := autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
256-
maxSeenCount := 0
258+
maxSeenCount := int32(0)
257259
for serializedRecord, seenTimes := range historyRecorsSeenTimes {
258260
log.V(2).Info("Evaluating records", "record", serializedRecord, "seenTimes", seenTimes)
259261
if seenTimes > maxSeenCount {

internal/controller/autoscaler/mostwantedtwophasehysteresisevaluation_controller_test.go

Lines changed: 34 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package autoscaler
1818

1919
import (
20-
"fmt"
2120
"time"
2221

2322
. "github.com/onsi/ginkgo/v2"
@@ -83,9 +82,8 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
8382
Name: "N/A",
8483
},
8584
},
86-
PollingPeriod: metav1.Duration{Duration: time.Minute},
87-
StabilizationPeriod: metav1.Duration{Duration: time.Hour},
88-
MinimumSampleSize: 5,
85+
PollingPeriod: metav1.Duration{Duration: time.Second},
86+
StabilizationPeriod: metav1.Duration{Duration: 5 * time.Minute},
8987
},
9088
},
9189
).Create()
@@ -207,45 +205,8 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
207205
).
208206
BranchFailureToUpdateStatusCheck(collector.Collect).
209207
WithCheck(
210-
"bail out on minimum sample size not reached",
208+
"succeed",
211209
func(run *ScenarioRun[*autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluation]) {
212-
samplePartition := NewObjectContainer(
213-
run,
214-
&autoscalerv1alpha1.LongestProcessingTimePartition{
215-
ObjectMeta: metav1.ObjectMeta{
216-
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
217-
Namespace: run.Namespace().ObjectKey().Name,
218-
},
219-
},
220-
)
221-
for i := 0; i < int(run.Container().Object().Spec.MinimumSampleSize-1); i++ {
222-
Expect(run.ReconcileError()).ToNot(HaveOccurred())
223-
Expect(run.ReconcileResult().RequeueAfter).
224-
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
225-
Expect(run.ReconcileResult().Requeue).To(BeFalse())
226-
227-
By("Checking conditions")
228-
readyCondition := meta.FindStatusCondition(
229-
run.Container().Get().Object().Status.Conditions,
230-
StatusTypeReady,
231-
)
232-
Expect(readyCondition).NotTo(BeNil())
233-
Expect(readyCondition.Status).To(Equal(metav1.ConditionFalse))
234-
Expect(readyCondition.Reason).To(Equal("MinimumSampleSizeNotReached"))
235-
Expect(readyCondition.Message).To(ContainSubstring("Minimum sample size not reached"))
236-
237-
By("Checking history records")
238-
samplePartition.Get()
239-
history := run.Container().Object().Status.History
240-
Expect(history).To(HaveLen(i + 1))
241-
record := history[i]
242-
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
243-
244-
By("Reconciling resource sample #" + fmt.Sprintf("%d", i+2))
245-
time.Sleep(5 * time.Second)
246-
run.Reconcile()
247-
}
248-
249210
Expect(run.ReconcileError()).ToNot(HaveOccurred())
250211
Expect(run.ReconcileResult().RequeueAfter).
251212
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
@@ -261,57 +222,28 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
261222
Expect(readyCondition.Reason).To(Equal(StatusTypeReady))
262223

263224
By("Checking history records")
225+
samplePartition := NewObjectContainer(
226+
run,
227+
&autoscalerv1alpha1.LongestProcessingTimePartition{
228+
ObjectMeta: metav1.ObjectMeta{
229+
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
230+
Namespace: run.Namespace().ObjectKey().Name,
231+
},
232+
},
233+
)
264234
samplePartition.Get()
265235
history := run.Container().Object().Status.History
266-
Expect(history).To(HaveLen(int(run.Container().Object().Spec.MinimumSampleSize)))
267-
record := history[int(run.Container().Object().Spec.MinimumSampleSize)-1]
268-
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
236+
Expect(history).To(HaveLen(1))
269237

270238
By("Checking evaluation results")
271-
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).To(BeTemporally("~", time.Now(), 2*time.Second))
272-
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
273239
Expect(run.Container().Object().Status.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
240+
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
241+
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).
242+
To(BeTemporally("~", time.Now(), time.Second))
274243

275-
By("Simulate stabilization period expiring")
276-
run.Container().Object().Status.LastEvaluationTimestamp = &metav1.Time{
277-
Time: time.Now().Add(-run.Container().Object().Spec.StabilizationPeriod.Duration).
278-
Add(-time.Second),
279-
}
280-
for i := range run.Container().Object().Status.History {
281-
run.Container().Object().Status.History[i].Timestamp = *run.Container().Object().Status.LastEvaluationTimestamp
282-
}
283-
run.Container().StatusUpdate()
284-
285-
for i := 0; i < int(run.Container().Object().Spec.MinimumSampleSize-1); i++ {
286-
By("Reconciling resource sample #" + fmt.Sprintf("%d", i+1))
287-
time.Sleep(5 * time.Second)
288-
run.Reconcile()
289-
290-
Expect(run.ReconcileError()).ToNot(HaveOccurred())
291-
Expect(run.ReconcileResult().RequeueAfter).
292-
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
293-
Expect(run.ReconcileResult().Requeue).To(BeFalse())
294-
295-
By("Checking conditions")
296-
readyCondition := meta.FindStatusCondition(
297-
run.Container().Get().Object().Status.Conditions,
298-
StatusTypeReady,
299-
)
300-
Expect(readyCondition).NotTo(BeNil())
301-
Expect(readyCondition.Status).To(Equal(metav1.ConditionFalse))
302-
Expect(readyCondition.Reason).To(Equal("MinimumSampleSizeNotReached"))
303-
Expect(readyCondition.Message).To(ContainSubstring("Minimum sample size not reached"))
304-
305-
By("Checking history records")
306-
samplePartition.Get()
307-
history := run.Container().Object().Status.History
308-
Expect(history).To(HaveLen(i + 1))
309-
record := history[i]
310-
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
311-
}
312-
313-
By("Reconciling resource sample #" + fmt.Sprintf("%d", run.Container().Object().Spec.MinimumSampleSize))
314-
time.Sleep(5 * time.Second)
244+
By("Checking seconds reconciliation")
245+
firstReconcileTime := run.Container().Object().Status.LastEvaluationTimestamp.Time
246+
time.Sleep(6 * time.Second)
315247
run.Reconcile()
316248

317249
Expect(run.ReconcileError()).ToNot(HaveOccurred())
@@ -329,27 +261,28 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
329261
Expect(readyCondition.Reason).To(Equal(StatusTypeReady))
330262

331263
By("Checking history records")
264+
samplePartition = NewObjectContainer(
265+
run,
266+
&autoscalerv1alpha1.LongestProcessingTimePartition{
267+
ObjectMeta: metav1.ObjectMeta{
268+
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
269+
Namespace: run.Namespace().ObjectKey().Name,
270+
},
271+
},
272+
)
332273
samplePartition.Get()
333274
history = run.Container().Object().Status.History
334-
Expect(history).To(HaveLen(int(run.Container().Object().Spec.MinimumSampleSize)))
335-
record = history[int(run.Container().Object().Spec.MinimumSampleSize)-1]
336-
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
275+
Expect(history).To(HaveLen(1))
276+
Expect(history[0].SeenTimes).To(Equal(int32(2)))
337277

338278
By("Checking evaluation results")
339-
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).To(BeTemporally("~", time.Now(), 2*time.Second))
340-
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
341279
Expect(run.Container().Object().Status.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
280+
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
281+
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).
282+
To(Equal(firstReconcileTime))
342283
},
343284
).
344-
Commit(collector.Collect).
345-
Hydrate(
346-
"with one min sample requirement",
347-
func(run *ScenarioRun[*autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluation]) {
348-
run.Container().Object().Spec.MinimumSampleSize = 1
349-
run.Container().Update()
350-
},
351-
).
352-
BranchFailureToUpdateStatusCheck(collector.Collect)
285+
Commit(collector.Collect)
353286

354287
BeforeEach(func() {
355288
scenarioRun = collector.NewRun(ctx, k8sClient)

0 commit comments

Comments
 (0)