Merge branch 'error/adaptive-ud-240902' into feature/ud-adaptive-240827
# Conflicts:
#	test/e2e/framework/uniteddeployment.go
AiRanthem committed Sep 3, 2024
2 parents 89bffc8 + eed4c5d commit c462401
Showing 11 changed files with 112 additions and 180 deletions.
8 changes: 5 additions & 3 deletions apis/apps/v1alpha1/uniteddeployment_types.go
@@ -367,9 +367,11 @@ type UpdateStatus struct {
}

type UnschedulableStatus struct {
Unschedulable bool `json:"unschedulable"`
UnschedulableTimestamp metav1.Time `json:"unschedulableTimestamp,omitempty"`
FailedPods int32 `json:"failedPods,omitempty"`
Unschedulable bool `json:"unschedulable"`
// +optional
UnschedulableTimestamp *metav1.Time `json:"unschedulableTimestamp,omitempty"`
// +optional
PendingPods int32 `json:"pendingPods,omitempty"`
}

// +genclient
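For quick reference, a minimal sketch (not part of the commit) of how a controller might populate the renamed status: PendingPods replaces FailedPods, and the timestamp is now an optional pointer that can stay nil until the subset first becomes unschedulable. The Kruise import path is assumed from the repository layout.

```go
package main

import (
	"fmt"
	"time"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// PendingPods counts pods stuck pending in an unschedulable subset; the
	// timestamp records when the subset was first marked unschedulable.
	status := appsv1alpha1.UnschedulableStatus{
		Unschedulable:          true,
		UnschedulableTimestamp: &metav1.Time{Time: time.Now()},
		PendingPods:            2,
	}
	fmt.Printf("unschedulable=%t pendingPods=%d since=%s\n",
		status.Unschedulable, status.PendingPods, status.UnschedulableTimestamp.Format(time.RFC3339))
}
```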
5 changes: 4 additions & 1 deletion apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion config/crd/bases/apps.kruise.io_uniteddeployments.yaml
@@ -1239,7 +1239,7 @@ spec:
subsetUnschedulable:
additionalProperties:
properties:
failedPods:
pendingPods:
format: int32
type: integer
unschedulable:
49 changes: 23 additions & 26 deletions pkg/controller/uniteddeployment/allocator.go
@@ -69,32 +69,28 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
return &specificAllocator{UnitedDeployment: ud}
}

// safeReplicas refers to the number of Pods that an unschedulable subset can safely accommodate.
// NotPendingReplicas refers to the number of Pods that an unschedulable subset can safely accommodate.
// Exceeding this number may lead to scheduling failures within that subset.
// This value is only effective in the Adaptive scheduling strategy.
func getSafeReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 {
func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 {
if nameToSubset == nil {
return nil
}
var result = make(map[string]int32)
for name, subset := range *nameToSubset {
if subset.Status.UnschedulableStatus.Unschedulable {
result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.FailedPods
}
result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.PendingPods
}
return result
}

// get readyReplicas to prevent healthy Pods from being deleted.
func getReadyReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 {
if nameToSubset == nil {
return nil
}
var result = make(map[string]int32)
for name, subset := range *nameToSubset {
result[name] = subset.Status.ReadyReplicas
func getSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
if subsetObj, ok := (*nameToSubset)[name]; ok {
unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable
} else {
// newly created subsets are all schedulable
unschedulable = false

(Codecov: added line #L91 in pkg/controller/uniteddeployment/allocator.go was not covered by tests)
}
return result
return
}
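
To make the renamed helper concrete, here is a self-contained sketch with simplified stand-in types (the controller's real Subset type is not reproduced): every subset now gets an entry equal to Replicas minus PendingPods, rather than only the unschedulable subsets as before.

```go
package main

import "fmt"

// subsetStatus is a stand-in for the fields the helper reads from Subset.Status.
type subsetStatus struct {
	Replicas    int32
	PendingPods int32
}

// notPendingReplicas mirrors getNotPendingReplicasMap: the number of replicas
// per subset that are not stuck pending and can therefore be kept safely.
func notPendingReplicas(subsets map[string]subsetStatus) map[string]int32 {
	result := make(map[string]int32, len(subsets))
	for name, s := range subsets {
		result[name] = s.Replicas - s.PendingPods
	}
	return result
}

func main() {
	subsets := map[string]subsetStatus{
		"subset-a": {Replicas: 5, PendingPods: 2}, // 3 pods are running or schedulable
		"subset-b": {Replicas: 4, PendingPods: 0}, // fully healthy
	}
	fmt.Println(notPendingReplicas(subsets)) // map[subset-a:3 subset-b:4]
}
```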

type specificAllocator struct {
@@ -158,8 +154,7 @@ func getSpecifiedSubsetReplicas(replicas int32, ud *appsv1alpha1.UnitedDeploymen
return &replicaLimits
}

safeReplicasMap := getSafeReplicasMap(nameToSubset)

safeReplicasMap := getNotPendingReplicasMap(nameToSubset)
for _, subsetDef := range ud.Spec.Topology.Subsets {
if subsetDef.Replicas == nil {
continue
@@ -169,7 +164,7 @@ func getSpecifiedSubsetReplicas(replicas int32, ud *appsv1alpha1.UnitedDeploymen
limit := specifiedReplicas
if ud.Spec.Topology.ScheduleStrategy.IsAdaptive() {
// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
if safeReplicas, ok := safeReplicasMap[subsetDef.Name]; ok {
if safeReplicas, ok := safeReplicasMap[subsetDef.Name]; getSubSetUnschedulable(subsetDef.Name, nameToSubset) && ok {
limit = integer.Int32Min(safeReplicas, specifiedReplicas)
}
}
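
A small illustration of the guard above, assuming a subset with 5 specified replicas of which only 3 are not pending; the helper name is illustrative and stands in for the combination of getSubSetUnschedulable and integer.Int32Min.

```go
package main

import "fmt"

// specifiedLimit models the adaptive guard: while a subset is unschedulable its
// specified replicas are capped at the not-pending count, so it can only be
// scaled down; a schedulable subset keeps whatever the spec asks for.
func specifiedLimit(specified, notPending int32, unschedulable bool) int32 {
	if unschedulable && notPending < specified {
		return notPending
	}
	return specified
}

func main() {
	fmt.Println(specifiedLimit(5, 3, true))  // 3: capped while unschedulable
	fmt.Println(specifiedLimit(5, 3, false)) // 5: schedulable subsets keep their spec
}
```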
@@ -305,8 +300,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
numSubset := len(ac.Spec.Topology.Subsets)
minReplicasMap := make(map[string]int32, numSubset)
maxReplicasMap := make(map[string]int32, numSubset)
safeReplicasMap := getSafeReplicasMap(nameToSubset)
readyReplicasMap := getReadyReplicasMap(nameToSubset)
notPendingReplicasMap := getNotPendingReplicasMap(nameToSubset)
for index, subset := range ac.Spec.Topology.Subsets {
minReplicas := int32(0)
maxReplicas := int32(math.MaxInt32)
@@ -317,16 +311,19 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
}
if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() {
unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset)
// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
if safeReplicas, ok := safeReplicasMap[subset.Name]; ok {
minReplicas = integer.Int32Min(safeReplicas, minReplicas)
maxReplicas = integer.Int32Min(safeReplicas, maxReplicas)
if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok {
klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset",
"subset", subset.Name)
minReplicas = integer.Int32Min(notPendingReplicas, minReplicas)
maxReplicas = integer.Int32Min(notPendingReplicas, maxReplicas)
}
// To prevent healthy pod from being deleted
if readyReplicas := readyReplicasMap[subset.Name]; readyReplicas > minReplicas {
klog.InfoS("Assign min(readyReplicas, maxReplicas) to minReplicas to avoid deleting running pods",
"subset", subset.Name, "minReplicas", minReplicas, "readyReplicas", readyReplicas, "maxReplicas", maxReplicas)
minReplicas = integer.Int32Min(readyReplicas, maxReplicas)
if notPendingReplicas := notPendingReplicasMap[subset.Name]; !unschedulable && notPendingReplicas > minReplicas {
klog.InfoS("Assign min(notPendingReplicas, maxReplicas) to minReplicas to avoid deleting running pods",
"subset", subset.Name, "minReplicas", minReplicas, "notPendingReplicas", notPendingReplicas, "maxReplicas", maxReplicas)
minReplicas = integer.Int32Min(notPendingReplicas, maxReplicas)

(Codecov: added lines #L324-L326 in pkg/controller/uniteddeployment/allocator.go were not covered by tests)
}
}
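
The two adaptive branches above can be modelled with a short self-contained sketch (illustrative names, not the controller's code): an unschedulable subset has both bounds clamped to its not-pending replicas, while a schedulable subset raises its lower bound so that pods already running are not deleted.

```go
package main

import "fmt"

// min32 stands in for integer.Int32Min from k8s.io/utils/integer.
func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

// adaptiveBounds models validateAndCalculateMinMaxMap's adjustment: clamp both
// bounds of an unschedulable subset, otherwise lift the lower bound to protect
// replicas that are already running.
func adaptiveBounds(minReplicas, maxReplicas, notPending int32, unschedulable bool) (int32, int32) {
	if unschedulable {
		return min32(notPending, minReplicas), min32(notPending, maxReplicas)
	}
	if notPending > minReplicas {
		minReplicas = min32(notPending, maxReplicas)
	}
	return minReplicas, maxReplicas
}

func main() {
	fmt.Println(adaptiveBounds(2, 4, 1, true))  // 1 1: shrink to what currently fits
	fmt.Println(adaptiveBounds(2, 4, 3, false)) // 3 4: keep the 3 pods already running
}
```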

34 changes: 14 additions & 20 deletions pkg/controller/uniteddeployment/allocator_test.go
@@ -333,7 +333,7 @@ func TestAdaptiveSpecificAllocator(t *testing.T) {
Status: SubsetStatus{
UnschedulableStatus: appsv1alpha1.UnschedulableStatus{
Unschedulable: true,
FailedPods: 2,
PendingPods: 2,
},
Replicas: 5,
},
@@ -352,7 +352,7 @@ }
}

func TestAdaptiveElasticAllocator(t *testing.T) {
getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods, readyReplicas int32) (
getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods int32) (
*appsv1alpha1.UnitedDeployment, map[string]*Subset) {
minR, maxR := intstr.FromInt32(minReplicas), intstr.FromInt32(maxReplicas)
return &appsv1alpha1.UnitedDeployment{
@@ -379,62 +379,56 @@ func TestAdaptiveElasticAllocator(t *testing.T) {
Status: SubsetStatus{
UnschedulableStatus: appsv1alpha1.UnschedulableStatus{
Unschedulable: true,
FailedPods: failedPods,
PendingPods: failedPods,
},
Replicas: maxReplicas,
},
Spec: SubsetSpec{Replicas: minReplicas},
},
"subset-2": {
Status: SubsetStatus{
ReadyReplicas: readyReplicas,
},
Status: SubsetStatus{},
},
}
}
cases := []struct {
totalReplicas, minReplicas, maxReplicas, failedPods, readyReplicas int32
subset1Replicas, subset2Replicas int32
totalReplicas, minReplicas, maxReplicas, pendingPods int32
subset1Replicas, subset2Replicas int32
}{
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 5,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5,
subset1Replicas: 0, subset2Replicas: 10,
},
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 4,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4,
subset1Replicas: 0, subset2Replicas: 10,
},
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 3,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3,
subset1Replicas: 1, subset2Replicas: 9,
},
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 2,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2,
subset1Replicas: 2, subset2Replicas: 8,
},
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 1,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1,
subset1Replicas: 3, subset2Replicas: 7,
},
{
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 0,
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0,
subset1Replicas: 4, subset2Replicas: 6,
},
{
totalReplicas: 10, maxReplicas: 4, readyReplicas: 7,
subset1Replicas: 3, subset2Replicas: 7,
},
}
for _, testCase := range cases {
ud, subsets := getUnitedDeploymentAndSubsets(
testCase.totalReplicas, testCase.minReplicas, testCase.maxReplicas, testCase.failedPods, testCase.readyReplicas)
testCase.totalReplicas, testCase.minReplicas, testCase.maxReplicas, testCase.pendingPods)
alloc, err := NewReplicaAllocator(ud).Alloc(&subsets)
if err != nil {
t.Fatalf("unexpected alloc error %v", err)
} else {
subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"]
if subset1Replicas != testCase.subset1Replicas || subset2Replicas != testCase.subset2Replicas {
t.Fatalf("subset1Replicas = %d, subset1Replicas = %d, test case is %v",
t.Fatalf("subset1Replicas = %d, subset1Replicas = %d, test case is %+v",
subset1Replicas, subset2Replicas, testCase)
}
}
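As a cross-check of one table row (totalReplicas=10, minReplicas=2, maxReplicas=4, pendingPods=3): the fixture sets the unschedulable subset's Replicas to maxReplicas, so 4 - 3 = 1 replica is not pending, both bounds collapse to 1, and subset-2 absorbs the remainder. A sketch of that arithmetic, with names chosen for illustration only:

```go
package main

import "fmt"

func main() {
	// One row from the table: totalReplicas=10, minReplicas=2, maxReplicas=4, pendingPods=3.
	var total, maxR, pending int32 = 10, 4, 3

	// The fixture reports Replicas == maxReplicas for the unschedulable subset.
	notPending := maxR - pending // 4 - 3 = 1 replica is not stuck pending

	// Adaptive clamping collapses both bounds (min=2, max=4) down to notPending,
	// so subset-1 is allocated exactly 1 replica and subset-2 takes the rest.
	subset1 := notPending
	subset2 := total - subset1
	fmt.Println(subset1, subset2) // 1 9, matching the expected subset1Replicas and subset2Replicas
}
```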
14 changes: 9 additions & 5 deletions pkg/controller/uniteddeployment/uniteddeployment_controller.go
@@ -282,6 +282,9 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud
if status, ok := ud.Status.SubsetUnschedulable[name]; ok && status.Unschedulable {
// The unschedulable state of a subset lasts for at least 5 minutes.
// During this period, even if ReadyReplicas == Replicas, the subset is still unschedulable.
if status.UnschedulableTimestamp == nil {
status.UnschedulableTimestamp = &metav1.Time{}

(Codecov: added line #L286 in pkg/controller/uniteddeployment/uniteddeployment_controller.go was not covered by tests)
}
recoverTime := status.UnschedulableTimestamp.Add(ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration())
klog.InfoS("existing unschedulable subset found", "subset", name, "recoverTime", recoverTime)
if now.Before(recoverTime) {
@@ -296,19 +299,20 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud
// Maybe there exist some pending pods because the subset is unschedulable.
if subset.Status.ReadyReplicas < subset.Status.Replicas {
for _, pod := range subset.Spec.SubsetPods {
timeouted, checkAfter := utilcontroller.PodUnscheduledTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
timeouted, checkAfter := utilcontroller.PodPendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
if timeouted {
subset.Status.UnschedulableStatus.FailedPods++
subset.Status.UnschedulableStatus.PendingPods++
}
if checkAfter > 0 {
durationStore.Push(unitedDeploymentKey, checkAfter)
}
}
if subset.Status.UnschedulableStatus.FailedPods > 0 {
klog.InfoS("subset has failed pods", "failedPods", subset.Status.UnschedulableStatus.FailedPods)
if subset.Status.UnschedulableStatus.PendingPods > 0 {
klog.InfoS("subset has pending pods",
"subset", subset.Name, "pendingPods", subset.Status.UnschedulableStatus.PendingPods)
if !subset.Status.UnschedulableStatus.Unschedulable {
subset.Status.UnschedulableStatus.Unschedulable = true
subset.Status.UnschedulableStatus.UnschedulableTimestamp = metav1.Time{Time: time.Now()}
subset.Status.UnschedulableStatus.UnschedulableTimestamp = &metav1.Time{Time: time.Now()}
durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration())
}
}
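A sketch of the recovery-window bookkeeping above, using the defaults implied by the comments (a subset stays unschedulable for at least 5 minutes once marked); the concrete durations here are assumptions for illustration.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A subset marked unschedulable two minutes ago, with a 5-minute window.
	unschedulableSince := time.Now().Add(-2 * time.Minute)
	lastDuration := 5 * time.Minute

	recoverTime := unschedulableSince.Add(lastDuration)
	if now := time.Now(); now.Before(recoverTime) {
		// Still inside the window: keep the subset unschedulable and requeue the
		// UnitedDeployment to re-check once the window has elapsed.
		fmt.Printf("still unschedulable, re-check in %s\n", recoverTime.Sub(now).Round(time.Second))
	} else {
		fmt.Println("window elapsed, the subset may become schedulable again")
	}
}
```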
@@ -164,28 +164,28 @@ func TestUnschedulableStatusManagement(t *testing.T) {

// CASE1: Not timeouted yet
manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud)
g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0))
g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0))
g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool {
return after < 15*time.Second && after > 14*time.Second
}))

//// CASE2: Timeouted
pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-31 * time.Second))
manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud)
g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(1))
g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(1))
g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(appsv1alpha1.DefaultUnschedulableStatusLastDuration))

// CASE3: Unschedulable status
ud.Status.SubsetUnschedulable = map[string]appsv1alpha1.UnschedulableStatus{
subset.Name: {
Unschedulable: true,
UnschedulableTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)),
UnschedulableTimestamp: &metav1.Time{Time: time.Now().Add(-time.Minute)},
},
}
subset.Status.ReadyReplicas = 1
subset.Status.UnschedulableStatus.FailedPods = 0
subset.Status.UnschedulableStatus.PendingPods = 0
manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud)
g.Expect(g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0)))
g.Expect(g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0)))
g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool {
return after < appsv1alpha1.DefaultUnschedulableStatusLastDuration-time.Minute &&
after > 59*time.Second+appsv1alpha1.DefaultUnschedulableStatusLastDuration-2*time.Minute
@@ -195,11 +195,11 @@ func TestUnschedulableStatusManagement(t *testing.T) {
ud.Status.SubsetUnschedulable = map[string]appsv1alpha1.UnschedulableStatus{
subset.Name: {
Unschedulable: true,
UnschedulableTimestamp: metav1.NewTime(time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)),
UnschedulableTimestamp: &metav1.Time{Time: time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)},
},
}
subset.Status.UnschedulableStatus.Unschedulable = false
g.Expect(g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0)))
g.Expect(g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0)))
g.Expect(subset.Status.UnschedulableStatus.Unschedulable).To(gomega.BeFalse())
g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(0))
}
5 changes: 2 additions & 3 deletions pkg/controller/util/pod_condition_utils.go
@@ -24,17 +24,16 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
condition.Message = string(message)
}

// PodUnscheduledTimeout return true when Pod was scheduled failed and timeout.
// PodPendingTimeout return true when Pod was scheduled failed and timeout.
// nextCheckAfter > 0 means the pod is failed to schedule but not timeout yet.
func PodUnscheduledTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
func PodPendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
return false, -1
}
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse &&
condition.Reason == v1.PodReasonUnschedulable {
currentTime := time.Now()

expectSchedule := pod.CreationTimestamp.Add(timeout)
// schedule timeout
if expectSchedule.Before(currentTime) {
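A usage sketch of the renamed helper: a pod that has failed scheduling for longer than the critical duration is reported as timed out. The import paths are assumed from the repository layout, and the second return value is only meaningful while the pod has not yet timed out.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	utilcontroller "github.com/openkruise/kruise/pkg/controller/util"
)

func main() {
	// A pending, unscheduled pod created 45 seconds ago; with a 30-second
	// critical duration it should be treated as timed out.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			CreationTimestamp: metav1.NewTime(time.Now().Add(-45 * time.Second)),
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}
	timedOut, checkAfter := utilcontroller.PodPendingTimeout(pod, 30*time.Second)
	fmt.Println(timedOut, checkAfter) // expected: true; checkAfter matters only before the timeout
}
```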
2 changes: 1 addition & 1 deletion pkg/controller/workloadspread/reschedule.go
@@ -112,7 +112,7 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS

// PodUnscheduledTimeout return true when Pod was scheduled failed and timeout.
func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool {
timeouted, nextCheckAfter := util.PodUnscheduledTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
timeouted, nextCheckAfter := util.PodPendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
if nextCheckAfter > 0 {
durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter)

(Codecov: added line #L117 in pkg/controller/workloadspread/reschedule.go was not covered by tests)
}