From a60936069a7261e464d413f36a6ef5f9b55633b7 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Thu, 7 Nov 2024 16:23:16 +0100 Subject: [PATCH] [nodeutilization]: actual usage client through kubernetes metrics --- README.md | 15 +- cmd/descheduler/app/options/options.go | 3 + kubernetes/base/rbac.yaml | 3 + pkg/api/types.go | 10 + pkg/api/v1alpha2/types.go | 10 + pkg/api/v1alpha2/zz_generated.conversion.go | 36 ++ pkg/api/v1alpha2/zz_generated.deepcopy.go | 17 + pkg/api/zz_generated.deepcopy.go | 17 + pkg/descheduler/client/client.go | 27 +- pkg/descheduler/descheduler.go | 68 ++- pkg/descheduler/descheduler_test.go | 134 ++++- .../metricscollector/metricscollector.go | 142 ++++++ .../metricscollector/metricscollector_test.go | 75 +++ pkg/framework/fake/fake.go | 6 + .../nodeutilization/lownodeutilization.go | 13 +- .../lownodeutilization_test.go | 471 ++++++++++++++---- .../plugins/nodeutilization/types.go | 14 +- .../plugins/nodeutilization/usageclients.go | 90 ++++ .../nodeutilization/usageclients_test.go | 137 +++++ .../nodeutilization/zz_generated.deepcopy.go | 2 + pkg/framework/profile/profile.go | 30 +- pkg/framework/types/types.go | 2 + test/e2e/e2e_lownodeutilization_test.go | 304 +++++++++++ test/run-e2e-tests.sh | 5 + test/test_utils.go | 39 +- 25 files changed, 1541 insertions(+), 129 deletions(-) create mode 100644 pkg/descheduler/metricscollector/metricscollector.go create mode 100644 pkg/descheduler/metricscollector/metricscollector_test.go create mode 100644 pkg/framework/plugins/nodeutilization/usageclients_test.go create mode 100644 test/e2e/e2e_lownodeutilization_test.go diff --git a/README.md b/README.md index 1ffce0ad56..85cc7509e7 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,8 @@ These are top level keys in the Descheduler Policy that you can use to configure | `maxNoOfPodsToEvictPerNode` |`int`| `nil` | maximum number of pods evicted from each node (summed through all strategies) | | `maxNoOfPodsToEvictPerNamespace` |`int`| `nil` | 
maximum number of pods evicted from each namespace (summed through all strategies) | | `maxNoOfPodsToEvictTotal` |`int`| `nil` | maximum number of pods evicted per rescheduling cycle (summed through all strategies) | +| `metricsCollector` |`object`| `nil` | configures collection of metrics for actual resource utilization | +| `metricsCollector.enabled` |`bool`| `false` | enables kubernetes [metrics server](https://kubernetes-sigs.github.io/metrics-server/) collection | ### Evictor Plugin configuration (Default Evictor) @@ -158,6 +160,8 @@ nodeSelector: "node=node1" # you don't need to set this, if not set all will be maxNoOfPodsToEvictPerNode: 5000 # you don't need to set this, unlimited if not set maxNoOfPodsToEvictPerNamespace: 5000 # you don't need to set this, unlimited if not set maxNoOfPodsToEvictTotal: 5000 # you don't need to set this, unlimited if not set +metricsCollector: + enabled: true # you don't need to set this, metrics are not collected if not set profiles: - name: ProfileName pluginConfig: @@ -277,11 +281,13 @@ If that parameter is set to `true`, the thresholds are considered as percentage `thresholds` will be deducted from the mean among all nodes and `targetThresholds` will be added to the mean. A resource consumption above (resp. below) this window is considered as overutilization (resp. underutilization). -**NOTE:** Node resource consumption is determined by the requests and limits of pods, not actual usage. +**NOTE:** By default node resource consumption is determined by the requests and limits of pods, not actual usage. This approach is chosen in order to maintain consistency with the kube-scheduler, which follows the same design for scheduling pods onto nodes. This means that resource usage as reported by Kubelet (or commands like `kubectl top`) may differ from the calculated consumption, due to these components reporting -actual usage metrics. Implementing metrics-based descheduling is currently TODO for the project. +actual usage metrics. 
Metrics-based descheduling can be enabled by setting the `metricsUtilization.metricsServer` field.
+For the plugin to consume the metrics, the metrics collector needs to be configured as well.
+See the `metricsCollector` field in [Top Level configuration](#top-level-configuration) for available options.
// MetricsCollector configures collection of metrics about actual resource utilization
type MetricsCollector struct {
	// Enabled turns on metrics collection from the Kubernetes metrics API
	// (metrics server). When false, no metrics are collected.
	// Later, the collection can be extended to other providers.
	Enabled bool
}
+ Enabled bool `json:"enabled"` +} diff --git a/pkg/api/v1alpha2/zz_generated.conversion.go b/pkg/api/v1alpha2/zz_generated.conversion.go index a13151ed29..1b33be25cf 100644 --- a/pkg/api/v1alpha2/zz_generated.conversion.go +++ b/pkg/api/v1alpha2/zz_generated.conversion.go @@ -46,6 +46,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*MetricsCollector)(nil), (*api.MetricsCollector)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(a.(*MetricsCollector), b.(*api.MetricsCollector), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.MetricsCollector)(nil), (*MetricsCollector)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(a.(*api.MetricsCollector), b.(*MetricsCollector), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*api.PluginConfig)(nil), (*PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_api_PluginConfig_To_v1alpha2_PluginConfig(a.(*api.PluginConfig), b.(*PluginConfig), scope) }); err != nil { @@ -105,6 +115,9 @@ func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *Desched out.MaxNoOfPodsToEvictPerNode = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNode)) out.MaxNoOfPodsToEvictPerNamespace = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNamespace)) out.MaxNoOfPodsToEvictTotal = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictTotal)) + if err := Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { + return err + } return nil } @@ -124,6 +137,9 @@ func autoConvert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(in *api.Des out.MaxNoOfPodsToEvictPerNode = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNode)) 
out.MaxNoOfPodsToEvictPerNamespace = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNamespace)) out.MaxNoOfPodsToEvictTotal = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictTotal)) + if err := Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { + return err + } return nil } @@ -175,6 +191,26 @@ func Convert_api_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in *api.Desch return autoConvert_api_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in, out, s) } +func autoConvert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in *MetricsCollector, out *api.MetricsCollector, s conversion.Scope) error { + out.Enabled = in.Enabled + return nil +} + +// Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector is an autogenerated conversion function. +func Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in *MetricsCollector, out *api.MetricsCollector, s conversion.Scope) error { + return autoConvert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in, out, s) +} + +func autoConvert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCollector, out *MetricsCollector, s conversion.Scope) error { + out.Enabled = in.Enabled + return nil +} + +// Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector is an autogenerated conversion function. 
+func Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCollector, out *MetricsCollector, s conversion.Scope) error { + return autoConvert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in, out, s) +} + func autoConvert_v1alpha2_PluginConfig_To_api_PluginConfig(in *PluginConfig, out *api.PluginConfig, s conversion.Scope) error { out.Name = in.Name if err := runtime.Convert_runtime_RawExtension_To_runtime_Object(&in.Args, &out.Args, s); err != nil { diff --git a/pkg/api/v1alpha2/zz_generated.deepcopy.go b/pkg/api/v1alpha2/zz_generated.deepcopy.go index a0129fd8ec..bfaf198785 100644 --- a/pkg/api/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha2/zz_generated.deepcopy.go @@ -56,6 +56,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = new(uint) **out = **in } + out.MetricsCollector = in.MetricsCollector return } @@ -101,6 +102,22 @@ func (in *DeschedulerProfile) DeepCopy() *DeschedulerProfile { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsCollector) DeepCopyInto(out *MetricsCollector) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsCollector. +func (in *MetricsCollector) DeepCopy() *MetricsCollector { + if in == nil { + return nil + } + out := new(MetricsCollector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PluginConfig) DeepCopyInto(out *PluginConfig) { *out = *in diff --git a/pkg/api/zz_generated.deepcopy.go b/pkg/api/zz_generated.deepcopy.go index 2f84cf1ed5..2ac45bb6a8 100644 --- a/pkg/api/zz_generated.deepcopy.go +++ b/pkg/api/zz_generated.deepcopy.go @@ -56,6 +56,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = new(uint) **out = **in } + out.MetricsCollector = in.MetricsCollector return } @@ -101,6 +102,22 @@ func (in *DeschedulerProfile) DeepCopy() *DeschedulerProfile { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsCollector) DeepCopyInto(out *MetricsCollector) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsCollector. +func (in *MetricsCollector) DeepCopy() *MetricsCollector { + if in == nil { + return nil + } + out := new(MetricsCollector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Namespaces) DeepCopyInto(out *Namespaces) { *out = *in diff --git a/pkg/descheduler/client/client.go b/pkg/descheduler/client/client.go index 964ed3b9b3..c5ff5e018a 100644 --- a/pkg/descheduler/client/client.go +++ b/pkg/descheduler/client/client.go @@ -19,16 +19,16 @@ package client import ( "fmt" - clientset "k8s.io/client-go/kubernetes" - componentbaseconfig "k8s.io/component-base/config" - // Ensure to load all auth plugins. 
+ clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + componentbaseconfig "k8s.io/component-base/config" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" ) -func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { +func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) { var cfg *rest.Config if len(clientConnection.Kubeconfig) != 0 { master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig) @@ -56,9 +56,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura cfg = rest.AddUserAgent(cfg, userAgt) } + return cfg, nil +} + +func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + return clientset.NewForConfig(cfg) } +func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + + // Create the metrics clientset to access the metrics.k8s.io API + return metricsclient.NewForConfig(cfg) +} + func GetMasterFromKubeconfig(filename string) (string, error) { config, err := clientcmd.LoadFromFile(filename) if err != nil { diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index c039bdac17..db6088101d 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -23,44 +23,43 @@ import ( "strconv" "time" - policyv1 "k8s.io/api/policy/v1" - schedulingv1 "k8s.io/api/scheduling/v1" - 
"k8s.io/apimachinery/pkg/runtime/schema" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/discovery" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/events" - componentbaseconfig "k8s.io/component-base/config" - "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + policyv1 "k8s.io/api/policy/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - - "sigs.k8s.io/descheduler/pkg/descheduler/client" - eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" - nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" - "sigs.k8s.io/descheduler/pkg/tracing" - "sigs.k8s.io/descheduler/pkg/utils" - "sigs.k8s.io/descheduler/pkg/version" + "k8s.io/client-go/tools/events" + componentbaseconfig "k8s.io/component-base/config" + "k8s.io/klog/v2" "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/metrics" "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/client" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/pkg/tracing" + "sigs.k8s.io/descheduler/pkg/utils" + 
"sigs.k8s.io/descheduler/pkg/version" ) type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status @@ -79,6 +78,7 @@ type descheduler struct { eventRecorder events.EventRecorder podEvictor *evictions.PodEvictor podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) + metricsCollector *metricscollector.MetricsCollector } type informerResources struct { @@ -156,6 +156,15 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche WithMetricsEnabled(!rs.DisableMetrics), ) + var metricsCollector *metricscollector.MetricsCollector + if deschedulerPolicy.MetricsCollector.Enabled { + nodeSelector := "" + if deschedulerPolicy.NodeSelector != nil { + nodeSelector = *deschedulerPolicy.NodeSelector + } + metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient, nodeSelector) + } + return &descheduler{ rs: rs, ir: ir, @@ -165,6 +174,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche eventRecorder: eventRecorder, podEvictor: podEvictor, podEvictionReactionFnc: podEvictionReactionFnc, + metricsCollector: metricsCollector, }, nil } @@ -244,6 +254,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory), frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), + frameworkprofile.WithMetricsCollector(d.metricsCollector), ) if err != nil { klog.ErrorS(err, "unable to create a profile", "profile", profile.Name) @@ -308,6 +319,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error { return err } + if deschedulerPolicy.MetricsCollector.Enabled { + metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler") + if err != nil { + return err + } + rs.MetricsClient = metricsClient + } + runFn := func() error { return 
RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion) } @@ -415,6 +434,21 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) + if deschedulerPolicy.MetricsCollector.Enabled { + go func() { + klog.V(2).Infof("Starting metrics collector") + descheduler.metricsCollector.Run(ctx) + klog.V(2).Infof("Stopped metrics collector") + }() + klog.V(2).Infof("Waiting for metrics collector to sync") + err := wait.PollWithContext(ctx, time.Second, time.Minute, func(context.Context) (done bool, err error) { + return descheduler.metricsCollector.HasSynced(), nil + }) + if err != nil { + return fmt.Errorf("unable to wait for metrics collector to sync: %v", err) + } + } + wait.NonSlidingUntil(func() { // A next context is created here intentionally to avoid nesting the spans via context. sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil") diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 76bbf2e96d..312c6b271c 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -10,17 +10,23 @@ import ( policy "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" apiversion "k8s.io/apimachinery/pkg/version" fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/informers" fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/klog/v2" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" utilptr "k8s.io/utils/ptr" + "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/pkg/api" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" 
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + "sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization" "sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates" "sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints" "sigs.k8s.io/descheduler/pkg/utils" @@ -28,11 +34,17 @@ import ( "sigs.k8s.io/descheduler/test" ) +var ( + nodesgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + podsgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"} +) + func initPluginRegistry() { pluginregistry.PluginRegistry = pluginregistry.NewRegistry() pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry) pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry) pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry) + pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, &nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry) } func 
removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy { @@ -99,7 +111,45 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy { } } -func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { +func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy { + return &api.DeschedulerPolicy{ + Profiles: []api.DeschedulerProfile{ + { + Name: "Profile", + PluginConfigs: []api.PluginConfig{ + { + Name: nodeutilization.LowNodeUtilizationPluginName, + Args: &nodeutilization.LowNodeUtilizationArgs{ + Thresholds: thresholds, + TargetThresholds: targetThresholds, + MetricsUtilization: nodeutilization.MetricsUtilization{ + MetricsServer: metricsEnabled, + }, + }, + }, + { + Name: defaultevictor.PluginName, + Args: &defaultevictor.DefaultEvictorArgs{}, + }, + }, + Plugins: api.Plugins{ + Filter: api.PluginSet{ + Enabled: []string{ + defaultevictor.PluginName, + }, + }, + Balance: api.PluginSet{ + Enabled: []string{ + nodeutilization.LowNodeUtilizationPluginName, + }, + }, + }, + }, + }, + } +} + +func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { client := fakeclientset.NewSimpleClientset(objects...) eventClient := fakeclientset.NewSimpleClientset(objects...) 
@@ -109,6 +159,7 @@ func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolic } rs.Client = client rs.EventClient = eventClient + rs.MetricsClient = metricsClient sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) @@ -402,7 +453,7 @@ func TestPodEvictorReset(t *testing.T) { internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy() ctxCancel, cancel := context.WithCancel(ctx) - rs, descheduler, client := initDescheduler(t, ctxCancel, internalDeschedulerPolicy, node1, node2, p1, p2) + rs, descheduler, client := initDescheduler(t, ctxCancel, internalDeschedulerPolicy, nil, node1, node2, p1, p2) defer cancel() var evictedPods []string @@ -503,7 +554,7 @@ func TestDeschedulingLimits(t *testing.T) { node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) nodes := []*v1.Node{node1, node2} ctxCancel, cancel := context.WithCancel(ctx) - _, descheduler, client := initDescheduler(t, ctxCancel, tc.policy, node1, node2) + _, descheduler, client := initDescheduler(t, ctxCancel, tc.policy, nil, node1, node2) defer cancel() pods := []*v1.Pod{ @@ -539,3 +590,80 @@ func TestDeschedulingLimits(t *testing.T) { }) } } + +func TestLoadAwareDescheduling(t *testing.T) { + initPluginRegistry() + + ownerRef1 := test.GetReplicaSetOwnerRefList() + updatePod := func(pod *v1.Pod) { + pod.ObjectMeta.OwnerReferences = ownerRef1 + } + + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule) + node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + nodes := []*v1.Node{node1, node2} + + p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod) + p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod) + p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod) + p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod) + p5 := test.BuildTestPod("p5", 300, 
0, node1.Name, updatePod) + + nodemetricses := []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics("n1", 2400, 3000), + test.BuildNodeMetrics("n2", 400, 0), + } + + podmetricses := []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 400, 0), + test.BuildPodMetrics("p2", 400, 0), + test.BuildPodMetrics("p3", 400, 0), + test.BuildPodMetrics("p4", 400, 0), + test.BuildPodMetrics("p5", 400, 0), + } + + metricsClientset := fakemetricsclient.NewSimpleClientset() + for _, nodemetrics := range nodemetricses { + metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "") + } + for _, podmetrics := range podmetricses { + metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace) + } + + policy := lowNodeUtilizationPolicy( + api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + true, // enabled metrics utilization + ) + policy.MetricsCollector.Enabled = true + + ctxCancel, cancel := context.WithCancel(ctx) + _, descheduler, _ := initDescheduler( + t, + ctxCancel, + policy, + metricsClientset, + node1, node2, p1, p2, p3, p4, p5) + defer cancel() + + // This needs to be run since the metrics collector is started + // after newDescheduler in RunDeschedulerStrategies. + descheduler.metricsCollector.Collect(ctx) + + err := descheduler.runDeschedulerLoop(ctx, nodes) + if err != nil { + t.Fatalf("Unable to run a descheduling loop: %v", err) + } + totalEs := descheduler.podEvictor.TotalEvicted() + if totalEs != 2 { + t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs) + } + t.Logf("Total evictions: %v", totalEs) +} diff --git a/pkg/descheduler/metricscollector/metricscollector.go b/pkg/descheduler/metricscollector/metricscollector.go new file mode 100644 index 0000000000..3fb6b3a45e --- /dev/null +++ b/pkg/descheduler/metricscollector/metricscollector.go @@ -0,0 +1,142 @@ +/* +Copyright 2024 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricscollector + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + utilptr "k8s.io/utils/ptr" +) + +const ( + beta float64 = 0.9 +) + +type MetricsCollector struct { + clientset kubernetes.Interface + metricsClientset metricsclient.Interface + nodeSelector string + + nodes map[string]map[v1.ResourceName]*resource.Quantity + + mu sync.RWMutex + // hasSynced signals at least one sync succeeded + hasSynced bool +} + +func NewMetricsCollector(clientset kubernetes.Interface, metricsClientset metricsclient.Interface, nodeSelector string) *MetricsCollector { + return &MetricsCollector{ + clientset: clientset, + metricsClientset: metricsClientset, + nodeSelector: nodeSelector, + nodes: make(map[string]map[v1.ResourceName]*resource.Quantity), + } +} + +func (mc *MetricsCollector) Run(ctx context.Context) { + wait.NonSlidingUntil(func() { + mc.Collect(ctx) + }, 5*time.Second, ctx.Done()) +} + +func weightedAverage(prevValue, value int64) int64 { + return int64(math.Floor(beta*float64(prevValue) + (1-beta)*float64(value))) +} + +func (mc *MetricsCollector) AllNodesUsage() (map[string]map[v1.ResourceName]*resource.Quantity, error) { + mc.mu.RLock() + defer 
mc.mu.RUnlock() + + allNodesUsage := make(map[string]map[v1.ResourceName]*resource.Quantity) + for nodeName := range mc.nodes { + allNodesUsage[nodeName] = map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceCPU].DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceMemory].DeepCopy()), + } + } + + return allNodesUsage, nil +} + +func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resource.Quantity, error) { + mc.mu.RLock() + defer mc.mu.RUnlock() + + if _, exists := mc.nodes[node.Name]; !exists { + klog.V(4).Infof("unable to find node %q in the collected metrics", node.Name) + return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name) + } + return map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceCPU].DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceMemory].DeepCopy()), + }, nil +} + +func (mc *MetricsCollector) HasSynced() bool { + return mc.hasSynced +} + +func (mc *MetricsCollector) MetricsClient() metricsclient.Interface { + return mc.metricsClientset +} + +func (mc *MetricsCollector) Collect(ctx context.Context) error { + mc.mu.Lock() + defer mc.mu.Unlock() + nodes, err := mc.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: mc.nodeSelector}) + if err != nil { + return fmt.Errorf("unable to list nodes: %v", err) + } + + for _, node := range nodes.Items { + metrics, err := mc.metricsClientset.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Error fetching metrics for node %s: %v\n", node.Name, err) + // No entry -> duplicate the previous value -> do nothing as beta*PV + (1-beta)*PV = PV + continue + } + + if _, exists := mc.nodes[node.Name]; !exists { + mc.nodes[node.Name] = 
map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](metrics.Usage.Cpu().DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](metrics.Usage.Memory().DeepCopy()), + } + } else { + // get MilliValue to reduce loss of precision + mc.nodes[node.Name][v1.ResourceCPU].SetMilli( + weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()), + ) + mc.nodes[node.Name][v1.ResourceMemory].SetMilli( + weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].MilliValue(), metrics.Usage.Memory().MilliValue()), + ) + } + } + + mc.hasSynced = true + return nil +} diff --git a/pkg/descheduler/metricscollector/metricscollector_test.go b/pkg/descheduler/metricscollector/metricscollector_test.go new file mode 100644 index 0000000000..345b066ddd --- /dev/null +++ b/pkg/descheduler/metricscollector/metricscollector_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package metricscollector
+
+import (
+	"context"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	fakeclientset "k8s.io/client-go/kubernetes/fake"
+	fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
+
+	"sigs.k8s.io/descheduler/test"
+)
+
+// checkCpuNodeUsage fails the test unless the cpu entry of usage equals
+// millicpu, compared in milli-units.
+func checkCpuNodeUsage(t *testing.T, usage map[v1.ResourceName]*resource.Quantity, millicpu int64) {
+	t.Helper()
+	got := usage[v1.ResourceCPU].MilliValue()
+	t.Logf("current node cpu usage: %v\n", got)
+	if got != millicpu {
+		t.Fatalf("cpu node usage expected to be %v, got %v instead", millicpu, got)
+	}
+}
+
+// TestMetricsCollector feeds three consecutive cpu samples of node n2 to the
+// collector and checks the smoothed value after each collection.
+func TestMetricsCollector(t *testing.T) {
+	gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"}
+
+	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
+	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
+	n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
+
+	n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
+	n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
+	n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
+
+	clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
+	metricsClientset := fakemetricsclient.NewSimpleClientset()
+
+	// mustTrack aborts the test when the fake metrics tracker rejects a change.
+	mustTrack := func(err error) {
+		t.Helper()
+		if err != nil {
+			t.Fatalf("unable to update the metrics tracker: %v", err)
+		}
+	}
+	mustTrack(metricsClientset.Tracker().Create(gvr, n1metrics, ""))
+	mustTrack(metricsClientset.Tracker().Create(gvr, n2metrics, ""))
+	mustTrack(metricsClientset.Tracker().Create(gvr, n3metrics, ""))
+
+	collector := NewMetricsCollector(clientset, metricsClientset, "")
+
+	// collectAndCheck runs one collection and compares the cpu usage stored for n2.
+	collectAndCheck := func(millicpu int64) {
+		t.Helper()
+		if err := collector.Collect(context.TODO()); err != nil {
+			t.Fatalf("unable to collect metrics: %v", err)
+		}
+		nodeUsage, err := collector.NodeUsage(n2)
+		if err != nil {
+			t.Fatalf("unable to get the usage of node n2: %v", err)
+		}
+		checkCpuNodeUsage(t, nodeUsage, millicpu)
+	}
+
+	t.Logf("Set initial node cpu usage to 1400")
+	collectAndCheck(1400)
+
+	t.Logf("Set current node cpu usage to 500")
+	n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(500, resource.DecimalSI)
+	mustTrack(metricsClientset.Tracker().Update(gvr, n2metrics, ""))
+	// 0.9*1400 + 0.1*500
+	collectAndCheck(1310)
+
+	t.Logf("Set current node cpu usage to 900")
+	n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
+	mustTrack(metricsClientset.Tracker().Update(gvr, n2metrics, ""))
+	// 0.9*1310 + 0.1*900
+	collectAndCheck(1269)
+}
diff --git a/pkg/framework/fake/fake.go b/pkg/framework/fake/fake.go
index a16132d407..d274228935 100644
--- a/pkg/framework/fake/fake.go
+++ b/pkg/framework/fake/fake.go
@@ -8,6 +8,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
+	"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
 )
@@ -18,6 +19,7 @@ type HandleImpl struct {
 	SharedInformerFactoryImpl     informers.SharedInformerFactory
 	EvictorFilterImpl             frameworktypes.EvictorPlugin
 	PodEvictorImpl                *evictions.PodEvictor
+	MetricsCollectorImpl          *metricscollector.MetricsCollector
 }
 
 var _ frameworktypes.Handle = &HandleImpl{}
@@ -26,6 +28,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface {
 	return hi.ClientsetImpl
 }
 
+func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector {
+	return hi.MetricsCollectorImpl
+}
+
 func (hi *HandleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc {
 	return hi.GetPodsAssignedToNodeFuncImpl
 }
diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
index df3b84743d..8abb48465f 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -24,6 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
+
 	"sigs.k8s.io/descheduler/pkg/api"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	nodeutil
"sigs.k8s.io/descheduler/pkg/descheduler/node" @@ -88,6 +89,16 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds) + var usageClient usageClient + if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer { + if handle.MetricsCollector() == nil { + return nil, fmt.Errorf("metrics client not initialized") + } + usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector()) + } else { + usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()) + } + return &LowNodeUtilization{ handle: handle, args: lowNodeUtilizationArgsArgs, @@ -95,7 +106,7 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f overutilizationCriteria: overutilizationCriteria, resourceNames: resourceNames, podFilter: podFilter, - usageClient: newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()), + usageClient: usageClient, }, nil } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go index a99acc7b6a..1bd895c3c7 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go @@ -21,19 +21,21 @@ import ( "fmt" "testing" - "sigs.k8s.io/descheduler/pkg/api" - "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" - frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing" - frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" - v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" + "sigs.k8s.io/descheduler/pkg/api" 
"sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing" + frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" "sigs.k8s.io/descheduler/pkg/utils" "sigs.k8s.io/descheduler/test" ) @@ -48,14 +50,17 @@ func TestLowNodeUtilization(t *testing.T) { notMatchingNodeSelectorValue := "east" testCases := []struct { - name string - useDeviationThresholds bool - thresholds, targetThresholds api.ResourceThresholds - nodes []*v1.Node - pods []*v1.Pod - expectedPodsEvicted uint - evictedPods []string - evictableNamespaces *api.Namespaces + name string + useDeviationThresholds bool + thresholds, targetThresholds api.ResourceThresholds + nodes []*v1.Node + pods []*v1.Pod + nodemetricses []*v1beta1.NodeMetrics + podmetricses []*v1beta1.PodMetrics + expectedPodsEvicted uint + expectedPodsWithMetricsEvicted uint + evictedPods []string + evictableNamespaces *api.Namespaces }{ { name: "no evictable pods", @@ -103,7 +108,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 0, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 2401, 1714978816), + test.BuildNodeMetrics(n2NodeName, 401, 1714978816), + test.BuildNodeMetrics(n3NodeName, 10, 1714978816), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities", @@ -153,7 +171,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, + nodemetricses: []*v1beta1.NodeMetrics{ 
+ test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "without priorities, but excluding namespaces", @@ -218,12 +249,25 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, evictableNamespaces: &api.Namespaces{ Exclude: []string{ "namespace1", }, }, - expectedPodsEvicted: 0, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities, but include only default namespace", @@ -283,12 +327,25 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, evictableNamespaces: &api.Namespaces{ Include: []string{ "default", }, }, - expectedPodsEvicted: 2, + expectedPodsEvicted: 2, + expectedPodsWithMetricsEvicted: 2, 
}, { name: "without priorities stop when cpu capacity is depleted", @@ -306,14 +363,14 @@ func TestLowNodeUtilization(t *testing.T) { test.BuildTestNode(n3NodeName, 4000, 3000, 10, test.SetNodeUnschedulable), }, pods: []*v1.Pod{ - test.BuildTestPod("p1", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p2", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p3", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p4", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p5", 400, 300, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), // These won't be evicted. - test.BuildTestPod("p6", 400, 300, n1NodeName, test.SetDSOwnerRef), - test.BuildTestPod("p7", 400, 300, n1NodeName, func(pod *v1.Pod) { + test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) { // A pod with local storage. test.SetNormalOwnerRef(pod) pod.Spec.Volumes = []v1.Volume{ @@ -330,17 +387,29 @@ func TestLowNodeUtilization(t *testing.T) { // A Mirror Pod. pod.Annotations = test.GetMirrorPodAnnotation() }), - test.BuildTestPod("p8", 400, 300, n1NodeName, func(pod *v1.Pod) { + test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) { // A Critical Pod. 
test.SetNormalOwnerRef(pod) pod.Namespace = "kube-system" priority := utils.SystemCriticalPriority pod.Spec.Priority = &priority }), - test.BuildTestPod("p9", 400, 2100, n2NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - // 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before cpu is depleted - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 0, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "with priorities", @@ -410,7 +479,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "without priorities evicting best-effort pods only", @@ -478,8 +560,21 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, - evictedPods: []string{"p1", "p2", "p4", "p5"}, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + 
test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, + evictedPods: []string{"p1", "p2", "p4", "p5"}, }, { name: "with extended resource", @@ -558,8 +653,21 @@ func TestLowNodeUtilization(t *testing.T) { test.SetPodExtendedResourceRequest(pod, extendedResource, 1) }), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, // 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before extended resource is depleted - expectedPodsEvicted: 3, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 0, }, { name: "with extended resource in some of nodes", @@ -586,8 +694,21 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, // 0 pods available for eviction because there's no enough extended resource in node2 - expectedPodsEvicted: 0, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without 
priorities, but only other node is unschedulable", @@ -636,7 +757,19 @@ func TestLowNodeUtilization(t *testing.T) { pod.Spec.Priority = &priority }), }, - expectedPodsEvicted: 0, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities, but only other node doesn't match pod node selector for p4 and p5", @@ -701,7 +834,17 @@ func TestLowNodeUtilization(t *testing.T) { pod.Spec.Priority = &priority }), }, - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 3, }, { name: "without priorities, but only other node doesn't match pod node affinity for p4 and p5", @@ -795,7 +938,17 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 3, }, { name: "deviation thresholds", @@ -847,71 +1000,211 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - 
expectedPodsEvicted: 2, - evictedPods: []string{}, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 2, + expectedPodsWithMetricsEvicted: 2, + evictedPods: []string{}, + }, + { + name: "without priorities different evictions for requested and actual resources", + thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + targetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, func(node *v1.Node) { + node.ObjectMeta.Labels = map[string]string{ + nodeSelectorKey: notMatchingNodeSelectorValue, + } + }), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. 
+ test.BuildTestPod("p4", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with affinity to run in the "west" datacenter upon scheduling + test.SetNormalOwnerRef(pod) + pod.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: nodeSelectorKey, + Operator: "In", + Values: []string{nodeSelectorValue}, + }, + }, + }, + }, + }, + }, + } + }), + test.BuildTestPod("p5", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with affinity to run in the "west" datacenter upon scheduling + test.SetNormalOwnerRef(pod) + pod.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: nodeSelectorKey, + Operator: "In", + Values: []string{nodeSelectorValue}, + }, + }, + }, + }, + }, + }, + } + }), + test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with local storage. + test.SetNormalOwnerRef(pod) + pod.Spec.Volumes = []v1.Volume{ + { + Name: "sample", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{Path: "somePath"}, + EmptyDir: &v1.EmptyDirVolumeSource{ + SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI), + }, + }, + }, + } + // A Mirror Pod. + pod.Annotations = test.GetMirrorPodAnnotation() + }), + test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A Critical Pod. 
+ test.SetNormalOwnerRef(pod) + pod.Namespace = "kube-system" + priority := utils.SystemCriticalPriority + pod.Spec.Priority = &priority + }), + test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), + }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 801, 0), + test.BuildPodMetrics("p2", 801, 0), + test.BuildPodMetrics("p3", 801, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 2, }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - var objs []runtime.Object - for _, node := range tc.nodes { - objs = append(objs, node) - } - for _, pod := range tc.pods { - objs = append(objs, pod) - } - fakeClient := fake.NewSimpleClientset(objs...) + var objs []runtime.Object + for _, node := range tc.nodes { + objs = append(objs, node) + } + for _, pod := range tc.pods { + objs = append(objs, pod) + } - podsForEviction := make(map[string]struct{}) - for _, pod := range tc.evictedPods { - podsForEviction[pod] = struct{}{} - } + metricsClientset := fakemetricsclient.NewSimpleClientset() + for _, nodemetrics := range tc.nodemetricses { + metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "") + } + for _, podmetrics := range tc.podmetricses { + metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace) + } + + fakeClient := fake.NewSimpleClientset(objs...) 
+ + collector := metricscollector.NewMetricsCollector(fakeClient, metricsClientset, "") + err := collector.Collect(ctx) + if err != nil { + t.Fatalf("unable to collect metrics: %v", err) + } - evictionFailed := false - if len(tc.evictedPods) > 0 { - fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.CreateAction) - obj := getAction.GetObject() - if eviction, ok := obj.(*policy.Eviction); ok { - if _, exists := podsForEviction[eviction.Name]; exists { - return true, obj, nil + podsForEviction := make(map[string]struct{}) + for _, pod := range tc.evictedPods { + podsForEviction[pod] = struct{}{} + } + + evictionFailed := false + if len(tc.evictedPods) > 0 { + fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.CreateAction) + obj := getAction.GetObject() + if eviction, ok := obj.(*policy.Eviction); ok { + if _, exists := podsForEviction[eviction.Name]; exists { + return true, obj, nil + } + evictionFailed = true + return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) } - evictionFailed = true - return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) - } - return true, obj, nil - }) - } + return true, obj, nil + }) + } - handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) - if err != nil { - t.Fatalf("Unable to initialize a framework handle: %v", err) - } + handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + handle.MetricsCollectorImpl = collector - plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ - Thresholds: tc.thresholds, - TargetThresholds: tc.targetThresholds, - UseDeviationThresholds: 
tc.useDeviationThresholds, - EvictableNamespaces: tc.evictableNamespaces, - }, - handle) - if err != nil { - t.Fatalf("Unable to initialize the plugin: %v", err) - } - plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ + Thresholds: tc.thresholds, + TargetThresholds: tc.targetThresholds, + UseDeviationThresholds: tc.useDeviationThresholds, + EvictableNamespaces: tc.evictableNamespaces, + MetricsUtilization: MetricsUtilization{ + MetricsServer: metricsEnabled, + }, + }, + handle) + if err != nil { + t.Fatalf("Unable to initialize the plugin: %v", err) + } + plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) - podsEvicted := podEvictor.TotalEvicted() - if tc.expectedPodsEvicted != podsEvicted { - t.Errorf("Expected %v pods to be evicted but %v got evicted", tc.expectedPodsEvicted, podsEvicted) + podsEvicted := podEvictor.TotalEvicted() + if expectedPodsEvicted != podsEvicted { + t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted) + } + if evictionFailed { + t.Errorf("Pod evictions failed unexpectedly") + } } - if evictionFailed { - t.Errorf("Pod evictions failed unexpectedly") - } - }) + } + t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted)) + t.Run(tc.name+" with metrics enabled", testFnc(true, tc.expectedPodsWithMetricsEvicted)) } } diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go index 4823b0695d..8e005fa02d 100644 --- a/pkg/framework/plugins/nodeutilization/types.go +++ b/pkg/framework/plugins/nodeutilization/types.go @@ -28,6 +28,7 @@ type LowNodeUtilizationArgs struct { Thresholds api.ResourceThresholds `json:"thresholds"` TargetThresholds api.ResourceThresholds `json:"targetThresholds"` NumberOfNodes int `json:"numberOfNodes,omitempty"` + MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"` // Naming this one differently since namespaces are still 
// considered while considering resources used by pods @@ -41,10 +42,19 @@ type LowNodeUtilizationArgs struct { type HighNodeUtilizationArgs struct { metav1.TypeMeta `json:",inline"` - Thresholds api.ResourceThresholds `json:"thresholds"` - NumberOfNodes int `json:"numberOfNodes,omitempty"` + Thresholds api.ResourceThresholds `json:"thresholds"` + NumberOfNodes int `json:"numberOfNodes,omitempty"` + MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"` + // Naming this one differently since namespaces are still // considered while considering resources used by pods // but then filtered out before eviction EvictableNamespaces *api.Namespaces `json:"evictableNamespaces,omitempty"` } + +// MetricsUtilization allows consuming actual resource utilization from metrics +type MetricsUtilization struct { + // metricsServer enables metrics from a kubernetes metrics server. + // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. + MetricsServer bool `json:"metricsServer,omitempty"` +} diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go index 6085238970..0bf89aca70 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients.go +++ b/pkg/framework/plugins/nodeutilization/usageclients.go @@ -17,12 +17,16 @@ limitations under the License.
package nodeutilization import ( + "context" "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" utilptr "k8s.io/utils/ptr" + + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" "sigs.k8s.io/descheduler/pkg/utils" @@ -100,3 +104,89 @@ func (s *requestedUsageClient) sync(nodes []*v1.Node) error { return nil } + +type actualUsageClient struct { + resourceNames []v1.ResourceName + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc + metricsCollector *metricscollector.MetricsCollector + + _pods map[string][]*v1.Pod + _nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity +} + +var _ usageClient = &actualUsageClient{} + +func newActualUsageClient( + resourceNames []v1.ResourceName, + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, + metricsCollector *metricscollector.MetricsCollector, +) *actualUsageClient { + return &actualUsageClient{ + resourceNames: resourceNames, + getPodsAssignedToNode: getPodsAssignedToNode, + metricsCollector: metricsCollector, + } +} + +func (client *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity { + return client._nodeUtilization[node] +} + +func (client *actualUsageClient) pods(node string) []*v1.Pod { + return client._pods[node] +} + +func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) { + // It's not efficient to keep track of all pods in a cluster when only a fraction of them is evicted. + // Thus, take the current pod metrics without computing any smoothing (e.g. EWMA).
+ podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err) + } + + totalUsage := make(map[v1.ResourceName]*resource.Quantity) + for _, container := range podMetrics.Containers { + for _, resourceName := range client.resourceNames { + if _, exists := container.Usage[resourceName]; !exists { + continue + } + if totalUsage[resourceName] == nil { + totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy()) + } else { + totalUsage[resourceName].Add(container.Usage[resourceName]) + } + } + } + + return totalUsage, nil +} + +func (client *actualUsageClient) sync(nodes []*v1.Node) error { + client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) + client._pods = make(map[string][]*v1.Pod) + + nodesUsage, err := client.metricsCollector.AllNodesUsage() + if err != nil { + return err + } + + for _, node := range nodes { + pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) + if err != nil { + klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) + return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + } + + nodeUsage, ok := nodesUsage[node.Name] + if !ok { + return fmt.Errorf("unable to find node %q in the collected metrics", node.Name) + } + nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI) + + // store the snapshot of pods from the same (or the closest) node utilization computation + client._pods[node.Name] = pods + client._nodeUtilization[node.Name] = nodeUsage + } + + return nil +} diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go new file mode 100644 index 
0000000000..016335520a --- /dev/null +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeutilization + +import ( + "context" + "fmt" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" + + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + "sigs.k8s.io/descheduler/test" +) + +var ( + nodesgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + podsgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"} +) + +func updateMetricsAndCheckNodeUtilization( + t *testing.T, + ctx context.Context, + newValue, expectedValue int64, + metricsClientset *fakemetricsclient.Clientset, + collector *metricscollector.MetricsCollector, + usageClient usageClient, + nodes []*v1.Node, + nodeName string, + nodemetrics *v1beta1.NodeMetrics, +) { + t.Logf("Set current node cpu usage to %v", newValue) + nodemetrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(newValue, resource.DecimalSI) + metricsClientset.Tracker().Update(nodesgvr, nodemetrics, "") + err 
:= collector.Collect(ctx) + if err != nil { + t.Fatalf("failed to capture metrics: %v", err) + } + err = usageClient.sync(nodes) + if err != nil { + t.Fatalf("failed to capture a snapshot: %v", err) + } + nodeUtilization := usageClient.nodeUtilization(nodeName) + t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue()) + if nodeUtilization[v1.ResourceCPU].MilliValue() != expectedValue { + t.Fatalf("cpu node usage expected to be %v, got %v instead", expectedValue, nodeUtilization[v1.ResourceCPU].MilliValue()) + } + pods := usageClient.pods(nodeName) + fmt.Printf("pods: %#v\n", pods) + if len(pods) != 2 { + t.Fatalf("expected 2 pods for node %v, got %v instead", nodeName, len(pods)) + } +} + +func TestActualUsageClient(t *testing.T) { + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil) + p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + nodes := []*v1.Node{n1, n2, n3} + + n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816) + n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816) + n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816) + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3) + metricsClientset := fakemetricsclient.NewSimpleClientset() + metricsClientset.Tracker().Create(nodesgvr, n1metrics, "") + metricsClientset.Tracker().Create(nodesgvr, n2metrics, "") + metricsClientset.Tracker().Create(nodesgvr, n3metrics, "") + + ctx := context.TODO() + + resourceNames := []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } + + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + podsAssignedToNode, err := 
podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + collector := metricscollector.NewMetricsCollector(clientset, metricsClientset, "") + + usageClient := newActualUsageClient( + resourceNames, + podsAssignedToNode, + collector, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 1400, 1400, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 500, 1310, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 900, 1269, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) +} diff --git a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go index 927c9089f7..ca108ef616 100644 --- a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go +++ b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go @@ -37,6 +37,7 @@ func (in *HighNodeUtilizationArgs) DeepCopyInto(out *HighNodeUtilizationArgs) { (*out)[key] = val } } + out.MetricsUtilization = in.MetricsUtilization if in.EvictableNamespaces != nil { in, out := &in.EvictableNamespaces, &out.EvictableNamespaces *out = new(api.Namespaces) @@ -81,6 +82,7 @@ func (in *LowNodeUtilizationArgs) DeepCopyInto(out *LowNodeUtilizationArgs) { (*out)[key] = val } } + out.MetricsUtilization = in.MetricsUtilization if in.EvictableNamespaces != nil { in, out := &in.EvictableNamespaces, &out.EvictableNamespaces *out = new(api.Namespaces) diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index 5cac3ea271..f142b0cdb7 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -20,21 +20,22 @@ import ( "go.opentelemetry.io/otel/attribute" 
"go.opentelemetry.io/otel/trace" - "sigs.k8s.io/descheduler/metrics" - "sigs.k8s.io/descheduler/pkg/api" - "sigs.k8s.io/descheduler/pkg/descheduler/evictions" - podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" - "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" - frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" - "sigs.k8s.io/descheduler/pkg/tracing" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" + + "sigs.k8s.io/descheduler/metrics" + "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" + frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/pkg/tracing" ) // evictorImpl implements the Evictor interface so plugins @@ -67,6 +68,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev // handleImpl implements the framework handle which gets passed to plugins type handleImpl struct { clientSet clientset.Interface + metricsCollector *metricscollector.MetricsCollector getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc sharedInformerFactory informers.SharedInformerFactory evictor *evictorImpl @@ -79,6 +81,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface { return hi.clientSet } +func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector { + return hi.metricsCollector +} + // GetPodsAssignedToNodeFunc retrieves GetPodsAssignedToNodeFunc implementation func (hi *handleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc { return hi.getPodsAssignedToNodeFunc @@ -128,6 +134,7 @@ type handleImplOpts struct { sharedInformerFactory informers.SharedInformerFactory getPodsAssignedToNodeFunc 
podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor + metricsCollector *metricscollector.MetricsCollector } // WithClientSet sets clientSet for the scheduling frameworkImpl. @@ -155,6 +162,12 @@ func WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc podutil.GetPodsAssig } } +func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) Option { + return func(o *handleImplOpts) { + o.metricsCollector = metricsCollector + } +} + func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) { for idx, pluginConfig := range pluginConfigs { if pluginConfig.Name == pluginName { @@ -253,6 +266,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts profileName: config.Name, podEvictor: hOpts.podEvictor, }, + metricsCollector: hOpts.metricsCollector, } pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...) diff --git a/pkg/framework/types/types.go b/pkg/framework/types/types.go index 1d5e1d95cd..6cb95ca247 100644 --- a/pkg/framework/types/types.go +++ b/pkg/framework/types/types.go @@ -24,6 +24,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" ) @@ -36,6 +37,7 @@ type Handle interface { Evictor() Evictor GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc SharedInformerFactory() informers.SharedInformerFactory + MetricsCollector() *metricscollector.MetricsCollector } // Evictor defines an interface for filtering and evicting pods diff --git a/test/e2e/e2e_lownodeutilization_test.go b/test/e2e/e2e_lownodeutilization_test.go new file mode 100644 index 0000000000..125c5497f2 --- /dev/null +++ b/test/e2e/e2e_lownodeutilization_test.go @@ -0,0 +1,304 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "os" + "strings" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + componentbaseconfig "k8s.io/component-base/config" + utilptr "k8s.io/utils/ptr" + + "sigs.k8s.io/descheduler/pkg/api" + apiv1alpha2 "sigs.k8s.io/descheduler/pkg/api/v1alpha2" + "sigs.k8s.io/descheduler/pkg/descheduler/client" + "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + "sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization" +) + +func lowNodeUtilizationPolicy(lowNodeUtilizationArgs *nodeutilization.LowNodeUtilizationArgs, evictorArgs *defaultevictor.DefaultEvictorArgs, metricsCollectorEnabled bool) *apiv1alpha2.DeschedulerPolicy { + return &apiv1alpha2.DeschedulerPolicy{ + MetricsCollector: apiv1alpha2.MetricsCollector{ + Enabled: metricsCollectorEnabled, + }, + Profiles: []apiv1alpha2.DeschedulerProfile{ + { + Name: nodeutilization.LowNodeUtilizationPluginName + "Profile", + PluginConfigs: []apiv1alpha2.PluginConfig{ + { + Name: nodeutilization.LowNodeUtilizationPluginName, + Args: runtime.RawExtension{ + Object: lowNodeUtilizationArgs, + }, + }, + { + Name: defaultevictor.PluginName, + Args: runtime.RawExtension{ + Object: evictorArgs, + 
}, + }, + }, + Plugins: apiv1alpha2.Plugins{ + Filter: apiv1alpha2.PluginSet{ + Enabled: []string{ + defaultevictor.PluginName, + }, + }, + Balance: apiv1alpha2.PluginSet{ + Enabled: []string{ + nodeutilization.LowNodeUtilizationPluginName, + }, + }, + }, + }, + }, + } +} + +func TestLowNodeUtilizationKubernetesMetrics(t *testing.T) { + ctx := context.Background() + + clientSet, err := client.CreateClient(componentbaseconfig.ClientConnectionConfiguration{Kubeconfig: os.Getenv("KUBECONFIG")}, "") + if err != nil { + t.Errorf("Error during kubernetes client creation with %v", err) + } + + metricsClient, err := client.CreateMetricsClient(componentbaseconfig.ClientConnectionConfiguration{Kubeconfig: os.Getenv("KUBECONFIG")}, "descheduler") + if err != nil { + t.Errorf("Error during kubernetes metrics client creation with %v", err) + } + + nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("Error listing node with %v", err) + } + + _, workerNodes := splitNodesAndWorkerNodes(nodeList.Items) + + testNamespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "e2e-" + strings.ToLower(t.Name())}} + t.Logf("Creating testing namespace %q", testNamespace.Name) + if _, err := clientSet.CoreV1().Namespaces().Create(ctx, testNamespace, metav1.CreateOptions{}); err != nil { + t.Fatalf("Unable to create ns %v: %v", testNamespace.Name, err) + } + defer clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{}) + + t.Log("Creating duplicates pods") + testLabel := map[string]string{"app": "test-lownodeutilization-kubernetes-metrics", "name": "test-lownodeutilization-kubernetes-metrics"} + deploymentObj := buildTestDeployment("lownodeutilization-kubernetes-metrics-pod", testNamespace.Name, 0, testLabel, nil) + deploymentObj.Spec.Template.Spec.Containers[0].Image = "narmidm/k8s-pod-cpu-stressor:latest" + deploymentObj.Spec.Template.Spec.Containers[0].Args = []string{"-cpu=3", "-duration=10s", 
"-forever"} + deploymentObj.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3000m"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0m"), + }, + } + + tests := []struct { + name string + replicasNum int + beforeFunc func(deployment *appsv1.Deployment) + expectedEvictedPodCount int + lowNodeUtilizationArgs *nodeutilization.LowNodeUtilizationArgs + evictorArgs *defaultevictor.DefaultEvictorArgs + metricsCollectorEnabled bool + }{ + { + name: "metric server not enabled", + replicasNum: 4, + beforeFunc: func(deployment *appsv1.Deployment) { + deployment.Spec.Replicas = utilptr.To[int32](4) + deployment.Spec.Template.Spec.NodeName = workerNodes[0].Name + }, + expectedEvictedPodCount: 0, + lowNodeUtilizationArgs: &nodeutilization.LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + TargetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + MetricsUtilization: nodeutilization.MetricsUtilization{ + MetricsServer: true, + }, + }, + evictorArgs: &defaultevictor.DefaultEvictorArgs{}, + metricsCollectorEnabled: false, + }, + { + name: "requested cpu resource zero, actual cpu utilization 3 per pod", + replicasNum: 4, + beforeFunc: func(deployment *appsv1.Deployment) { + deployment.Spec.Replicas = utilptr.To[int32](4) + deployment.Spec.Template.Spec.NodeName = workerNodes[0].Name + }, + expectedEvictedPodCount: 2, + lowNodeUtilizationArgs: &nodeutilization.LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + TargetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + MetricsUtilization: nodeutilization.MetricsUtilization{ + MetricsServer: true, + }, + }, + evictorArgs: &defaultevictor.DefaultEvictorArgs{}, + metricsCollectorEnabled: true, + }, + } + for _, tc := range tests { + 
t.Run(tc.name, func(t *testing.T) { + t.Logf("Creating deployment %v in %v namespace", deploymentObj.Name, deploymentObj.Namespace) + tc.beforeFunc(deploymentObj) + + _, err = clientSet.AppsV1().Deployments(deploymentObj.Namespace).Create(ctx, deploymentObj, metav1.CreateOptions{}) + if err != nil { + t.Logf("Error creating deployment: %v", err) + if err = clientSet.AppsV1().Deployments(deploymentObj.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(deploymentObj.Labels).String(), + }); err != nil { + t.Fatalf("Unable to delete deployment: %v", err) + } + return + } + defer func() { + clientSet.AppsV1().Deployments(deploymentObj.Namespace).Delete(ctx, deploymentObj.Name, metav1.DeleteOptions{}) + waitForPodsToDisappear(ctx, t, clientSet, deploymentObj.Labels, deploymentObj.Namespace) + }() + waitForPodsRunning(ctx, t, clientSet, deploymentObj.Labels, tc.replicasNum, deploymentObj.Namespace) + // wait until workerNodes[0].Name has the right actual cpu utilization and all the testing pods are running + // and producing ~12 cores in total + wait.PollUntilWithContext(ctx, 5*time.Second, func(context.Context) (done bool, err error) { + item, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, workerNodes[0].Name, metav1.GetOptions{}) + t.Logf("Waiting for %q nodemetrics cpu utilization to get over 12, currently %v", workerNodes[0].Name, item.Usage.Cpu().Value()) + if item.Usage.Cpu().Value() < 12 { + return false, nil + } + totalCpu := resource.NewMilliQuantity(0, resource.DecimalSI) + podItems, err := metricsClient.MetricsV1beta1().PodMetricses(deploymentObj.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Logf("unable to list podmetricses: %v", err) + return false, nil + } + for _, podMetrics := range podItems.Items { + for _, container := range podMetrics.Containers { + if _, exists := container.Usage[v1.ResourceCPU]; !exists { + continue + } + 
totalCpu.Add(container.Usage[v1.ResourceCPU]) + } + } + // Value() will round up (e.g. 11.1 -> 12), which is still ok + t.Logf("Waiting for totalCpu to get to 12 at least, got %v\n", totalCpu.Value()) + return totalCpu.Value() >= 12, nil + }) + + preRunNames := sets.NewString(getCurrentPodNames(ctx, clientSet, testNamespace.Name, t)...) + + // Deploy the descheduler with the configured policy + deschedulerPolicyConfigMapObj, err := deschedulerPolicyConfigMap(lowNodeUtilizationPolicy(tc.lowNodeUtilizationArgs, tc.evictorArgs, tc.metricsCollectorEnabled)) + if err != nil { + t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + + t.Logf("Creating %q policy CM with LowNodeUtilization configured...", deschedulerPolicyConfigMapObj.Name) + _, err = clientSet.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Create(ctx, deschedulerPolicyConfigMapObj, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + + defer func() { + t.Logf("Deleting %q CM...", deschedulerPolicyConfigMapObj.Name) + err = clientSet.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Delete(ctx, deschedulerPolicyConfigMapObj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Unable to delete %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + }() + + deschedulerDeploymentObj := deschedulerDeployment(testNamespace.Name) + t.Logf("Creating descheduler deployment %v", deschedulerDeploymentObj.Name) + _, err = clientSet.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Create(ctx, deschedulerDeploymentObj, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating %q deployment: %v", deschedulerDeploymentObj.Name, err) + } + + deschedulerPodName := "" + defer func() { + if deschedulerPodName != "" { + printPodLogs(ctx, t, clientSet, deschedulerPodName) + } + + t.Logf("Deleting %q deployment...", deschedulerDeploymentObj.Name) + err = 
clientSet.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Delete(ctx, deschedulerDeploymentObj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Unable to delete %q deployment: %v", deschedulerDeploymentObj.Name, err) + } + + waitForPodsToDisappear(ctx, t, clientSet, deschedulerDeploymentObj.Labels, deschedulerDeploymentObj.Namespace) + }() + + t.Logf("Waiting for the descheduler pod running") + deschedulerPods := waitForPodsRunning(ctx, t, clientSet, deschedulerDeploymentObj.Labels, 1, deschedulerDeploymentObj.Namespace) + if len(deschedulerPods) != 0 { + deschedulerPodName = deschedulerPods[0].Name + } + + // Run LowNodeUtilization plugin + var meetsExpectations bool + var actualEvictedPodCount int + if err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + currentRunNames := sets.NewString(getCurrentPodNames(ctx, clientSet, testNamespace.Name, t)...) + actualEvictedPod := preRunNames.Difference(currentRunNames) + actualEvictedPodCount = actualEvictedPod.Len() + t.Logf("preRunNames: %v, currentRunNames: %v, actualEvictedPodCount: %v\n", preRunNames.List(), currentRunNames.List(), actualEvictedPodCount) + if actualEvictedPodCount != tc.expectedEvictedPodCount { + t.Logf("Expecting %v number of pods evicted, got %v instead", tc.expectedEvictedPodCount, actualEvictedPodCount) + return false, nil + } + meetsExpectations = true + return true, nil + }); err != nil { + t.Errorf("Error waiting for descheduler running: %v", err) + } + + if !meetsExpectations { + t.Errorf("Unexpected number of pods have been evicted, got %v, expected %v", actualEvictedPodCount, tc.expectedEvictedPodCount) + } else { + t.Logf("Total of %d Pods were evicted for %s", actualEvictedPodCount, tc.name) + } + }) + } +} diff --git a/test/run-e2e-tests.sh b/test/run-e2e-tests.sh index 6856df2f32..ebbee4f958 100755 --- a/test/run-e2e-tests.sh +++ b/test/run-e2e-tests.sh @@ -53,5 +53,10 @@ fi # Deploy rbac, sa and 
binding for a descheduler running through a deployment kubectl apply -f kubernetes/base/rbac.yaml +METRICS_SERVER_VERSION="v0.5.0" +kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/${METRICS_SERVER_VERSION}/components.yaml +kubectl patch -n kube-system deployment metrics-server --type=json \ + -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]' + PRJ_PREFIX="sigs.k8s.io/descheduler" go test ${PRJ_PREFIX}/test/e2e/ -v -timeout 0 diff --git a/test/test_utils.go b/test/test_utils.go index 21c85a07b2..5a3169dd19 100644 --- a/test/test_utils.go +++ b/test/test_utils.go @@ -23,17 +23,17 @@ import ( "testing" "time" - policyv1 "k8s.io/api/policy/v1" - "k8s.io/apimachinery/pkg/util/intstr" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" utilptr "k8s.io/utils/ptr" ) @@ -89,6 +89,26 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget { return pdb } +// BuildPodMetrics creates a test podmetrics with given parameters. +func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics { + return &v1beta1.PodMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Window: metav1.Duration{Duration: 20010000000}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI), + }, + }, + }, + } +} + // GetMirrorPodAnnotation returns the annotation needed for mirror pod. 
func GetMirrorPodAnnotation() map[string]string { return map[string]string{ @@ -157,6 +177,19 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) return node } +func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics { + return &v1beta1.NodeMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Window: metav1.Duration{Duration: 20010000000}, + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI), + }, + } +} + // MakeBestEffortPod makes the given pod a BestEffort pod func MakeBestEffortPod(pod *v1.Pod) { pod.Spec.Containers[0].Resources.Requests = nil