diff --git a/cmd/csi-resizer/main.go b/cmd/csi-resizer/main.go index 71e59c165..830ca3182 100644 --- a/cmd/csi-resizer/main.go +++ b/cmd/csi-resizer/main.go @@ -62,33 +62,16 @@ import ( var ( master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.") - kubeConfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig") resyncPeriod = flag.Duration("resync-period", time.Minute*10, "Resync period for cache") workers = flag.Int("workers", 10, "Concurrency to process multiple resize requests") extraModifyMetadata = flag.Bool("extra-modify-metadata", false, "If set, add pv/pvc metadata to plugin modify requests as parameters.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") - timeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.") - - showVersion = flag.Bool("version", false, "Show version") + timeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.") retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume resize. It exponentially increases with each failure, up to retry-interval-max.") retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume resize.") - enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") - - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - - kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") - kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - handleVolumeInUseError = flag.Bool("handle-volume-inuse-error", true, "Flag to turn on/off capability to handle volume in use error in resizer controller. Defaults to true if not set.") featureGates map[string]bool @@ -103,6 +86,7 @@ func main() { c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() + standardflags.RegisterCommonFlags(flag.CommandLine) standardflags.AddAutomaxprocs(klog.Infof) flag.Parse() if err := logsapi.ValidateAndApply(c, fg); err != nil { @@ -110,19 +94,19 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if *showVersion { + if standardflags.Configuration.ShowVersion { fmt.Println(os.Args[0], version) os.Exit(0) } klog.InfoS("Version", "version", version) - if *metricsAddress != "" && *httpEndpoint != "" { + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { klog.ErrorS(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set.") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - addr := *metricsAddress + addr := standardflags.Configuration.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = standardflags.Configuration.HttpEndpoint } if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil { klog.ErrorS(err, "Failed to set feature gates") @@ -131,8 +115,8 @@ func main() { var config *rest.Config var err error - if *master != "" || *kubeConfig != "" { - config, err = clientcmd.BuildConfigFromFlags(*master, *kubeConfig) + if *master != "" || standardflags.Configuration.KubeConfig != "" { + config, err = clientcmd.BuildConfigFromFlags(*master, standardflags.Configuration.KubeConfig) } else { config, err = rest.InClusterConfig() } @@ -141,8 +125,8 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - config.QPS = float32(*kubeAPIQPS) - config.Burst = *kubeAPIBurst + config.QPS = float32(standardflags.Configuration.KubeAPIQPS) + config.Burst = standardflags.Configuration.KubeAPIBurst config.ContentType = runtime.ContentTypeProtobuf kubeClient, err := kubernetes.NewForConfig(config) @@ -158,7 +142,7 @@ func main() { metricsManager := metrics.NewCSIMetricsManager("" /* driverName */) ctx := context.Background() - csiClient, err := csi.New(ctx, *csiAddress, *timeout, metricsManager) + csiClient, err := csi.New(ctx, standardflags.Configuration.CSIAddress, *timeout, metricsManager) if err != nil { klog.ErrorS(err, "Failed to create CSI client") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -174,7 +158,7 @@ func main() { translator := csitrans.New() if translator.IsMigratedCSIDriverByName(driverName) { metricsManager = metrics.NewCSIMetricsManagerWithOptions(driverName, metrics.WithMigration()) - migratedCsiClient, err := csi.New(ctx, *csiAddress, *timeout, metricsManager) + migratedCsiClient, err := csi.New(ctx, standardflags.Configuration.CSIAddress, *timeout, metricsManager) if err != nil { klog.ErrorS(err, "Failed to create MigratedCSI client") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -211,13 +195,13 @@ func main() { // Start HTTP server for metrics + leader election healthz if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath) metricsManager.SetDriverName(driverName) go func() { klog.InfoS("ServeMux listening", "address", addr) err := http.ListenAndServe(addr, mux) if err != nil { - klog.ErrorS(err, "Failed to start HTTP server", "address", addr, "metricsPath", *metricsPath) + klog.ErrorS(err, "Failed to start HTTP server", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }() @@ -279,37 +263,15 @@ func main() { } } - if !*enableLeaderElection { - run(ctx) - } else { - lockName := "external-resizer-" + util.SanitizeName(resizerName) - leKubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - klog.ErrorS(err, "Failed to create leKubeClient") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - le := leaderelection.NewLeaderElection(leKubeClient, lockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } - - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) - if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) { - le.WithReleaseOnCancel(true) - le.WithContext(ctx) - } - - if err := le.Run(); err != nil { - klog.ErrorS(err, "Error initializing leader election") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - } + leaderelection.RunWithLeaderElection( + ctx, + config, + standardflags.Configuration, + run, + "external-resizer-"+util.SanitizeName(resizerName), + mux, + utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit), + ) } func getDriverName(client csi.Client, timeout time.Duration) (string, error) { diff --git a/go.mod b/go.mod index e547a2486..bebe244d2 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,13 @@ go 1.24.6 require ( github.com/container-storage-interface/spec v1.11.0 github.com/google/go-cmp v0.7.0 - github.com/kubernetes-csi/csi-lib-utils v0.22.0 + github.com/kubernetes-csi/csi-lib-utils v0.23.0 google.golang.org/grpc v1.72.1 - k8s.io/api v0.34.0 - k8s.io/apimachinery v0.34.0 + k8s.io/api v0.34.1 + k8s.io/apimachinery v0.34.1 k8s.io/apiserver v0.34.0 - k8s.io/client-go v0.34.0 - k8s.io/component-base v0.34.0 + k8s.io/client-go v0.34.1 + k8s.io/component-base v0.34.1 k8s.io/csi-translation-lib v0.34.0 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 diff --git a/go.sum b/go.sum index 13a6e7662..9fdbeb93f 100644 --- a/go.sum +++ b/go.sum @@ -94,8 +94,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubernetes-csi/csi-lib-utils v0.22.0 h1:EUAs1+uHGps3OtVj4XVx16urhpI02eu+Z8Vps6plpHY= -github.com/kubernetes-csi/csi-lib-utils v0.22.0/go.mod h1:f+PalKyS4Ujsjb9+m6Rj0W6c28y3nfea3paQ/VqjI28= +github.com/kubernetes-csi/csi-lib-utils v0.23.0 h1:070SC4ubEvJpQak0ibxgv7l5dUoDVdqKyktam6zkm4s= +github.com/kubernetes-csi/csi-lib-utils v0.23.0/go.mod h1:H5+JRXAvb7lpC4nrddI7sfQfaXA1O8Tek3uNrTIx1/g= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go index 4045127c5..b67de54f0 100644 --- a/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go @@ -25,10 +25,12 @@ import ( "strings" "time" + "github.com/kubernetes-csi/csi-lib-utils/standardflags" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -77,6 +79,8 @@ type leaderElection struct { ctx context.Context clientset kubernetes.Interface + + labels map[string]string } // NewLeaderElection returns the default & preferred leader election type @@ -122,6 +126,11 @@ func (l *leaderElection) WithReleaseOnCancel(releaseOnCancel bool) { l.releaseOnCancel = releaseOnCancel } +// WithLabels adds labels to the lease object when this instance becomes leader +func (l *leaderElection) WithLabels(labels map[string]string) { + l.labels = labels +} + // WithContext Add context func (l *leaderElection) WithContext(ctx context.Context) { l.ctx = ctx @@ -176,8 +185,7 @@ func (l *leaderElection) Run() error { Identity: sanitizeName(l.identity), EventRecorder: eventRecorder, } - - lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig) + lock, err := resourcelock.NewWithLabels(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig, l.labels) if err != nil { return err } @@ -209,6 +217,58 @@ func (l *leaderElection) Run() error { return nil // should never reach here } +func RunWithLeaderElection( + ctx context.Context, + config *rest.Config, + opts standardflags.SidecarConfiguration, + run func(context.Context), + driverName string, + mux *http.ServeMux, + releaseOnExit bool) { + + logger := klog.FromContext(ctx) + + if !opts.LeaderElection { + run(ctx) + } else { + // Create a new clientset for leader election. When the attacher + // gets busy and its client gets throttled, the leader election + // can proceed without issues. + leClientset, err := kubernetes.NewForConfig(config) + if err != nil { + logger.Error(err, "Failed to create leaderelection client") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + + // Name of config map with leader election lock + le := NewLeaderElection(leClientset, driverName, run) + if opts.HttpEndpoint != "" { + le.PrepareHealthCheck(mux, DefaultHealthCheckTimeout) + } + + if opts.LeaderElectionNamespace != "" { + le.WithNamespace(opts.LeaderElectionNamespace) + } + + if opts.LeaderElectionLabels != nil { + le.WithLabels(opts.LeaderElectionLabels) + } + + le.WithLeaseDuration(opts.LeaderElectionLeaseDuration) + le.WithRenewDeadline(opts.LeaderElectionRenewDeadline) + le.WithRetryPeriod(opts.LeaderElectionRetryPeriod) + if releaseOnExit { + le.WithReleaseOnCancel(true) + le.WithContext(ctx) + } + + if err := le.Run(); err != nil { + logger.Error(err, "Failed to initialize leader election") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } +} + func defaultLeaderElectionIdentity() (string, error) { return os.Hostname() } diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/standardflags/flags.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/standardflags/flags.go new file mode 100644 index 000000000..54a2d12e3 --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/standardflags/flags.go @@ -0,0 +1,83 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package standardflags + +import ( + "flag" + "fmt" + "strings" + "time" +) + +type SidecarConfiguration struct { + ShowVersion bool + + KubeConfig string + CSIAddress string + + LeaderElection bool + LeaderElectionNamespace string + LeaderElectionLeaseDuration time.Duration + LeaderElectionRenewDeadline time.Duration + LeaderElectionRetryPeriod time.Duration + LeaderElectionLabels stringMap + + KubeAPIQPS float64 + KubeAPIBurst int + + HttpEndpoint string + MetricsAddress string + MetricsPath string +} + +var Configuration = SidecarConfiguration{} + +func RegisterCommonFlags(flags *flag.FlagSet) { + flags.BoolVar(&Configuration.ShowVersion, "version", false, "Show version.") + flags.StringVar(&Configuration.KubeConfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") + flags.StringVar(&Configuration.CSIAddress, "csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.") + flags.BoolVar(&Configuration.LeaderElection, "leader-election", false, "Enable leader election.") + flags.StringVar(&Configuration.LeaderElectionNamespace, "leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") + flags.DurationVar(&Configuration.LeaderElectionLeaseDuration, "leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") + flags.DurationVar(&Configuration.LeaderElectionRenewDeadline, "leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") + flags.DurationVar(&Configuration.LeaderElectionRetryPeriod, "leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") + flags.Var(&Configuration.LeaderElectionLabels, "leader-election-labels", "List of labels to add to lease when given replica becomes leader. Formatted as a comma seperated list of key:value labels. Example: 'my-label:my-value,my-second-label:my-second-value'") + flags.Float64Var(&Configuration.KubeAPIQPS, "kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") + flags.IntVar(&Configuration.KubeAPIBurst, "kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") + flags.StringVar(&Configuration.HttpEndpoint, "http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") + flags.StringVar(&Configuration.MetricsAddress, "metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") + flag.StringVar(&Configuration.MetricsPath, "metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") +} + +type stringMap map[string]string + +func (sm *stringMap) String() string { + return fmt.Sprintf("%s", *sm) +} + +func (sm *stringMap) Set(value string) error { + outMap := *sm + items := strings.Split(value, ",") + for _, i := range items { + label := strings.Split(i, ":") + if len(label) != 2 { + return fmt.Errorf("malformed item in list of labels: %s", i) + } + outMap[label[0]] = label[1] + } + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f398a60bb..5ac4031cc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -136,8 +136,8 @@ github.com/josharian/intern # github.com/json-iterator/go v1.1.12 ## explicit; go 1.12 github.com/json-iterator/go -# github.com/kubernetes-csi/csi-lib-utils v0.22.0 -## explicit; go 1.24.0 +# github.com/kubernetes-csi/csi-lib-utils v0.23.0 +## explicit; go 1.24.6 github.com/kubernetes-csi/csi-lib-utils/accessmodes github.com/kubernetes-csi/csi-lib-utils/connection github.com/kubernetes-csi/csi-lib-utils/leaderelection @@ -505,7 +505,7 @@ gopkg.in/inf.v0 # gopkg.in/yaml.v3 v3.0.1 ## explicit gopkg.in/yaml.v3 -# k8s.io/api v0.34.0 => k8s.io/api v0.34.0 +# k8s.io/api v0.34.1 => k8s.io/api v0.34.0 ## explicit; go 1.24.0 k8s.io/api/admission/v1 k8s.io/api/admission/v1beta1 @@ -567,7 +567,7 @@ k8s.io/api/storage/v1 k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1beta1 k8s.io/api/storagemigration/v1alpha1 -# k8s.io/apimachinery v0.34.0 => k8s.io/apimachinery v0.34.0 +# k8s.io/apimachinery v0.34.1 => k8s.io/apimachinery v0.34.0 ## explicit; go 1.24.0 k8s.io/apimachinery/pkg/api/equality k8s.io/apimachinery/pkg/api/errors @@ -768,7 +768,7 @@ k8s.io/apiserver/pkg/util/x509metrics k8s.io/apiserver/pkg/validation k8s.io/apiserver/pkg/warning k8s.io/apiserver/plugin/pkg/authenticator/token/webhook -# k8s.io/client-go v0.34.0 => k8s.io/client-go v0.34.0 +# k8s.io/client-go v0.34.1 => k8s.io/client-go v0.34.0 ## explicit; go 1.24.0 k8s.io/client-go/applyconfigurations k8s.io/client-go/applyconfigurations/admissionregistration/v1 @@ -1104,7 +1104,7 @@ k8s.io/client-go/util/flowcontrol k8s.io/client-go/util/homedir k8s.io/client-go/util/keyutil k8s.io/client-go/util/workqueue -# k8s.io/component-base v0.34.0 => k8s.io/component-base v0.34.0 +# k8s.io/component-base v0.34.1 => k8s.io/component-base v0.34.0 ## explicit; go 1.24.0 k8s.io/component-base/cli/flag k8s.io/component-base/compatibility