diff --git a/.golangci.yml b/.golangci.yml index 1ed45b6..250a5cb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -14,7 +14,6 @@ linters: - importas - misspell - nestif - - nonamedreturns - prealloc - revive - stylecheck diff --git a/pkg/reconciler/internal/updater/updater.go b/pkg/reconciler/internal/updater/updater.go index 4c4bd31..9621046 100644 --- a/pkg/reconciler/internal/updater/updater.go +++ b/pkg/reconciler/internal/updater/updater.go @@ -18,7 +18,10 @@ package updater import ( "context" + "fmt" + "reflect" + "github.com/go-logr/logr" "helm.sh/helm/v3/pkg/release" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -33,17 +36,35 @@ import ( "github.com/operator-framework/helm-operator-plugins/pkg/internal/status" ) -func New(client client.Client) Updater { +func New(client client.Client, logger logr.Logger) Updater { + logger = logger.WithName("updater") return Updater{ client: client, + logger: logger, } } type Updater struct { - isCanceled bool - client client.Client - updateFuncs []UpdateFunc - updateStatusFuncs []UpdateStatusFunc + isCanceled bool + client client.Client + logger logr.Logger + updateFuncs []UpdateFunc + updateStatusFuncs []UpdateStatusFunc + externallyManagedStatusConditions map[string]struct{} + enableAggressiveConflictResolution bool +} + +func (u *Updater) RegisterExternallyManagedStatusConditions(conditions map[string]struct{}) { + if u.externallyManagedStatusConditions == nil { + u.externallyManagedStatusConditions = make(map[string]struct{}, len(conditions)) + } + for conditionType := range conditions { + u.externallyManagedStatusConditions[conditionType] = struct{}{} + } +} + +func (u *Updater) EnableAggressiveConflictResolution() { + u.enableAggressiveConflictResolution = true } type UpdateFunc func(*unstructured.Unstructured) bool @@ -113,7 +134,20 @@ func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) err st.updateStatusObject() obj.Object["status"] = st.StatusObject if err := retryOnRetryableUpdateError(backoff, func() error { - return u.client.Status().Update(ctx, obj) + updateErr := u.client.Status().Update(ctx, obj) + if errors.IsConflict(updateErr) && u.enableAggressiveConflictResolution { + u.logger.V(1).Info("Status update conflict detected") + resolved, resolveErr := u.tryRefreshObject(ctx, obj) + u.logger.V(1).Info("tryRefreshObject", "resolved", resolved, "resolveErr", resolveErr) + if resolveErr != nil { + return resolveErr + } + if !resolved { + return updateErr + } + return fmt.Errorf("status update conflict due to externally-managed status conditions") // retriable error. + } + return updateErr }); err != nil { return err } @@ -125,7 +159,20 @@ func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) err } if needsUpdate { if err := retryOnRetryableUpdateError(backoff, func() error { - return u.client.Update(ctx, obj) + updateErr := u.client.Update(ctx, obj) + if errors.IsConflict(updateErr) && u.enableAggressiveConflictResolution { + u.logger.V(1).Info("Status update conflict detected") + resolved, resolveErr := u.tryRefreshObject(ctx, obj) + u.logger.V(1).Info("tryRefreshObject", "resolved", resolved, "resolveErr", resolveErr) + if resolveErr != nil { + return resolveErr + } + if !resolved { + return updateErr + } + return fmt.Errorf("update conflict due to externally-managed status conditions") // retriable error. + } + return updateErr }); err != nil { return err } @@ -133,6 +180,140 @@ func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) err return nil } +// This function tries to merge the status of obj with the current version of the status on the cluster. +// The unstructured obj is expected to have been modified and to have caused a conflict error during an update attempt. +// If the only differences between obj and the current version are in externally managed status conditions, +// those conditions are merged from the current version into obj. +// Returns true if updating shall be retried with the updated obj. +// Returns false if the conflict could not be resolved. +func (u *Updater) tryRefreshObject(ctx context.Context, obj *unstructured.Unstructured) (bool, error) { + // Retrieve current version from the cluster. + current := &unstructured.Unstructured{} + current.SetGroupVersionKind(obj.GroupVersionKind()) + objectKey := client.ObjectKeyFromObject(obj) + if err := u.client.Get(ctx, objectKey, current); err != nil { + err = fmt.Errorf("refreshing object %s/%s: %w", objectKey.Namespace, objectKey.Name, err) + return false, err + } + + if !reflect.DeepEqual(obj.Object["spec"], current.Object["spec"]) { + // Diff in object spec. Nothing we can do about it -> Fail. + u.logger.V(1).Info("Cluster resource cannot be updated due to spec mismatch", + "namespace", objectKey.Namespace, "name", objectKey.Name, "gkv", obj.GroupVersionKind(), + ) + return false, nil + } + + // Merge externally managed conditions from current into object copy. + objCopy := obj.DeepCopy() + u.mergeExternallyManagedConditions(objCopy, current) + + // Overwrite metadata with the most recent in-cluster version. + // This ensures we have the latest resourceVersion, annotations, labels, etc. + // The reconciler flow can modify the set of finalizers, hence we need to preserve them. + newMetadata, metadataFound, _ := unstructured.NestedFieldNoCopy(current.Object, "metadata") + if !metadataFound { + return false, fmt.Errorf("failed to obtain metadata from current object") + } + finalizers, finalizersFound, _ := unstructured.NestedFieldNoCopy(obj.Object, "metadata", "finalizers") + if finalizersFound { + err := unstructured.SetNestedField(newMetadata.(map[string]interface{}), finalizers, "finalizers") + if err != nil { + return false, fmt.Errorf("failed to set finalizers in merged metadata: %w", err) + } + } + objCopy.Object["metadata"] = newMetadata + + // We were able to resolve the conflict by merging external conditions. + obj.Object = objCopy.Object + + u.logger.V(1).Info("Resolved update conflict by merging externally-managed status conditions") + return true, nil +} + +// mergeExternallyManagedConditions updates obj's status conditions by replacing +// externally managed conditions with their values from current. +// Uses current's ordering to avoid false positives in conflict detection. +func (u *Updater) mergeExternallyManagedConditions(obj, current *unstructured.Unstructured) { + objConditions := statusConditionsFromObject(obj) + if objConditions == nil { + return + } + + currentConditions := statusConditionsFromObject(current) + if currentConditions == nil { + return + } + + // Build a map of all conditions from obj (by type). + objConditionsByType := make(map[string]map[string]interface{}) + for _, cond := range objConditions { + if condType, ok := cond["type"].(string); ok { + objConditionsByType[condType] = cond + } + } + + // Build merged conditions starting from current's ordering. + mergedConditions := make([]map[string]interface{}, 0, len(currentConditions)) + for _, cond := range currentConditions { + condType, ok := cond["type"].(string) + if !ok { + // Shouldn't happen. + continue + } + if _, isExternal := u.externallyManagedStatusConditions[condType]; isExternal { + // Keep external condition from current. + mergedConditions = append(mergedConditions, cond) + } else if objCond, found := objConditionsByType[condType]; found { + // Replace with non-external condition from obj. + mergedConditions = append(mergedConditions, objCond) + delete(objConditionsByType, condType) // Mark as used. + } + // Note: If condition exists in current but not in obj (and is non-external), + // we skip it. + } + + // Add any remaining non-externally managed conditions from obj that weren't in current. + for condType, cond := range objConditionsByType { + if _, isExternal := u.externallyManagedStatusConditions[condType]; isExternal { + continue + } + mergedConditions = append(mergedConditions, cond) + } + + // Convert to []interface{} for SetNestedField + mergedConditionsInterface := make([]interface{}, len(mergedConditions)) + for i, cond := range mergedConditions { + mergedConditionsInterface[i] = cond + } + + // Write the modified conditions back. + _ = unstructured.SetNestedField(obj.Object, mergedConditionsInterface, "status", "conditions") +} + +// statusConditionsFromObject extracts status conditions from an unstructured object. +// Returns nil if the conditions field is not found or is not the expected type. +func statusConditionsFromObject(obj *unstructured.Unstructured) []map[string]interface{} { + conditionsRaw, ok, _ := unstructured.NestedFieldNoCopy(obj.Object, "status", "conditions") + if !ok { + return nil + } + + conditionsSlice, ok := conditionsRaw.([]interface{}) + if !ok { + return nil + } + + // Convert []interface{} to []map[string]interface{} + result := make([]map[string]interface{}, 0, len(conditionsSlice)) + for _, cond := range conditionsSlice { + if condMap, ok := cond.(map[string]interface{}); ok { + result = append(result, condMap) + } + } + return result +} + func RemoveFinalizer(finalizer string) UpdateFunc { return func(obj *unstructured.Unstructured) bool { if !controllerutil.ContainsFinalizer(obj, finalizer) { diff --git a/pkg/reconciler/internal/updater/updater_test.go b/pkg/reconciler/internal/updater/updater_test.go index a4f8c3a..bbe7761 100644 --- a/pkg/reconciler/internal/updater/updater_test.go +++ b/pkg/reconciler/internal/updater/updater_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" + "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -51,7 +52,7 @@ var _ = Describe("Updater", func() { JustBeforeEach(func() { cl = fake.NewClientBuilder().WithInterceptorFuncs(interceptorFuncs).Build() - u = New(cl) + u = New(cl, logr.Discard()) obj = &unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": "apps/v1", "kind": "Deployment", @@ -326,3 +327,149 @@ var _ = Describe("statusFor", func() { Expect(statusFor(obj)).To(Equal(&helmAppStatus{})) }) }) + +var _ = Describe("tryRefreshObject", func() { + var ( + cl client.Client + u Updater + obj *unstructured.Unstructured + current *unstructured.Unstructured + externalCondition map[string]interface{} + externalConditionTypes map[string]struct{} + nonExternalCondition map[string]interface{} + // nonExternalConditionB map[string]interface{} + ) + + BeforeEach(func() { + externalCondition = map[string]interface{}{ + "type": "ExternalCondition", + "status": string(corev1.ConditionTrue), + "reason": "ExternallyManaged", + } + externalConditionTypes = map[string]struct{}{ + externalCondition["type"].(string): {}, + } + nonExternalCondition = map[string]interface{}{ + "type": "Deployed", + "status": string(corev1.ConditionTrue), + "reason": "InitialDeployment", + } + // nonExternalConditionB = map[string]interface{}{ + // "type": "Foo", + // "status": string(corev1.ConditionTrue), + // "reason": "Success", + // } + + // Setup obj with initial state (version 100). + obj = &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "TestResource", + "metadata": map[string]interface{}{ + "name": "test-obj", + "namespace": "default", + "resourceVersion": "100", + }, + "status": map[string]interface{}{}, + }, + } + + // Setup current with updated state (version 101). + current = &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "TestResource", + "metadata": map[string]interface{}{ + "name": "test-obj", + "namespace": "default", + "resourceVersion": "101", + }, + "status": map[string]interface{}{}, + }, + } + }) + + When("finalizers are updated in the reconciler flow", func() { + BeforeEach(func() { + err := unstructured.SetNestedStringSlice(obj.Object, []string{"finalizerA, finalizerB"}, "metadata", "finalizers") + Expect(err).ToNot(HaveOccurred()) + err = unstructured.SetNestedStringSlice(current.Object, []string{"finalizerA, finalizerC"}, "metadata", "finalizers") + Expect(err).ToNot(HaveOccurred()) + + cl = fake.NewClientBuilder().WithObjects(current).Build() + u = New(cl, logr.Discard()) + }) + + It("should preserve these changes", func() { + resolved, err := u.tryRefreshObject(context.Background(), obj) + Expect(err).ToNot(HaveOccurred()) + Expect(resolved).To(BeTrue()) + Expect(obj.GetResourceVersion()).To(Equal("101")) + + // Verify finalizers were merged + finalizers := obj.GetFinalizers() + Expect(finalizers).To(Equal([]string{"finalizerA, finalizerB"})) + }) + }) + + When("only external conditions differ", func() { + BeforeEach(func() { + obj.Object["status"] = map[string]interface{}{ + "conditions": []interface{}{nonExternalCondition}, + } + current.Object["status"] = map[string]interface{}{ + "conditions": []interface{}{nonExternalCondition, externalCondition}, + } + + cl = fake.NewClientBuilder().WithObjects(current).Build() + u = New(cl, logr.Discard()) + u.RegisterExternallyManagedStatusConditions(externalConditionTypes) + }) + + It("should merge and return resolved", func() { + resolved, err := u.tryRefreshObject(context.Background(), obj) + + Expect(err).ToNot(HaveOccurred()) + Expect(resolved).To(BeTrue()) + Expect(obj.GetResourceVersion()).To(Equal("101")) + + // Verify external condition was merged + conditions, ok, _ := unstructured.NestedSlice(obj.Object, "status", "conditions") + Expect(ok).To(BeTrue()) + Expect(conditions).To(HaveLen(2)) + + // Verify ordering (Deployed first, ExternalCondition second from current) + Expect(conditions[0].(map[string]interface{})["type"]).To(Equal("Deployed")) + Expect(conditions[1].(map[string]interface{})["type"]).To(Equal("ExternalCondition")) + }) + }) + + // When("no external conditions are configured", func() { + // BeforeEach(func() { + // cl = fake.NewClientBuilder().Build() + // u = New(cl, logr.Discard()) + // }) + + // It("should return early without calling Get", func() { + // resolved, err := u.tryRefreshObject(context.Background(), obj) + // Expect(err).ToNot(HaveOccurred()) + // Expect(resolved).To(BeFalse()) + // }) + // }) + + When("Get returns an error", func() { + BeforeEach(func() { + // Empty client - Get will return NotFound + cl = fake.NewClientBuilder().Build() + u = New(cl, logr.Discard()) + u.RegisterExternallyManagedStatusConditions(externalConditionTypes) + }) + + It("should return the error", func() { + resolved, err := u.tryRefreshObject(context.Background(), obj) + Expect(err).To(HaveOccurred()) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + Expect(resolved).To(BeFalse()) + }) + }) +}) diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 98b07fc..065c744 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -80,6 +80,7 @@ type Reconciler struct { gvk *schema.GroupVersionKind chrt *chart.Chart selectorPredicate predicate.Predicate + customPredicates []predicate.Predicate overrideValues map[string]string skipDependentWatches bool extraWatchSources []source.Source @@ -99,6 +100,9 @@ type Reconciler struct { upgradeAnnotations map[string]annotation.Upgrade uninstallAnnotations map[string]annotation.Uninstall pauseReconcileAnnotation string + + externallyManagedStatusConditions map[string]struct{} + enableAggressiveConflictResolution bool } // New creates a new Reconciler that reconciles custom resources that define a @@ -593,6 +597,20 @@ func WithSelector(s metav1.LabelSelector) Option { } } +// WithPredicate is an Option that adds a custom predicate to filter reconciliation events. +// Multiple predicates can be added and will be combined using AND logic. +// This is useful for filtering out unnecessary reconciliations, such as status-only updates. +// +// Example: +// +// reconciler.WithPredicate(predicate.GenerationChangedPredicate{}) +func WithPredicate(p predicate.Predicate) Option { + return func(r *Reconciler) error { + r.customPredicates = append(r.customPredicates, p) + return nil + } +} + // WithControllerSetupFunc is an Option that allows customizing a controller before it is started. // The only supported customization here is adding additional Watch sources to the controller. func WithControllerSetupFunc(f ControllerSetupFunc) Option { @@ -602,6 +620,25 @@ func WithControllerSetupFunc(f ControllerSetupFunc) Option { } } +func WithExternallyManagedStatusConditions(conditions ...string) Option { + return func(r *Reconciler) error { + if r.externallyManagedStatusConditions == nil { + r.externallyManagedStatusConditions = make(map[string]struct{}, len(conditions)) + } + for _, c := range conditions { + r.externallyManagedStatusConditions[c] = struct{}{} + } + return nil + } +} + +func WithAggressiveConflictResolution(enabled bool) Option { + return func(r *Reconciler) error { + r.enableAggressiveConflictResolution = enabled + return nil + } +} + // ControllerSetup allows restricted access to the Controller using the WithControllerSetupFunc option. // Currently, the only supposed configuration is adding additional watchers do the controller. type ControllerSetup interface { @@ -641,7 +678,7 @@ type ControllerSetupFunc func(c ControllerSetup) error // - Deployed - a release for this CR is deployed (but not necessarily ready). // - ReleaseFailed - an installation or upgrade failed. // - Irreconcilable - an error occurred during reconciliation -func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, err error) { +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { log := r.log.WithValues(strings.ToLower(r.gvk.Kind), req.NamespacedName) log.V(1).Info("Reconciliation triggered") @@ -667,18 +704,27 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re // This is a safety measure to ensure that the chart is fully uninstalled before the CR is deleted. if obj.GetDeletionTimestamp() == nil && !controllerutil.ContainsFinalizer(obj, uninstallFinalizer) { log.V(1).Info("Adding uninstall finalizer.") + patch := client.MergeFrom(obj.DeepCopy()) obj.SetFinalizers(append(obj.GetFinalizers(), uninstallFinalizer)) - if err := r.client.Update(ctx, obj); err != nil { + if err := r.client.Patch(ctx, obj, patch); err != nil { return ctrl.Result{}, errs.Wrapf(err, "failed to add uninstall finalizer to %s/%s", req.NamespacedName.Namespace, req.NamespacedName.Name) } } - u := updater.New(r.client) + u := updater.New(r.client, log) + u.RegisterExternallyManagedStatusConditions(r.externallyManagedStatusConditions) + if r.enableAggressiveConflictResolution { + u.EnableAggressiveConflictResolution() + } defer func() { applyErr := u.Apply(ctx, obj) if err == nil && !apierrors.IsNotFound(applyErr) { err = applyErr } + if err != nil { + // Otherwise controller-runtime will log a warning. + result = ctrl.Result{} + } }() if r.pauseReconcileAnnotation != "" { @@ -861,7 +907,11 @@ func (r *Reconciler) handleDeletion(ctx context.Context, actionClient helmclient // and we need to be able to update the conditions on the CR to // indicate that the uninstall failed. if err := func() (err error) { - uninstallUpdater := updater.New(r.client) + uninstallUpdater := updater.New(r.client, log) + uninstallUpdater.RegisterExternallyManagedStatusConditions(r.externallyManagedStatusConditions) + if r.enableAggressiveConflictResolution { + uninstallUpdater.EnableAggressiveConflictResolution() + } defer func() { applyErr := uninstallUpdater.Apply(ctx, obj) if err == nil { @@ -1155,10 +1205,11 @@ func (r *Reconciler) setupWatches(mgr ctrl.Manager, c controller.Controller) err obj := &unstructured.Unstructured{} obj.SetGroupVersionKind(*r.gvk) - var preds []predicate.Predicate + preds := make([]predicate.Predicate, 0, len(r.customPredicates)+1) if r.selectorPredicate != nil { preds = append(preds, r.selectorPredicate) } + preds = append(preds, r.customPredicates...) if err := c.Watch( source.Kind(