Add configwatcher replacement (#351)
* Add configwatcher replacement

* run go mod tidy and go work sync

* capture unchecked error on termination and fix other linter issues

* uncomment k3d log output lines

* re-run go mod tidy after rebase
andrewstucki authored Dec 11, 2024
1 parent 26ef33c commit 5ce84b8
Showing 5 changed files with 435 additions and 22 deletions.
52 changes: 39 additions & 13 deletions operator/cmd/sidecar/sidecar.go
@@ -21,6 +21,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

+"github.com/redpanda-data/redpanda-operator/operator/internal/configwatcher"
"github.com/redpanda-data/redpanda-operator/operator/internal/decommissioning"
)

@@ -39,6 +40,9 @@ func Command() *cobra.Command {
decommissionVoteInterval time.Duration
decommissionMaxVoteCount int
redpandaYAMLPath string
+usersDirectoryPath string
+watchUsers bool
+runDecommissioner bool
)

cmd := &cobra.Command{
@@ -58,6 +62,9 @@ func Command() *cobra.Command {
decommissionVoteInterval,
decommissionMaxVoteCount,
redpandaYAMLPath,
+usersDirectoryPath,
+watchUsers,
+runDecommissioner,
)
},
}
@@ -71,6 +78,9 @@ func Command() *cobra.Command {
cmd.Flags().DurationVar(&decommissionVoteInterval, "decommission-vote-interval", 30*time.Second, "The time period between incrementing decommission vote counts since the last decommission conditions were met.")
cmd.Flags().IntVar(&decommissionMaxVoteCount, "decommission-vote-count", 2, "The number of times that a vote must be tallied when a resource meets decommission conditions for it to actually be decommissioned.")
cmd.Flags().StringVar(&redpandaYAMLPath, "redpanda-yaml", "/etc/redpanda/redpanda.yaml", "Path to redpanda.yaml whose rpk stanza will be used for connecting to a Redpanda cluster.")
+cmd.Flags().BoolVar(&watchUsers, "watch-users", false, "Specifies if the sidecar should watch and configure superusers based on a mounted users file.")
+cmd.Flags().StringVar(&usersDirectoryPath, "users-directory", "/etc/secrets/users/", "Path to users directory where secrets are mounted.")
+cmd.Flags().BoolVar(&runDecommissioner, "run-decommissioner", false, "Specifies if the sidecar should run the broker decommissioner.")

return cmd
}
@@ -86,6 +96,9 @@ func Run(
decommissionVoteInterval time.Duration,
decommissionMaxVoteCount int,
redpandaYAMLPath string,
+usersDirectoryPath string
+watchUsers bool
+runDecommissioner bool
) error {
setupLog := ctrl.LoggerFrom(ctx).WithName("setup")

@@ -121,20 +134,33 @@ func Run(
return err
}

-fetcher := decommissioning.NewChainedFetcher(
-// prefer RPK profile first and then move on to fetch from helm values
-decommissioning.NewRPKProfileFetcher(redpandaYAMLPath),
-decommissioning.NewHelmFetcher(mgr),
-)
+if runDecommissioner {
+fetcher := decommissioning.NewChainedFetcher(
+// prefer RPK profile first and then move on to fetch from helm values
+decommissioning.NewRPKProfileFetcher(redpandaYAMLPath),
+decommissioning.NewHelmFetcher(mgr),
+)
+
+if err := decommissioning.NewStatefulSetDecommissioner(mgr, fetcher, []decommissioning.Option{
+decommissioning.WithFilter(decommissioning.FilterStatefulSetOwner(clusterNamespace, clusterName)),
+decommissioning.WithRequeueTimeout(decommissionRequeueTimeout),
+decommissioning.WithDelayedCacheInterval(decommissionVoteInterval),
+decommissioning.WithDelayedCacheMaxCount(decommissionMaxVoteCount),
+}...).SetupWithManager(mgr); err != nil {
+setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
+return err
+}
+}

-if err := decommissioning.NewStatefulSetDecommissioner(mgr, fetcher, []decommissioning.Option{
-decommissioning.WithFilter(decommissioning.FilterStatefulSetOwner(clusterNamespace, clusterName)),
-decommissioning.WithRequeueTimeout(decommissionRequeueTimeout),
-decommissioning.WithDelayedCacheInterval(decommissionVoteInterval),
-decommissioning.WithDelayedCacheMaxCount(decommissionMaxVoteCount),
-}...).SetupWithManager(mgr); err != nil {
-setupLog.Error(err, "unable to create controller", "controller", "StatefulSetDecommissioner")
-return err
+if watchUsers {
+watcher := configwatcher.NewConfigWatcher(mgr.GetLogger(), true,
+configwatcher.WithRedpandaConfigPath(redpandaYAMLPath),
+configwatcher.WithUsersDirectory(usersDirectoryPath),
+)
+if err := mgr.Add(watcher); err != nil {
+setupLog.Error(err, "unable to run config watcher")
+return err
+}
}

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
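
For context, here is a minimal sketch (not part of this commit) of how the new sidecar flags might be exercised from Go. The import path, flag values, and file paths are assumptions based on the diff above rather than an invocation taken from the repository.

package main

import (
    "context"

    "github.com/redpanda-data/redpanda-operator/operator/cmd/sidecar"
)

func main() {
    cmd := sidecar.Command()
    // Hypothetical flag values: enable both the broker decommissioner and the
    // superuser watcher, pointing the watcher at the mounted users Secret.
    cmd.SetArgs([]string{
        "--run-decommissioner",
        "--watch-users",
        "--users-directory", "/etc/secrets/users/",
        "--redpanda-yaml", "/etc/redpanda/redpanda.yaml",
    })
    if err := cmd.ExecuteContext(context.Background()); err != nil {
        panic(err)
    }
}
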
2 changes: 1 addition & 1 deletion operator/go.mod
@@ -16,6 +16,7 @@ require (
github.com/fluxcd/pkg/runtime v0.43.3
github.com/fluxcd/source-controller/api v1.2.3
github.com/fluxcd/source-controller/shim v0.0.0-00010101000000-000000000000
+github.com/fsnotify/fsnotify v1.7.0
github.com/go-logr/logr v1.4.2
github.com/jcmturner/gokrb5/v8 v8.4.4
github.com/json-iterator/go v1.1.12
@@ -191,7 +192,6 @@ require (
github.com/fluxcd/pkg/tar v0.4.0 // indirect
github.com/fluxcd/pkg/version v0.2.2 // indirect
github.com/fluxcd/source-controller v1.2.3 // indirect
-github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-chi/chi v4.1.2+incompatible // indirect
github.com/go-chi/chi/v5 v5.0.12 // indirect
242 changes: 242 additions & 0 deletions operator/internal/configwatcher/configwatcher.go
@@ -0,0 +1,242 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package configwatcher

import (
"bufio"
"context"
"fmt"
"os"
"path"
"slices"
"strings"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/console/backend/pkg/config"
rpkconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/spf13/afero"
"k8s.io/client-go/rest"

internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
)

const (
defaultConfigPath = "/var/lib/redpanda.yaml"
defaultUsersDirectory = "/etc/secret/users"
)

type Option func(c *ConfigWatcher)

// ConfigWatcher replaces the old bash scripts we previously used to wait for
// a cluster to become stable and then create superusers.
type ConfigWatcher struct {
adminClient *rpadmin.AdminAPI
configPath string
usersDirectory string
watch bool
fs afero.Fs
log logr.Logger

// for testing mostly
initialized chan struct{}
}

func WithRedpandaConfigPath(path string) Option {
return func(c *ConfigWatcher) {
c.configPath = path
}
}

func WithUsersDirectory(path string) Option {
return func(c *ConfigWatcher) {
c.usersDirectory = path
}
}

func WithFs(fs afero.Fs) Option {
return func(c *ConfigWatcher) {
c.fs = fs
}
}

func WithInitializedSignal(ch chan struct{}) Option {
return func(c *ConfigWatcher) {
c.initialized = ch
}
}

func NewConfigWatcher(log logr.Logger, watch bool, options ...Option) *ConfigWatcher {
watcher := &ConfigWatcher{
log: log,
watch: watch,
configPath: defaultConfigPath,
usersDirectory: defaultUsersDirectory,
fs: afero.NewOsFs(),
initialized: make(chan struct{}),
}

for _, option := range options {
option(watcher)
}

return watcher
}

func (w *ConfigWatcher) Start(ctx context.Context) error {
params := rpkconfig.Params{ConfigFlag: w.configPath}

config, err := params.Load(w.fs)
if err != nil {
return fmt.Errorf("loading rpk config: %w", err)
}

factory := internalclient.NewFactory(&rest.Config{}, nil).WithFS(w.fs)
client, err := factory.RedpandaAdminClient(ctx, config.VirtualProfile())
if err != nil {
return fmt.Errorf("initializing Redpanda admin API client: %w", err)
}

w.adminClient = client

close(w.initialized)

w.syncInitial(ctx)

return w.watchFilesystem(ctx)
}

func (w *ConfigWatcher) syncInitial(ctx context.Context) {
files, err := afero.ReadDir(w.fs, w.usersDirectory)
if err != nil {
w.log.Error(err, "unable to get user directory files")
return
}

for _, file := range files {
if file.IsDir() {
continue
}
filePath := path.Join(w.usersDirectory, file.Name())
w.SyncUsers(ctx, filePath)
}
}

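// watchFilesystem blocks until the context is cancelled. When watching is
// enabled it registers an fsnotify watcher on the users directory and re-syncs
// users whenever a change event arrives; watcher errors are logged and the
// loop continues after a short sleep instead of terminating the sidecar.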
func (w *ConfigWatcher) watchFilesystem(ctx context.Context) error {
if !w.watch {
<-ctx.Done()
return nil
}

watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()

if err := watcher.Add(w.usersDirectory); err != nil {
return err
}

for {
select {
case err := <-watcher.Errors:
// here we don't return as that'd crash the broker, instead
// just log the error and move on after some sleep time.
w.log.Error(err, "watcher returned an error")
time.Sleep(5 * time.Second)
case event := <-watcher.Events:
// fsnotify events already carry the full path of the changed file
w.SyncUsers(ctx, event.Name)
case <-ctx.Done():
return nil
}
}
}

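// SyncUsers reads a mounted users file and reconciles its credentials with the
// cluster. Each line is expected to look like "<user>:<password>" or
// "<user>:<password>:<mechanism>", with SCRAM-SHA-256 assumed when no
// mechanism is given; malformed lines are logged and skipped. The parsed
// users, plus the internal user, are then written to the cluster's superusers
// configuration.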
func (w *ConfigWatcher) SyncUsers(ctx context.Context, path string) {
file, err := w.fs.Open(path)
if err != nil {
w.log.Error(err, "unable to open superusers file", "file", path)
return
}
defer file.Close()

// sync our internal superuser first
internalSuperuser, password, mechanism := getInternalUser()
// the internal user should only ever be created once, so don't
// update its password ever.
w.syncUser(ctx, internalSuperuser, password, mechanism, false)

users := []string{internalSuperuser}

scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
tokens := strings.SplitN(line, ":", 3)
if len(tokens) != 3 && len(tokens) != 2 {
w.log.Error(err, "malformed line", "line", line)
continue
}

mechanism := config.SASLMechanismScramSHA256

user, password := tokens[0], tokens[1]
if len(tokens) == 3 {
mechanism = tokens[2]
}

if !slices.Contains(users, user) {
users = append(users, user)
}

w.syncUser(ctx, user, password, mechanism, true)
}

w.setSuperusers(ctx, users)
}

func (w *ConfigWatcher) setSuperusers(ctx context.Context, users []string) {
if _, err := w.adminClient.PatchClusterConfig(ctx, map[string]any{
"superusers": users,
}, []string{}); err != nil {
w.log.Error(err, "could not set superusers")
}
}

func (w *ConfigWatcher) syncUser(ctx context.Context, user, password, mechanism string, recreate bool) {
if err := w.adminClient.CreateUser(ctx, user, password, mechanism); err != nil {
if strings.Contains(err.Error(), "already exists") {
if recreate {
// the original implementation did an update via Delete + Create, so do that here
if err := w.adminClient.DeleteUser(ctx, user); err != nil {
w.log.Error(err, "could not delete user for recreation", "user", user)
return
}
if err := w.adminClient.CreateUser(ctx, user, password, mechanism); err != nil {
w.log.Error(err, "could not recreate user", "user", user)
}
}
return
}
w.log.Error(err, "could not create user", "user", user)
}
}

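// getInternalUser returns the operator's internal superuser credentials from
// the RPK_USER and RPK_PASS environment variables, defaulting the SASL
// mechanism to SCRAM-SHA-256 when RPK_SASL_MECHANISM is unset.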
func getInternalUser() (string, string, string) {
mechanism := os.Getenv("RPK_SASL_MECHANISM")
if mechanism == "" {
mechanism = config.SASLMechanismScramSHA256
}

return os.Getenv("RPK_USER"), os.Getenv("RPK_PASS"), mechanism
}
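
The watcher consumes plain-text credential files mounted under the users directory. Below is a hedged illustration of the line format SyncUsers parses and of how the testing-oriented options might be combined; the file name, credentials, and in-memory filesystem are made up for the example.

package main

import (
    "github.com/go-logr/logr"
    "github.com/spf13/afero"

    "github.com/redpanda-data/redpanda-operator/operator/internal/configwatcher"
)

// Example users file contents (hypothetical values). Each line is parsed as
// "<user>:<password>" or "<user>:<password>:<mechanism>"; SCRAM-SHA-256 is
// assumed when the mechanism is omitted.
const exampleUsers = `alice:alice-secret:SCRAM-SHA-512
bob:bob-secret`

func main() {
    // Stage the users file on an in-memory filesystem, the way the WithFs and
    // WithInitializedSignal options are meant to be used in tests.
    fs := afero.NewMemMapFs()
    _ = afero.WriteFile(fs, "/etc/secrets/users/users.txt", []byte(exampleUsers), 0o644)

    initialized := make(chan struct{})
    _ = configwatcher.NewConfigWatcher(logr.Discard(), true,
        configwatcher.WithFs(fs),
        configwatcher.WithRedpandaConfigPath("/etc/redpanda/redpanda.yaml"),
        configwatcher.WithUsersDirectory("/etc/secrets/users/"),
        configwatcher.WithInitializedSignal(initialized),
    )
    // Start(ctx) would then load the rpk profile, build an admin API client,
    // close the initialized channel, run an initial sync of the users
    // directory, and keep watching it for changes; it needs a reachable
    // cluster, so it is not invoked in this sketch.
}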