diff --git a/controllers/om/replicaset/om_replicaset.go b/controllers/om/replicaset/om_replicaset.go
index 09ce05162..4634d8c59 100644
--- a/controllers/om/replicaset/om_replicaset.go
+++ b/controllers/om/replicaset/om_replicaset.go
@@ -9,8 +9,6 @@ import (
 	mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
 	"github.com/mongodb/mongodb-kubernetes/controllers/om"
 	"github.com/mongodb/mongodb-kubernetes/controllers/om/process"
-	"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale"
-	"github.com/mongodb/mongodb-kubernetes/pkg/dns"
 )
 
 // BuildFromStatefulSet returns a replica set that can be set in the Automation Config
@@ -65,36 +63,7 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
 		log.Debugw("Marked replica set members as non-voting", "replica set with members", rsMembers)
 	}
 
-	// TODO practice shows that automation agents can get stuck on setting db to "disabled" also it seems that this process
-	// works correctly without explicit disabling - feel free to remove this code after some time when it is clear
-	// that everything works correctly without disabling
-
-	// Stage 2. Set disabled to true
-	//err = omClient.ReadUpdateDeployment(
-	//	func(d om.Deployment) error {
-	//		d.DisableProcesses(allProcesses)
-	//		return nil
-	//	},
-	//)
-	//
-	//if err != nil {
-	//	return errors.New(fmt.Sprintf("Unable to set disabled to true, hosts: %v, err: %w", allProcesses, err))
-	//}
-	//log.Debugw("Disabled processes", "processes", allProcesses)
-
 	log.Infow("Performed some preliminary steps to support scale down", "hosts", processes)
 
 	return nil
 }
-
-func PrepareScaleDownFromStatefulSet(omClient om.Connection, statefulSet appsv1.StatefulSet, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
-	_, podNames := dns.GetDnsForStatefulSetReplicasSpecified(statefulSet, rs.Spec.GetClusterDomain(), rs.Status.Members, nil)
-	podNames = podNames[scale.ReplicasThisReconciliation(rs):rs.Status.Members]
-
-	if len(podNames) != 1 {
-		return xerrors.Errorf("dev error: the number of members being scaled down was > 1, scaling more than one member at a time is not possible! %s", podNames)
-	}
-
-	log.Debugw("Setting votes to 0 for members", "members", podNames)
-	return PrepareScaleDownFromMap(omClient, map[string][]string{rs.Name: podNames}, podNames, log)
-}
diff --git a/controllers/operator/common_controller.go b/controllers/operator/common_controller.go
index b4a289510..8356d585a 100644
--- a/controllers/operator/common_controller.go
+++ b/controllers/operator/common_controller.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"encoding/pem"
 	"fmt"
+	"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale"
 	"reflect"
 
 	"github.com/blang/semver"
@@ -865,7 +866,7 @@ func publishAutomationConfigFirst(ctx context.Context, getter kubernetesClient.C
 		return true
 	}
 
-	if opts.Replicas < int(*currentSts.Spec.Replicas) {
+	if scale.IsScalingDown(&mdb) {
 		log.Debug("Scaling down operation. automationConfig needs to be updated first")
 		return true
 	}
 }
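Note: the common_controller.go hunk above swaps a comparison against the live StatefulSet's .Spec.Replicas for scale.IsScalingDown(&mdb), so the decision to publish the automation config before the StatefulSet is now driven by the resource's declared scaling state rather than by what is currently deployed. Below is a minimal, self-contained sketch of that ordering rule, assuming IsScalingDown simply reports whether this reconciliation plans fewer members than the status currently records; replicaSetScaler and the helper names are illustrative stand-ins, not the operator's types.

package main

import "fmt"

// replicaSetScaler is an illustrative stand-in for the MongoDB resource; the two
// fields mirror spec.members and status.members of a replica set.
type replicaSetScaler struct {
	desiredMembers int
	currentMembers int
}

// isScalingDown assumes the semantics the hunk above relies on: a scale-down is
// in progress when fewer members are desired than the status currently records.
func isScalingDown(s replicaSetScaler) bool {
	return s.desiredMembers < s.currentMembers
}

// publishAutomationConfigFirst mirrors the ordering rule: when scaling down, the
// automation config must reach Ops Manager before the StatefulSet shrinks, so
// members leave the replica set before their pods are deleted.
func publishAutomationConfigFirst(s replicaSetScaler) bool {
	return isScalingDown(s)
}

func main() {
	fmt.Println(publishAutomationConfigFirst(replicaSetScaler{desiredMembers: 3, currentMembers: 5})) // true: push AC first
	fmt.Println(publishAutomationConfigFirst(replicaSetScaler{desiredMembers: 5, currentMembers: 3})) // false: StatefulSet can go first
}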
diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go
index 028753228..65251384a 100644
--- a/controllers/operator/mongodbreplicaset_controller.go
+++ b/controllers/operator/mongodbreplicaset_controller.go
@@ -98,6 +98,7 @@ func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imag
 // Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read
 // and what is in the MongoDbReplicaSet.Spec
 func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) {
+	// ==== 1. Initial checks and setup ====
 	log := zap.S().With("ReplicaSet", request.NamespacedName)
 	rs := &mdbv1.MongoDB{}
 
@@ -145,6 +146,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 		return r.updateStatus(ctx, rs, status, log)
 	}
 
+	// ==== 2. Reconcile Certificates ====
 	status := certs.EnsureSSLCertsForStatefulSet(ctx, r.SecretClient, r.SecretClient, *rs.Spec.Security, certs.ReplicaSetConfig(*rs), log)
 	if !status.IsOK() {
 		return r.updateStatus(ctx, rs, status, log)
 	}
@@ -156,6 +158,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 		return r.updateStatus(ctx, rs, workflow.Pending("%s", err.Error()), log)
 	}
 
+	// ==== 3. Reconcile Features and Authentication (Vault?) ====
 	if status := controlledfeature.EnsureFeatureControls(*rs, conn, conn.OpsManagerVersion(), log); !status.IsOK() {
 		return r.updateStatus(ctx, rs, status, log)
 	}
@@ -171,61 +174,14 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 		return r.updateStatus(ctx, rs, status, log)
 	}
 
-	rsCertsConfig := certs.ReplicaSetConfig(*rs)
-
-	var vaultConfig vault.VaultConfiguration
-	var databaseSecretPath string
-	if r.VaultClient != nil {
-		vaultConfig = r.VaultClient.VaultConfig
-		databaseSecretPath = r.VaultClient.DatabaseSecretPath()
-	}
-
-	var automationAgentVersion string
-	if architectures.IsRunningStaticArchitecture(rs.Annotations) {
-		// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process
-		// happens after creating the StatefulSet definition.
-		if !rs.IsAgentImageOverridden() {
-			automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log)
-			if err != nil {
-				log.Errorf("Impossible to get agent version, please override the agent image by providing a pod template")
-				status := workflow.Failed(xerrors.Errorf("Failed to get agent version: %w", err))
-				return r.updateStatus(ctx, rs, status, log)
-			}
-		}
+	// ==== 4. Replicaset Construction ====
+	stsOpts, sts, err := r.getRsAndStsConfigs(ctx, rs, log, conn, projectConfig, currentAgentAuthMode, prometheusCertHash)
+	if err != nil {
+		return r.updateStatus(ctx, rs, workflow.Failed(err), log)
 	}
 
-	rsConfig := construct.ReplicaSetOptions(
-		PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)),
-		CurrentAgentAuthMechanism(currentAgentAuthMode),
-		CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)),
-		InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)),
-		PrometheusTLSCertHash(prometheusCertHash),
-		WithVaultConfig(vaultConfig),
-		WithLabels(rs.Labels),
-		WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()),
-		WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)),
-		WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)),
-		WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)),
-		WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())),
-	)
-
+	// ==== 5. Recovery logic ====
 	caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath)
-
-	if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil {
-		return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)), log)
-	}
-
-	sts := construct.DatabaseStatefulSet(*rs, rsConfig, log)
-	if status := ensureRoles(rs.Spec.GetSecurity().Roles, conn, log); !status.IsOK() {
-		return r.updateStatus(ctx, rs, status, log)
-	}
-
-	if scale.ReplicasThisReconciliation(rs) < rs.Status.Members {
-		if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil {
-			return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log)
-		}
-	}
-
 	agentCertSecretName := rs.GetSecurity().AgentClientCertificateSecretName(rs.Name).Name
 	agentCertSecretName += certs.OperatorGeneratedCertSuffix
 
@@ -235,20 +191,21 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 	if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) {
 		log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition)
 		automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
-		deploymentError := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log)
-		if deploymentError != nil {
-			log.Errorf("Recovery failed because of deployment errors, %w", deploymentError)
+		if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() {
+			log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus)
 		}
 		if !automationConfigStatus.IsOK() {
 			log.Errorf("Recovery failed because of Automation Config update errors, %v", automationConfigStatus)
 		}
 	}
 
+	// ==== 6. Actual reconciliation: OM updates and Kubernetes resources updates ====
 	lastSpec, err := rs.GetLastSpec()
 	if err != nil {
 		lastSpec = &mdbv1.MongoDbSpec{}
 	}
-	status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, rsConfig, log),
+
+	status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, stsOpts, log),
 		func() workflow.Status {
 			return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
 		},
@@ -261,8 +218,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 				_, _ = r.updateStatus(ctx, rs, workflow.Pending(""), log, workflowStatus.StatusOptions()...)
 			}
 
-			if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log); err != nil {
-				return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err))
+			if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() {
+				return reconcileStatus
 			}
 
 			if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() {
@@ -277,6 +234,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 		return r.updateStatus(ctx, rs, status, log)
 	}
 
+	// ==== 7. Final steps ====
 	if scale.IsStillScaling(rs) {
 		return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs))
 	}
@@ -308,6 +266,79 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 	return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus())
 }
 
+// reconcileMemberResources handles the synchronization of kubernetes resources, which can be statefulsets, services etc.
+// All the resources required in the k8s cluster (as opposed to the automation config) for creating the replicaset
+// should be reconciled in this method.
+func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger, conn om.Connection) workflow.Status {
+	if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil {
+		return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err))
+	}
+
+	if status := ensureRoles(rs.GetSecurity().Roles, conn, log); !status.IsOK() {
+		return status
+	}
+
+	return r.reconcileStatefulSet(ctx, rs, sts, stsOpts, log)
+}
+
+func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger) workflow.Status {
+	if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, stsOpts, log); err != nil {
+		return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err))
+	}
+
+	if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() {
+		return status
+	}
+
+	return workflow.OK()
+}
+
+func (r *ReconcileMongoDbReplicaSet) getRsAndStsConfigs(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger, conn om.Connection, projectConfig mdbv1.ProjectConfig, currentAgentAuthMode string, prometheusCertHash string) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, appsv1.StatefulSet, error) {
+	rsCertsConfig := certs.ReplicaSetConfig(*rs)
+
+	var vaultConfig vault.VaultConfiguration
+	var databaseSecretPath string
+	if r.VaultClient != nil {
+		vaultConfig = r.VaultClient.VaultConfig
+		databaseSecretPath = r.VaultClient.DatabaseSecretPath()
+	}
+
+	// Database container and versions configuration
+	var automationAgentVersion string
+	var err error
+	if architectures.IsRunningStaticArchitecture(rs.Annotations) {
+		// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process
+		// happens after creating the StatefulSet definition.
+		if !rs.IsAgentImageOverridden() {
+			automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log)
+			if err != nil {
+				return nil, appsv1.StatefulSet{}, xerrors.Errorf("Impossible to get agent version, please override the agent image by providing a pod template: %w", err)
+			}
+		}
+	}
+
+	opts := construct.ReplicaSetOptions(
+		PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)),
+		CurrentAgentAuthMechanism(currentAgentAuthMode),
+		CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)),
+		InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)),
+		PrometheusTLSCertHash(prometheusCertHash),
+		WithVaultConfig(vaultConfig),
+		WithLabels(rs.Labels),
+		WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()),
+		WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)),
+		WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)),
+		WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)),
+		WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())),
+	)
+
+	sts := construct.DatabaseStatefulSet(*rs, opts, log)
+
+	return opts, sts, nil
+}
+
 func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigMap {
 	data := make(map[string]string)
 
@@ -408,7 +439,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
 	// Only "concrete" RS members should be observed
 	// - if scaling down, let's observe only members that will remain after scale-down operation
 	// - if scaling up, observe only current members, because new ones might not exist yet
-	err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, int(*set.Spec.Replicas)), rs.Spec.GetClusterDomain(), conn, log, rs)
+	err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, scale.ReplicasThisReconciliation(rs)), rs.Spec.GetClusterDomain(), conn, log, rs)
 	if err != nil && !isRecovering {
 		return workflow.Failed(err)
 	}
@@ -426,7 +457,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
 		// TLS to be completely disabled first.
 		updatedMembers = membersNumberBefore
 	} else {
-		updatedMembers = int(*set.Spec.Replicas)
+		updatedMembers = scale.ReplicasThisReconciliation(rs)
 	}
 
 	replicaSet := replicaset.BuildFromStatefulSetWithReplicas(r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, set, rs.GetSpec(), updatedMembers, rs.CalculateFeatureCompatibilityVersion())
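Note: both updateOmDeploymentRs hunks above now size the replica set with scale.ReplicasThisReconciliation(rs) instead of the StatefulSet's replica count, both when waiting for agents to register and when computing updatedMembers. Below is a minimal, self-contained sketch of the one-member-per-reconciliation stepping this relies on, assuming the helper moves the member count by at most one toward the desired value on each pass; replicasThisReconciliation here is an illustrative stand-in, not the scale package's implementation.

package main

import "fmt"

// replicasThisReconciliation is an illustrative stand-in for the scale package
// helper: it steps from the current member count toward the desired one by at
// most a single member per reconciliation, so Ops Manager only ever sees the
// replica set change by one voting member at a time.
func replicasThisReconciliation(current, desired int) int {
	switch {
	case current == desired:
		return desired
	case current < desired:
		return current + 1 // scaling up: add one member this pass
	default:
		return current - 1 // scaling down: remove one member this pass
	}
}

func main() {
	current, desired := 5, 3
	for current != desired {
		// Each step is what a single reconciliation would report as updatedMembers.
		current = replicasThisReconciliation(current, desired)
		fmt.Printf("reconciliation step -> %d members\n", current)
	}
}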