Skip to content

fix(ha): make more resilient with a job #1858

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions cmd/installer/cli/enable_ha.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cli

import (
"context"
"fmt"
"os"

"github.com/replicatedhq/embedded-cluster/pkg/addons"
"github.com/replicatedhq/embedded-cluster/pkg/helm"
"github.com/replicatedhq/embedded-cluster/pkg/kubeutils"
"github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig"
rcutil "github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig/util"
"github.com/replicatedhq/embedded-cluster/pkg/versions"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// EnableHACmd returns the cobra command that enables high availability for
// the cluster. name is the cluster name interpolated into the command's short
// description.
//
// The ctx parameter is not read here; every hook uses cmd.Context() instead.
//
// The command refuses to run unless the caller is root. Before the main run it
// initializes the runtime config and exports KUBECONFIG and TMPDIR using the
// paths reported by the runtimeconfig package.
func EnableHACmd(ctx context.Context, name string) *cobra.Command {
	return &cobra.Command{
		Use:   "enable-ha",
		Short: fmt.Sprintf("Enable high availability for the %s cluster", name),
		PreRunE: func(cmd *cobra.Command, args []string) error {
			if os.Getuid() != 0 {
				return fmt.Errorf("enable-ha command must be run as root")
			}

			rcutil.InitBestRuntimeConfig(cmd.Context())

			// Surface Setenv failures instead of continuing with a
			// half-configured environment.
			if err := os.Setenv("KUBECONFIG", runtimeconfig.PathToKubeConfig()); err != nil {
				return fmt.Errorf("unable to set KUBECONFIG: %w", err)
			}
			if err := os.Setenv("TMPDIR", runtimeconfig.EmbeddedClusterTmpSubDir()); err != nil {
				return fmt.Errorf("unable to set TMPDIR: %w", err)
			}

			return nil
		},
		// NOTE(review): cobra does not invoke PostRun when RunE returns an
		// error, so Cleanup only runs after a successful enable-ha — confirm
		// that is intended.
		PostRun: func(cmd *cobra.Command, args []string) {
			runtimeconfig.Cleanup()
		},
		RunE: func(cmd *cobra.Command, args []string) error {
			return runEnableHA(cmd.Context())
		},
	}
}

// runEnableHA performs the enable-ha workflow: it builds a kube client, asks
// addons.CanEnableHA whether HA may be turned on, loads the latest
// installation, creates a helm client and finally calls addons.EnableHA with
// the settings taken from the installation spec.
//
// When HA cannot be enabled the reason is logged as a warning and an error
// wrapped with NewErrorNothingElseToAdd is returned.
func runEnableHA(ctx context.Context) error {
	kubeClient, err := kubeutils.KubeClient()
	if err != nil {
		return fmt.Errorf("unable to get kube client: %w", err)
	}

	allowed, why, err := addons.CanEnableHA(ctx, kubeClient)
	switch {
	case err != nil:
		return fmt.Errorf("unable to check if HA can be enabled: %w", err)
	case !allowed:
		logrus.Warnf("High availability cannot be enabled: %s", why)
		return NewErrorNothingElseToAdd(fmt.Errorf("high availability cannot be enabled: %s", why))
	}

	installation, err := kubeutils.GetLatestInstallation(ctx, kubeClient)
	if err != nil {
		return fmt.Errorf("unable to get latest installation: %w", err)
	}

	// The charts directory is only passed to helm for airgap installations.
	var chartsDir string
	if installation.Spec.AirGap {
		chartsDir = runtimeconfig.EmbeddedClusterChartsSubDir()
	}

	helmOpts := helm.HelmOptions{
		KubeConfig: runtimeconfig.PathToKubeConfig(),
		K0sVersion: versions.K0sVersion,
		AirgapPath: chartsDir,
	}
	helmClient, err := helm.NewClient(helmOpts)
	if err != nil {
		return fmt.Errorf("unable to create helm client: %w", err)
	}
	defer helmClient.Close()

	return addons.EnableHA(
		ctx,
		kubeClient,
		helmClient,
		installation.Spec.AirGap,
		installation.Spec.Network.ServiceCIDR,
		installation.Spec.Proxy,
		installation.Spec.Config,
	)
}
3 changes: 0 additions & 3 deletions cmd/installer/cli/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ type InstallCmdFlags struct {
}

// InstallCmd returns a cobra command for installing the embedded cluster.
// This is the upcoming version of install without the operator and where
// install does all of the work. This is a hidden command until it's tested
// and ready.
func InstallCmd(ctx context.Context, name string) *cobra.Command {
var flags InstallCmdFlags

Expand Down
34 changes: 16 additions & 18 deletions cmd/installer/cli/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ type JoinCmdFlags struct {
ignoreHostPreflights bool
}

// This is the upcoming version of join without the operator and where
// join does all of the work. This is a hidden command until it's tested
// and ready.
// JoinCmd returns a cobra command for joining a node to the cluster.
func JoinCmd(ctx context.Context, name string) *cobra.Command {
var flags JoinCmdFlags

Expand Down Expand Up @@ -202,22 +200,22 @@ func runJoin(ctx context.Context, name string, flags JoinCmdFlags, jcmd *kotsadm
return nil
}

airgapChartsPath := ""
if flags.isAirgap {
airgapChartsPath = runtimeconfig.EmbeddedClusterChartsSubDir()
}
if flags.enableHighAvailability {
airgapChartsPath := ""
if flags.isAirgap {
airgapChartsPath = runtimeconfig.EmbeddedClusterChartsSubDir()
}

hcli, err := helm.NewClient(helm.HelmOptions{
KubeConfig: runtimeconfig.PathToKubeConfig(),
K0sVersion: versions.K0sVersion,
AirgapPath: airgapChartsPath,
})
if err != nil {
return fmt.Errorf("unable to create helm client: %w", err)
}
defer hcli.Close()
hcli, err := helm.NewClient(helm.HelmOptions{
KubeConfig: runtimeconfig.PathToKubeConfig(),
K0sVersion: versions.K0sVersion,
AirgapPath: airgapChartsPath,
})
if err != nil {
return fmt.Errorf("unable to create helm client: %w", err)
}
defer hcli.Close()

if flags.enableHighAvailability {
if err := maybeEnableHA(ctx, kcli, hcli, flags.isAirgap, cidrCfg.ServiceCIDR, jcmd.InstallationSpec.Proxy, jcmd.InstallationSpec.Config); err != nil {
return fmt.Errorf("unable to enable high availability: %w", err)
}
Expand Down Expand Up @@ -494,7 +492,7 @@ func waitForNodeToJoin(ctx context.Context, kcli client.Client, hostname string,
}

func maybeEnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error {
canEnableHA, err := addons.CanEnableHA(ctx, kcli)
canEnableHA, _, err := addons.CanEnableHA(ctx, kcli)
if err != nil {
return fmt.Errorf("unable to check if HA can be enabled: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/installer/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func RootCmd(ctx context.Context, name string) *cobra.Command {
cmd.AddCommand(JoinCmd(ctx, name))
cmd.AddCommand(ShellCmd(ctx, name))
cmd.AddCommand(NodeCmd(ctx, name))
cmd.AddCommand(EnableHACmd(ctx, name))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this command is nested directly under the root command.

cmd.AddCommand(VersionCmd(ctx, name))
cmd.AddCommand(ResetCmd(ctx, name))
cmd.AddCommand(MaterializeCmd(ctx, name))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
github.com/vmware-tanzu/velero v1.15.2
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.33.0
golang.org/x/sync v0.11.0
golang.org/x/term v0.29.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -272,6 +271,7 @@ require (
go.opentelemetry.io/otel/trace v1.34.0 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/mod v0.23.0 // indirect
golang.org/x/sync v0.11.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/api v0.197.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/addons/embeddedclusteroperator/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package embeddedclusteroperator

import (
"context"
"log/slog"

"github.com/pkg/errors"
"github.com/replicatedhq/embedded-cluster/pkg/helm"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -15,7 +15,7 @@ func (e *EmbeddedClusterOperator) Upgrade(ctx context.Context, kcli client.Clien
return errors.Wrap(err, "check if release exists")
}
if !exists {
slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace)
logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace)
if err := e.Install(ctx, kcli, hcli, overrides, nil); err != nil {
return errors.Wrap(err, "install")
}
Expand Down
110 changes: 76 additions & 34 deletions pkg/addons/highavailability.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
"github.com/replicatedhq/embedded-cluster/pkg/addons/adminconsole"
"github.com/replicatedhq/embedded-cluster/pkg/addons/registry"
registrymigrate "github.com/replicatedhq/embedded-cluster/pkg/addons/registry/migrate"
"github.com/replicatedhq/embedded-cluster/pkg/addons/seaweedfs"
"github.com/replicatedhq/embedded-cluster/pkg/constants"
"github.com/replicatedhq/embedded-cluster/pkg/helm"
Expand All @@ -15,32 +16,34 @@ import (
"github.com/replicatedhq/embedded-cluster/pkg/spinner"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// CanEnableHA checks if high availability can be enabled in the cluster.
func CanEnableHA(ctx context.Context, kcli client.Client) (bool, error) {
func CanEnableHA(ctx context.Context, kcli client.Client) (bool, string, error) {
in, err := kubeutils.GetLatestInstallation(ctx, kcli)
if err != nil {
return false, errors.Wrap(err, "get latest installation")
return false, "", errors.Wrap(err, "get latest installation")
}
if in.Spec.HighAvailability {
return false, nil
return false, "already enabled", nil
}

if err := kcli.Get(ctx, types.NamespacedName{Name: constants.EcRestoreStateCMName, Namespace: "embedded-cluster"}, &corev1.ConfigMap{}); err == nil {
return false, nil // cannot enable HA during a restore
} else if !k8serrors.IsNotFound(err) {
return false, errors.Wrap(err, "get restore state configmap")
return false, "a restore is in progress", nil
} else if client.IgnoreNotFound(err) != nil {
return false, "", errors.Wrap(err, "get restore state configmap")
}

ncps, err := kubeutils.NumOfControlPlaneNodes(ctx, kcli)
if err != nil {
return false, errors.Wrap(err, "check control plane nodes")
return false, "", errors.Wrap(err, "check control plane nodes")
}
return ncps >= 3, nil
if ncps < 3 {
return false, "number of control plane nodes is less than 3", nil
}
return true, "", nil
}

// EnableHA enables high availability.
Expand All @@ -60,36 +63,45 @@ func EnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirga
ServiceCIDR: serviceCIDR,
ProxyRegistryDomain: domains.ProxyRegistryDomain,
}
exists, err := hcli.ReleaseExists(ctx, sw.Namespace(), sw.ReleaseName())
if err != nil {
return errors.Wrap(err, "check if seaweedfs release exists")
}
if !exists {
logrus.Debugf("Installing seaweedfs")
if err := sw.Install(ctx, kcli, hcli, addOnOverrides(sw, cfgspec, nil), nil); err != nil {
return errors.Wrap(err, "install seaweedfs")
}
logrus.Debugf("Seaweedfs installed!")
} else {
logrus.Debugf("Seaweedfs already installed")

logrus.Debugf("Maybe removing existing seaweedfs")
if err := sw.Uninstall(ctx, kcli); err != nil {
return errors.Wrap(err, "uninstall seaweedfs")
}

// TODO (@salah): add support for end user overrides
reg := &registry.Registry{
ServiceCIDR: serviceCIDR,
ProxyRegistryDomain: domains.ProxyRegistryDomain,
IsHA: true,
logrus.Debugf("Installing seaweedfs")
if err := sw.Install(ctx, kcli, hcli, addOnOverrides(sw, cfgspec, nil), nil); err != nil {
return errors.Wrap(err, "install seaweedfs")
}
logrus.Debugf("Migrating registry data")
if err := reg.Migrate(ctx, kcli, loading); err != nil {
return errors.Wrap(err, "migrate registry data")
logrus.Debugf("Seaweedfs installed!")

// TODO: timeout

loading.Infof("Migrating data for high availability")
logrus.Debugf("Migrating data for high availability")

progressCh := make(chan registrymigrate.Progress)
errCh := make(chan error, 1)

go func() {
errCh <- registrymigrate.RegistryData(ctx, kcli, progressCh)
close(errCh)
}()

err := waitForDataMigrationAndLogProgress(loading, progressCh, errCh)
if err != nil {
return errors.Wrap(err, "registry data migration job failed")
}
logrus.Debugf("Registry migration complete!")
logrus.Debugf("Upgrading registry")
if err := reg.Upgrade(ctx, kcli, hcli, addOnOverrides(reg, cfgspec, nil)); err != nil {
return errors.Wrap(err, "upgrade registry")

logrus.Debugf("Data migration complete!")

loading.Infof("Enabling registry high availability")
logrus.Debugf("Enabling registry high availability")
err = enableRegistryHA(ctx, kcli, hcli, serviceCIDR, cfgspec)
if err != nil {
return errors.Wrap(err, "enable registry high availability")
}
logrus.Debugf("Registry upgraded!")
logrus.Debugf("Registry high availability enabled!")
}

loading.Infof("Updating the Admin Console for high availability")
Expand Down Expand Up @@ -117,6 +129,23 @@ func EnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirga
return nil
}

// enableRegistryHA upgrades the registry addon with its IsHA flag set, using
// the given service CIDR and the proxy registry domain derived from cfgspec.
// Any upgrade failure is returned wrapped as "upgrade registry".
func enableRegistryHA(ctx context.Context, kcli client.Client, hcli helm.Client, serviceCIDR string, cfgspec *ecv1beta1.ConfigSpec) error {
	// TODO (@salah): add support for end user overrides
	reg := &registry.Registry{
		ServiceCIDR:         serviceCIDR,
		ProxyRegistryDomain: runtimeconfig.GetDomains(cfgspec).ProxyRegistryDomain,
		IsHA:                true,
	}

	overrides := addOnOverrides(reg, cfgspec, nil)
	err := reg.Upgrade(ctx, kcli, hcli, overrides)
	if err != nil {
		return errors.Wrap(err, "upgrade registry")
	}
	return nil
}

// EnableAdminConsoleHA enables high availability for the admin console.
func EnableAdminConsoleHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error {
domains := runtimeconfig.GetDomains(cfgspec)
Expand All @@ -137,3 +166,16 @@ func EnableAdminConsoleHA(ctx context.Context, kcli client.Client, hcli helm.Cli

return nil
}

// waitForDataMigrationAndLogProgress blocks until a value can be received from
// errCh and returns that value. While waiting, every update received from
// progressCh is converted to a percentage and reported through both the debug
// log and progressWriter.
//
// A closed progressCh stops being polled, and updates whose Total is zero are
// skipped, because neither can be turned into a percentage.
func waitForDataMigrationAndLogProgress(progressWriter *spinner.MessageWriter, progressCh <-chan registrymigrate.Progress, errCh <-chan error) error {
	for {
		select {
		case err := <-errCh:
			return err
		case progress, ok := <-progressCh:
			if !ok {
				// A closed channel is always ready and yields zero values, which
				// would spin this loop; a nil channel is never selected, so from
				// here on only errCh is waited on.
				progressCh = nil
				continue
			}
			if progress.Total == 0 {
				// Nothing meaningful to report yet, and dividing by zero must
				// be avoided.
				continue
			}
			percent := progress.Current * 100 / progress.Total
			logrus.Debugf("Migrating data for high availability (%d%%)", percent)
			progressWriter.Infof("Migrating data for high availability (%d%%)", percent)
		}
	}
}
Loading
Loading