
Commit

do cleanup
free6om committed Sep 8, 2023
1 parent d206d19 commit 45885d9
Showing 3 changed files with 36 additions and 140 deletions.
controllers/apps/components/base.go: 2 changes (1 addition, 1 deletion)
@@ -449,7 +449,7 @@ func (c *componentBase) updateStatus(phaseTransitionMsg string, updatefn func(st

if phase != status.Phase {
// TODO: logging the event
if c.Recorder != nil {
if c.Recorder != nil && phaseTransitionMsg != "" {
c.Recorder.Eventf(c.Cluster, corev1.EventTypeNormal, ComponentPhaseTransition, phaseTransitionMsg)
}
}
controllers/apps/components/base_stateful.go: 17 changes (17 additions, 0 deletions)
@@ -28,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
@@ -266,15 +267,31 @@ func (c *rsmComponentBase) Status(reqCtx intctrlutil.RequestCtx, cli client.Clie
return running
}

updatePodsReady := func(ready bool) {
_ = c.updateStatus("", func(status *appsv1alpha1.ClusterComponentStatus) error {
status.PodsReady = &ready
if ready {
time := metav1.Now()
status.PodsReadyTime = &time
}
return nil
})
}

updatePodsReady(false)
switch {
case isDeleting():
c.SetStatusPhase(appsv1alpha1.DeletingClusterCompPhase, nil, "Component is Deleting")
updatePodsReady(true)
case isZeroReplica() && hasComponentPod():
c.SetStatusPhase(appsv1alpha1.StoppingClusterCompPhase, nil, "Component is Stopping")
updatePodsReady(true)
case isZeroReplica():
c.SetStatusPhase(appsv1alpha1.StoppedClusterCompPhase, nil, "Component is Stopped")
updatePodsReady(true)
case isRunning && isAllConfigSynced && !hasVolumeExpansionRunning():
c.SetStatusPhase(appsv1alpha1.RunningClusterCompPhase, nil, "Component is Running")
updatePodsReady(true)
case hasFailedPod(), isScaleOutFailed():
c.SetStatusPhase(appsv1alpha1.FailedClusterCompPhase, messages, "Component is Failed")
case isFirstGeneration():
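The updatePodsReady closure added above updates the component status through a mutation callback and stamps PodsReadyTime only when the pods become ready. Below is a minimal, self-contained Go sketch of that pattern; componentStatus and updateStatus are hypothetical stand-ins, not the appsv1alpha1 or componentBase API.

```go
// Condensed sketch of the status-mutation pattern; types are hypothetical
// stand-ins, not the KubeBlocks API.
package main

import (
	"fmt"
	"time"
)

type componentStatus struct {
	PodsReady     *bool
	PodsReadyTime *time.Time
}

// updateStatus stands in for componentBase.updateStatus: it hands the status
// object to a caller-supplied mutation callback.
func updateStatus(status *componentStatus, mutate func(*componentStatus) error) error {
	return mutate(status)
}

func main() {
	status := &componentStatus{}

	updatePodsReady := func(ready bool) {
		_ = updateStatus(status, func(s *componentStatus) error {
			s.PodsReady = &ready
			if ready {
				now := time.Now() // the real code records metav1.Now()
				s.PodsReadyTime = &now
			}
			return nil
		})
	}

	updatePodsReady(true)
	fmt.Println(*status.PodsReady, status.PodsReadyTime != nil) // true true
}
```

The closure form keeps the ready flag and its timestamp updated from a single place for every branch of the phase switch.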
controllers/apps/transformer_cluster_status.go: 157 changes (18 additions, 139 deletions)
@@ -22,28 +22,14 @@ package apps
import (
"fmt"

"k8s.io/apimachinery/pkg/api/meta"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/internal/controller/graph"
ictrltypes "github.com/apecloud/kubeblocks/internal/controller/types"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/api/meta"
)

// phaseSyncLevel defines a phase synchronization level to notify the status synchronizer how to handle cluster phase.
type phaseSyncLevel int

const (
clusterPhaseNoChange phaseSyncLevel = iota
clusterIsRunning // cluster is running
clusterIsStopped // cluster is stopped
clusterExistFailedOrAbnormal // cluster exists failed or abnormal component
)

type ClusterStatusTransformer struct {
// phaseSyncLevel defines a phase synchronization level to indicate how to handle cluster phase.
phaseSyncLevel phaseSyncLevel
// existsAbnormalOrFailed indicates whether the cluster exists abnormal or failed component.
existsAbnormalOrFailed bool
// notReadyCompNames records the component names that are not ready.
notReadyCompNames map[string]struct{}
// replicasNotReadyCompNames records the component names which replicas are not ready.
@@ -66,25 +52,17 @@ func (t *ClusterStatusTransformer) Transform(ctx graph.TransformContext, dag *gr
cluster.Status.ClusterDefGeneration = transCtx.ClusterDef.Generation
}

initClusterStatusParams := func() {
t.phaseSyncLevel = clusterPhaseNoChange
t.notReadyCompNames = map[string]struct{}{}
t.replicasNotReadyCompNames = map[string]struct{}{}
}

switch {
case origCluster.IsUpdating():
transCtx.Logger.Info(fmt.Sprintf("update cluster status after applying resources, generation: %d", cluster.Generation))
updateObservedGeneration()
rootVertex.Action = ictrltypes.ActionStatusPtr()
case origCluster.IsStatusUpdating():
initClusterStatusParams()
defer func() { rootVertex.Action = ictrltypes.ActionPtr(ictrltypes.STATUS) }()
// reconcile the phase and conditions of the Cluster.status
//if err := t.reconcileClusterStatus(transCtx, dag, cluster); err != nil {
// return err
//}
t.reconcileClusterStatus2(cluster)
if err := t.reconcileClusterStatus(cluster); err != nil {
return err
}
case origCluster.IsDeleting():
return fmt.Errorf("unexpected cluster status: %+v", origCluster)
default:
@@ -94,7 +72,7 @@ func (t *ClusterStatusTransformer) Transform(ctx graph.TransformContext, dag *gr
return nil
}

func (t *ClusterStatusTransformer) reconcileClusterStatus2(cluster *appsv1alpha1.Cluster) {
func (t *ClusterStatusTransformer) reconcileClusterPhase(cluster *appsv1alpha1.Cluster) {
isFirstGeneration := func() bool {
return cluster.Generation == 1
}
@@ -157,32 +135,27 @@ func (t *ClusterStatusTransformer) reconcileClusterStatus2(cluster *appsv1alpha1
}

// reconcileClusterStatus reconciles phase and conditions of the Cluster.status.
func (t *ClusterStatusTransformer) reconcileClusterStatus(transCtx *ClusterTransformContext, dag *graph.DAG, cluster *appsv1alpha1.Cluster) error {
func (t *ClusterStatusTransformer) reconcileClusterStatus(cluster *appsv1alpha1.Cluster) error {
if len(cluster.Status.Components) == 0 {
return nil
}
initClusterStatusParams := func() {
t.notReadyCompNames = map[string]struct{}{}
t.replicasNotReadyCompNames = map[string]struct{}{}
}
initClusterStatusParams()

// removes the invalid component of status.components which is deleted from spec.components.
t.removeInvalidCompStatus(cluster)

// do analysis of Cluster.Status.component and update the results to status synchronizer.
t.doAnalysisAndUpdateSynchronizer(dag, cluster)
t.doAnalysisAndUpdateSynchronizer(cluster)

// handle the ready condition.
t.syncReadyConditionForCluster(cluster)

// sync the cluster phase.
switch t.phaseSyncLevel {
case clusterIsRunning:
if cluster.Status.Phase != appsv1alpha1.RunningClusterPhase {
t.syncClusterPhaseToRunning(cluster)
}
case clusterIsStopped:
if cluster.Status.Phase != appsv1alpha1.StoppedClusterPhase {
t.syncClusterPhaseToStopped(cluster)
}
case clusterExistFailedOrAbnormal:
t.handleExistAbnormalOrFailed(transCtx, cluster)
}
t.reconcileClusterPhase(cluster)
return nil
}

Expand All @@ -201,11 +174,7 @@ func (t *ClusterStatusTransformer) removeInvalidCompStatus(cluster *appsv1alpha1
}

// doAnalysisAndUpdateSynchronizer analyzes the Cluster.Status.Components and updates the results to the synchronizer.
func (t *ClusterStatusTransformer) doAnalysisAndUpdateSynchronizer(dag *graph.DAG, cluster *appsv1alpha1.Cluster) {
var (
runningCompCount int
stoppedCompCount int
)
func (t *ClusterStatusTransformer) doAnalysisAndUpdateSynchronizer(cluster *appsv1alpha1.Cluster) {
// analysis the status of components and calculate the cluster phase.
for k, v := range cluster.Status.Components {
if v.PodsReady == nil || !*v.PodsReady {
@@ -214,26 +183,9 @@ func (t *ClusterStatusTransformer) doAnalysisAndUpdateSynchronizer(dag *graph.DA
}
switch v.Phase {
case appsv1alpha1.UnknownClusterCompPhase, appsv1alpha1.FailedClusterCompPhase:
t.existsAbnormalOrFailed, t.notReadyCompNames[k] = true, struct{}{}
case appsv1alpha1.RunningClusterCompPhase:
// if !isComponentInHorizontalScaling(dag, k) {
runningCompCount += 1
// }
case appsv1alpha1.StoppedClusterCompPhase:
stoppedCompCount += 1
t.notReadyCompNames[k] = struct{}{}
}
}
if t.existsAbnormalOrFailed {
t.phaseSyncLevel = clusterExistFailedOrAbnormal
return
}
switch len(cluster.Status.Components) {
case runningCompCount:
t.phaseSyncLevel = clusterIsRunning
case stoppedCompCount:
// cluster is Stopped when cluster is not Running and all components are Stopped or Running
t.phaseSyncLevel = clusterIsStopped
}
}

// syncReadyConditionForCluster syncs the cluster conditions with ClusterReady and ReplicasReady type.
@@ -257,80 +209,7 @@ func (t *ClusterStatusTransformer) syncClusterPhaseToRunning(cluster *appsv1alph
meta.SetStatusCondition(&cluster.Status.Conditions, newClusterReadyCondition(cluster.Name))
}

// syncClusterToStopped syncs the cluster phase to Stopped.
// syncClusterPhaseToStopped syncs the cluster phase to Stopped.
func (t *ClusterStatusTransformer) syncClusterPhaseToStopped(cluster *appsv1alpha1.Cluster) {
cluster.Status.Phase = appsv1alpha1.StoppedClusterPhase
}

// handleExistAbnormalOrFailed handles the cluster status when some components are not ready.
func (t *ClusterStatusTransformer) handleExistAbnormalOrFailed(transCtx *ClusterTransformContext, cluster *appsv1alpha1.Cluster) {
componentMap, clusterAvailabilityEffectMap, _ := getComponentRelatedInfo(cluster,
*transCtx.ClusterDef, "")
// handle the cluster status when some components are not ready.
handleClusterPhaseWhenCompsNotReady(cluster, componentMap, clusterAvailabilityEffectMap)
}

// handleClusterPhaseWhenCompsNotReady handles the Cluster.status.phase when some components are Abnormal or Failed.
func handleClusterPhaseWhenCompsNotReady(cluster *appsv1alpha1.Cluster,
componentMap map[string]string,
clusterAvailabilityEffectMap map[string]bool) {
var (
clusterIsFailed bool
failedCompCount int
)

for k, v := range cluster.Status.Components {
if !slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), v.Phase) {
return
}
if v.Phase == appsv1alpha1.FailedClusterCompPhase {
failedCompCount += 1
componentDefName := componentMap[k]
// if the component can affect cluster availability, set Cluster.status.phase to Failed
if clusterAvailabilityEffectMap[componentDefName] {
clusterIsFailed = true
break
}
}
}
// If all components fail or there are failed components that affect the availability of the cluster, set phase to Failed
if failedCompCount == len(cluster.Status.Components) || clusterIsFailed {
cluster.Status.Phase = appsv1alpha1.FailedClusterPhase
} else {
cluster.Status.Phase = appsv1alpha1.UnknownClusterPhase
}
}

// getClusterAvailabilityEffect checks whether the component affect the cluster availability.
// if the component affects the availability and being Failed, the cluster is also Failed.
func getClusterAvailabilityEffect(componentDef *appsv1alpha1.ClusterComponentDefinition) bool {
switch componentDef.WorkloadType {
case appsv1alpha1.Consensus, appsv1alpha1.Replication:
return true
default:
return componentDef.GetMaxUnavailable() != nil
}
}

// getComponentRelatedInfo gets componentMap, clusterAvailabilityMap and component definition information
func getComponentRelatedInfo(cluster *appsv1alpha1.Cluster, clusterDef appsv1alpha1.ClusterDefinition, componentName string) (map[string]string, map[string]bool, appsv1alpha1.ClusterComponentDefinition) {
var (
compDefName string
componentMap = map[string]string{}
componentDef appsv1alpha1.ClusterComponentDefinition
)
for _, v := range cluster.Spec.ComponentSpecs {
if v.Name == componentName {
compDefName = v.ComponentDefRef
}
componentMap[v.Name] = v.ComponentDefRef
}
clusterAvailabilityEffectMap := map[string]bool{}
for _, v := range clusterDef.Spec.ComponentDefs {
clusterAvailabilityEffectMap[v.Name] = getClusterAvailabilityEffect(&v)
if v.Name == compDefName {
componentDef = v
}
}
return componentMap, clusterAvailabilityEffectMap, componentDef
}
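For reference, the removed code above derived the cluster phase by counting component phases: any Failed or Unknown component raised the failed/abnormal case, a cluster with all components Running was Running, and one with all components Stopped was Stopped. The following condensed, self-contained Go sketch illustrates that derivation with hypothetical stand-in types rather than the appsv1alpha1 API; the early return on a failed component is a simplification of the original bookkeeping.

```go
// Condensed sketch of the removed phaseSyncLevel derivation; types are
// hypothetical stand-ins, not the KubeBlocks implementation.
package main

import "fmt"

type compPhase string

const (
	runningComp compPhase = "Running"
	stoppedComp compPhase = "Stopped"
	failedComp  compPhase = "Failed"
	unknownComp compPhase = "Unknown"
)

type syncLevel int

const (
	noChange syncLevel = iota
	clusterRunning
	clusterStopped
	clusterFailedOrAbnormal
)

// deriveSyncLevel mirrors the removed doAnalysisAndUpdateSynchronizer:
// any Failed/Unknown component wins; otherwise the cluster is Running only
// when every component is Running, and Stopped only when every component is Stopped.
func deriveSyncLevel(components map[string]compPhase) syncLevel {
	var running, stopped int
	for _, p := range components {
		switch p {
		case failedComp, unknownComp:
			return clusterFailedOrAbnormal
		case runningComp:
			running++
		case stoppedComp:
			stopped++
		}
	}
	switch len(components) {
	case running:
		return clusterRunning
	case stopped:
		return clusterStopped
	default:
		return noChange
	}
}

func main() {
	fmt.Println(deriveSyncLevel(map[string]compPhase{"mysql": runningComp, "proxy": runningComp})) // 1 (clusterRunning)
	fmt.Println(deriveSyncLevel(map[string]compPhase{"mysql": stoppedComp}))                       // 2 (clusterStopped)
	fmt.Println(deriveSyncLevel(map[string]compPhase{"mysql": failedComp, "proxy": runningComp}))  // 3 (clusterFailedOrAbnormal)
}
```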
