Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 58 additions & 32 deletions pkg/handlers/resources/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sort"

"github.com/gin-gonic/gin"
"github.com/samber/lo"
"github.com/zxh326/kite/pkg/cluster"
"github.com/zxh326/kite/pkg/common"
"github.com/zxh326/kite/pkg/kube"
Expand All @@ -16,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
metricsv1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type NodeHandler struct {
Expand Down Expand Up @@ -262,44 +262,54 @@ func (h *NodeHandler) List(c *gin.Context) {
klog.Warningf("Failed to list node metrics: %v", err)
}

// Get all pods to calculate resource requests per node
var pods corev1.PodList
if err := cs.K8sClient.List(c.Request.Context(), &pods); err != nil {
klog.Warningf("Failed to list pods for node resource calculation: %v", err)
// Build node metrics lookup map
nodeMetricsMap := make(map[string]metricsv1.NodeMetrics, len(nodeMetrics.Items))
for _, nm := range nodeMetrics.Items {
nodeMetricsMap[nm.Name] = nm
}

// Group pods by node name and calculate resource requests
nodeResourceRequests := make(map[string]common.MetricsCell)
for _, pod := range pods.Items {
if pod.Spec.NodeName == "" {
continue // Skip pods not scheduled to any node
}

nodeName := pod.Spec.NodeName
if _, exists := nodeResourceRequests[nodeName]; !exists {
nodeResourceRequests[nodeName] = common.MetricsCell{}
}

metrics := nodeResourceRequests[nodeName]
metrics.Pods++

// Calculate CPU and memory requests for this pod
for _, container := range pod.Spec.Containers {
if cpuRequest := container.Resources.Requests.Cpu(); cpuRequest != nil {
metrics.CPURequest += cpuRequest.MilliValue()
// Calculate resource requests per node.
// When the informer cache is enabled, use the spec.nodeName field indexer for
// O(1) lookups per node. When cache is disabled (DISABLE_CACHE=true), fall back
// to a single cluster-wide pod list to avoid O(N) API server round-trips.
nodeResourceRequests := make(map[string]common.MetricsCell, len(nodes.Items))
if cs.K8sClient.CacheEnabled {
// Optimised path: per-node indexed query against the local informer cache.
for _, node := range nodes.Items {
var nodePods corev1.PodList
if err := cs.K8sClient.List(c.Request.Context(), &nodePods,
client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
klog.Warningf("Failed to list pods for node %s: %v", node.Name, err)
continue
}
if memoryRequest := container.Resources.Requests.Memory(); memoryRequest != nil {
metrics.MemoryRequest += memoryRequest.Value()
nodeResourceRequests[node.Name] = sumPodResources(nodePods.Items)
}
} else {
// Fallback path: single cluster-wide list, then group in memory.
var allPods corev1.PodList
if err := cs.K8sClient.List(c.Request.Context(), &allPods); err != nil {
klog.Warningf("Failed to list pods: %v", err)
} else {
for i := range allPods.Items {
pod := &allPods.Items[i]
if pod.Spec.NodeName == "" {
continue
}
existing := nodeResourceRequests[pod.Spec.NodeName]
existing.Pods++
for _, container := range pod.Spec.Containers {
if cpuRequest := container.Resources.Requests.Cpu(); cpuRequest != nil {
existing.CPURequest += cpuRequest.MilliValue()
}
if memoryRequest := container.Resources.Requests.Memory(); memoryRequest != nil {
existing.MemoryRequest += memoryRequest.Value()
}
}
nodeResourceRequests[pod.Spec.NodeName] = existing
}
}

nodeResourceRequests[nodeName] = metrics
}

nodeMetricsMap := lo.KeyBy(nodeMetrics.Items, func(item metricsv1.NodeMetrics) string {
return item.Name
})

result := &common.NodeListWithMetrics{
TypeMeta: nodes.TypeMeta,
ListMeta: nodes.ListMeta,
Expand Down Expand Up @@ -343,3 +353,19 @@ func (h *NodeHandler) registerCustomRoutes(group *gin.RouterGroup) {
group.POST("/_all/:name/taint", h.TaintNode)
group.POST("/_all/:name/untaint", h.UntaintNode)
}

// sumPodResources totals the pod count plus the CPU (milli-cores) and
// memory (bytes) requests declared by the regular containers of the given
// pods, returning the aggregate as a single MetricsCell.
func sumPodResources(pods []corev1.Pod) common.MetricsCell {
	cell := common.MetricsCell{Pods: int64(len(pods))}
	for i := range pods {
		// Index rather than copy: Pod is a large struct.
		for _, ctr := range pods[i].Spec.Containers {
			if cpu := ctr.Resources.Requests.Cpu(); cpu != nil {
				cell.CPURequest += cpu.MilliValue()
			}
			if mem := ctr.Resources.Requests.Memory(); mem != nil {
				cell.MemoryRequest += mem.Value()
			}
		}
	}
	return cell
}
Loading