Skip to content

Commit

Permalink
controller: consolidate status update logic (#915)
Browse files Browse the repository at this point in the history
Previously, Status updates could fail due to version conflicts. This
    commit pulls all status update logic into a helper function that
    utilizes retry.RetryOnConflict to ensure that .Status updates get
    persisted in spite of resource version changes.
    Additionally, this commit removes an early bailout if the controller
    detects a divergence in the currently loaded resource and the API
    version. This change should have no notable affects as it took place
    after any actors ran.
    It also fixes a buglet that overwrote the reconcilers context,
    negating the maximum runtime for a single reconcilation loop.
  • Loading branch information
chrisseto authored Jun 14, 2022
1 parent dbefb9b commit dda0a63
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

* Delete the CancelLoop function, fixing a cluster status update bug
* Correctly detect failed version checker Pods
* retry cluster status updates, reducing test flakes

# [v2.7.0](https://github.com/cockroachdb/cockroach-operator/compare/v2.6.0...v2.7.0)

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"@io_k8s_api//policy/v1beta1:go_default_library",
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
"@io_k8s_client_go//kubernetes:go_default_library",
"@io_k8s_client_go//util/retry:go_default_library",
"@io_k8s_sigs_controller_runtime//:go_default_library",
"@io_k8s_sigs_controller_runtime//pkg/client:go_default_library",
"@io_k8s_sigs_controller_runtime//pkg/reconcile:go_default_library",
Expand Down
37 changes: 16 additions & 21 deletions pkg/controller/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -121,8 +122,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
// we added a state called Starting for field ClusterStatus to accomplish this
if cluster.Status().ClusterStatus == "" {
cluster.SetClusterStatusOnFirstReconcile()
if err := r.Client.Status().Update(ctx, cluster.Unwrap()); err != nil {
log.Error(err, "failed to update cluster status on action")
if err := r.updateClusterStatus(ctx, log, &cluster); err != nil {
log.Error(err, "failed to update cluster status")
return requeueIfError(err)
}
return requeueImmediately()
Expand All @@ -132,8 +133,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
if cluster.True(api.CrdbVersionChecked) {
if cluster.GetCockroachDBImageName() != cluster.Status().CrdbContainerImage {
cluster.SetFalse(api.CrdbVersionChecked)
if err := r.Client.Status().Update(ctx, cluster.Unwrap()); err != nil {
log.Error(err, "failed to update cluster status on action")
if err := r.updateClusterStatus(ctx, log, &cluster); err != nil {
log.Error(err, "failed to update cluster status")
return requeueIfError(err)
}
return requeueImmediately()
Expand All @@ -148,16 +149,14 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
return noRequeue()
}

ctx = context.Background()

log.Info(fmt.Sprintf("Running action with name: %s", actorToExecute.GetActionType()))
if err := actorToExecute.Act(ctx, &cluster, log); err != nil {
// Save the error on the Status for each action
log.Info("Error on action", "Action", actorToExecute.GetActionType(), "err", err.Error())
cluster.SetActionFailed(actorToExecute.GetActionType(), err.Error())

defer func(ctx context.Context, cluster *resource.Cluster) {
if err := r.Client.Status().Update(ctx, cluster.Unwrap()); err != nil {
if err := r.updateClusterStatus(ctx, log, cluster); err != nil {
log.Error(err, "failed to update cluster status")
}
}(ctx, &cluster)
Expand Down Expand Up @@ -193,20 +192,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
cluster.SetActionFinished(actorToExecute.GetActionType())
}

// Check if the resource has been updated while the controller worked on it
fresh, err := cluster.IsFresh(fetcher)
if err != nil {
return requeueIfError(err)
}

// If the resource was updated, it is needed to start all over again
// to ensure that the latest state was reconciled
if !fresh {
log.V(int(zapcore.DebugLevel)).Info("cluster resources is not up to date")
return requeueImmediately()
}
cluster.SetClusterStatus()
if err := r.Client.Status().Update(ctx, cluster.Unwrap()); err != nil {
if err := r.updateClusterStatus(ctx, log, &cluster); err != nil {
log.Error(err, "failed to update cluster status")
return requeueIfError(err)
}
Expand All @@ -215,6 +201,15 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
return noRequeue()
}

// updateClusterStatus preprocesses a cluster's Status and then persists it to
// the Kubernetes API. updateClusterStatus will retry on conflict errors.
func (r *ClusterReconciler) updateClusterStatus(ctx context.Context, log logr.Logger, cluster *resource.Cluster) error {
cluster.SetClusterStatus()
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, cluster.Unwrap())
})
}

// SetupWithManager registers the controller with the controller.Manager from controller-runtime
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
var ingress client.Object
Expand Down
9 changes: 0 additions & 9 deletions pkg/resource/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,6 @@ func (cluster Cluster) SecureMode() string {
return "--insecure"
}

func (cluster Cluster) IsFresh(fetcher Fetcher) (bool, error) {
actual := ClusterPlaceholder(cluster.Name())
if err := fetcher.Fetch(actual); err != nil {
return false, errors.Wrapf(err, "failed to fetch cluster resource")
}

return cluster.cr.ResourceVersion == actual.ResourceVersion, nil
}

func (cluster Cluster) LoggingConfiguration(fetcher Fetcher) (string, error) {
if cluster.Spec().LogConfigMap != "" {
cm := &corev1.ConfigMap{
Expand Down

0 comments on commit dda0a63

Please sign in to comment.