15 changes: 8 additions & 7 deletions pkg/cluster/cluster_handler.go
@@ -17,19 +17,20 @@ import (
 )
 
 func (cm *ClusterManager) GetClusters(c *gin.Context) {
-    result := make([]common.ClusterInfo, 0, len(cm.clusters))
+    clusters, errs, defaultCtx := cm.Snapshot()
+    result := make([]common.ClusterInfo, 0, len(clusters)+len(errs))
     user := c.MustGet("user").(model.User)
-    for name, cluster := range cm.clusters {
+    for name, cluster := range clusters {
         if !rbac.CanAccessCluster(user, name) {
             continue
         }
         result = append(result, common.ClusterInfo{
             Name:      name,
             Version:   cluster.Version,
-            IsDefault: name == cm.defaultContext,
+            IsDefault: name == defaultCtx,
         })
     }
-    for name, errMsg := range cm.errors {
+    for name, errMsg := range errs {
         if !rbac.CanAccessCluster(user, name) {
             continue
         }
@@ -66,10 +67,10 @@ func (cm *ClusterManager) GetClusterList(c *gin.Context) {
"config": "",
}

if clientSet, exists := cm.clusters[cluster.Name]; exists {
clusterInfo["version"] = clientSet.Version
if version, ok := cm.ClusterVersion(cluster.Name); ok {
clusterInfo["version"] = version
}
if errMsg, exists := cm.errors[cluster.Name]; exists {
if errMsg, ok := cm.ClusterError(cluster.Name); ok {
clusterInfo["error"] = errMsg
}

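Reviewer note on the handler side: GetClusters now takes one consistent Snapshot() instead of ranging over cm.clusters and cm.errors while syncClusters may be rewriting them, and the slice capacity now accounts for error entries as well. The old pattern was not just a benign race: ranging over a Go map that another goroutine writes to is detected by the runtime, which aborts the whole process. A minimal standalone repro of that hazard (hypothetical program, not part of this PR):

```go
package main

import "time"

// A reader iterates while a writer mutates the same map, as the old
// GetClusters could race against syncClusters. The runtime typically
// aborts with "fatal error: concurrent map iteration and map write".
func main() {
	clusters := map[string]string{"a": "v1.30"}

	go func() {
		for {
			clusters["b"] = "v1.31" // writer: stands in for syncClusters
		}
	}()

	for {
		for range clusters { // reader: stands in for GetClusters
		}
		time.Sleep(time.Millisecond)
	}
}
```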
116 changes: 96 additions & 20 deletions pkg/cluster/cluster_manager.go
@@ -6,6 +6,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/zxh326/kite/pkg/kube"
@@ -30,11 +31,47 @@ type ClientSet struct {
 }
 
 type ClusterManager struct {
+    mu             sync.RWMutex
     clusters       map[string]*ClientSet
     errors         map[string]string
     defaultContext string
 }
 
+// Snapshot returns a shallow copy of the cluster and error maps plus the
+// default context. Callers may iterate the returned maps without holding
+// a lock because they are independent copies.
+func (cm *ClusterManager) Snapshot() (clusters map[string]*ClientSet, errs map[string]string, defaultCtx string) {
+    cm.mu.RLock()
+    defer cm.mu.RUnlock()
+    clusters = make(map[string]*ClientSet, len(cm.clusters))
+    for k, v := range cm.clusters {
+        clusters[k] = v
+    }
+    errs = make(map[string]string, len(cm.errors))
+    for k, v := range cm.errors {
+        errs[k] = v
+    }
+    return clusters, errs, cm.defaultContext
+}
+
+// ClusterVersion returns the version string for a cluster, if it is loaded.
+func (cm *ClusterManager) ClusterVersion(name string) (version string, ok bool) {
+    cm.mu.RLock()
+    defer cm.mu.RUnlock()
+    if cs, exists := cm.clusters[name]; exists {
+        return cs.Version, true
+    }
+    return "", false
+}
+
+// ClusterError returns the error string for a cluster, if one is recorded.
+func (cm *ClusterManager) ClusterError(name string) (errMsg string, ok bool) {
+    cm.mu.RLock()
+    defer cm.mu.RUnlock()
+    errMsg, ok = cm.errors[name]
+    return
+}
+
 func createClientSetInCluster(name, prometheusURL string) (*ClientSet, error) {
     config, err := rest.InClusterConfig()
     if err != nil {
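Reviewer note: these three methods are the whole read API the handlers need. Snapshot returns private copies, so callers can range over them lock-free, but the copy is shallow: the *ClientSet values are shared pointers and should be treated as read-only once constructed. A sketch of a consumer (hypothetical helper, not in this PR; klog is already imported by this package):

```go
// logClusters is a hypothetical Snapshot consumer: no lock is held while
// ranging, because the returned maps are private copies.
func logClusters(cm *ClusterManager) {
	clusters, errs, defaultCtx := cm.Snapshot()
	for name, cs := range clusters {
		klog.Infof("cluster %s (default=%t): version %s", name, name == defaultCtx, cs.Version)
	}
	for name, msg := range errs {
		klog.Warningf("cluster %s failed to load: %s", name, msg)
	}
}
```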
@@ -168,6 +205,13 @@ func (t *k8sProxyTransport) RoundTrip(req *http.Request) (*http.Response, error)
 }
 
 func (cm *ClusterManager) GetClientSet(clusterName string) (*ClientSet, error) {
+    cm.mu.RLock()
+    defer cm.mu.RUnlock()
+    return cm.getClientSetLocked(clusterName)
+}
+
+// getClientSetLocked assumes cm.mu is already held (at least RLock).
+func (cm *ClusterManager) getClientSetLocked(clusterName string) (*ClientSet, error) {
     if len(cm.clusters) == 0 {
         return nil, fmt.Errorf("no clusters available")
     }
@@ -178,7 +222,7 @@ func (cm *ClusterManager) GetClientSet(clusterName string) (*ClientSet, error) {
                 return cs, nil
             }
         }
-        return cm.GetClientSet(cm.defaultContext)
+        return cm.getClientSetLocked(cm.defaultContext)
     }
     if cluster, ok := cm.clusters[clusterName]; ok {
         return cluster, nil
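Reviewer note: the wrapper-plus-getClientSetLocked split is more than style. The default-context fallback above used to recurse into GetClientSet; once that method takes RLock itself, the recursive acquisition can deadlock, because sync.RWMutex is not reentrant: a pending writer (such as the Lock in syncClusters' swap phase) blocks new readers. A reduced sketch of the hazard the split avoids (hypothetical, not from this codebase):

```go
package main

import "sync"

var mu sync.RWMutex

// outer models the old recursive GetClientSet call chain.
func outer() {
	mu.RLock()
	defer mu.RUnlock()
	inner() // second RLock while the first is still held
}

// inner can block forever: if another goroutine calls mu.Lock() between
// the two RLocks, new readers queue behind the writer, and the writer
// waits for outer's RLock, closing the deadlock cycle.
func inner() {
	mu.RLock()
	defer mu.RUnlock()
}

func main() { outer() } // fine in isolation; the bug needs a racing writer
```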
@@ -238,42 +282,74 @@ func syncClusters(cm *ClusterManager) error {
         time.Sleep(5 * time.Second)
         return err
     }
-    dbClusterMap := make(map[string]interface{})
 
+    // ── Phase 1: Read current state under RLock (fast) ──────────────
+    cm.mu.RLock()
+    oldClusters := make(map[string]*ClientSet, len(cm.clusters))
+    for k, v := range cm.clusters {
+        oldClusters[k] = v
+    }
+    oldErrors := make(map[string]string, len(cm.errors))
+    for k, v := range cm.errors {
+        oldErrors[k] = v
+    }
+    cm.mu.RUnlock()
+
+    // ── Phase 2: Build new state WITHOUT holding any lock (slow I/O) ─
+    newClusters := make(map[string]*ClientSet, len(clusters))
+    newErrors := make(map[string]string)
+    newDefault := ""
+    var stoppedClients []*ClientSet // defer Stop calls until after unlock
+
+    dbClusterMap := make(map[string]struct{}, len(clusters))
     for _, cluster := range clusters {
-        dbClusterMap[cluster.Name] = cluster
+        dbClusterMap[cluster.Name] = struct{}{}
         if cluster.IsDefault {
-            cm.defaultContext = cluster.Name
+            newDefault = cluster.Name
         }
-        current, currentExist := cm.clusters[cluster.Name]
+        current := oldClusters[cluster.Name]
         if shouldUpdateCluster(current, cluster) {
-            if currentExist {
-                delete(cm.clusters, cluster.Name)
-                current.K8sClient.Stop(cluster.Name)
+            if current != nil {
+                stoppedClients = append(stoppedClients, current)
             }
             if cluster.Enable {
                clientSet, err := buildClientSet(cluster)
                 if err != nil {
                     klog.Errorf("Failed to build k8s client for cluster %s, in cluster: %t, err: %v", cluster.Name, cluster.InCluster, err)
-                    cm.errors[cluster.Name] = err.Error()
+                    newErrors[cluster.Name] = err.Error()
                     continue
                 }
-                delete(cm.errors, cluster.Name)
-                cm.clusters[cluster.Name] = clientSet
-            } else {
-                delete(cm.errors, cluster.Name)
+                newClusters[cluster.Name] = clientSet
             }
+            // If !cluster.Enable we simply omit it from both maps
+        } else {
+            // No update needed — carry forward current state
+            if current != nil {
+                newClusters[cluster.Name] = current
+            }
+            if errMsg, ok := oldErrors[cluster.Name]; ok {
+                newErrors[cluster.Name] = errMsg
+            }
         }
     }
-    for name, clientSet := range cm.clusters {
+
+    // Clusters removed from the DB: stop their clients
+    for name, clientSet := range oldClusters {
         if _, ok := dbClusterMap[name]; !ok {
-            delete(cm.clusters, name)
-            clientSet.K8sClient.Stop(name)
+            stoppedClients = append(stoppedClients, clientSet)
         }
     }
-    for name := range cm.errors {
-        if _, ok := dbClusterMap[name]; !ok {
-            delete(cm.errors, name)
-        }
-    }
+
+    // ── Phase 3: Swap under Lock (microseconds — only pointer assignments) ─
+    cm.mu.Lock()
+    cm.clusters = newClusters
+    cm.errors = newErrors
+    cm.defaultContext = newDefault
+    cm.mu.Unlock()
+
+    // ── Phase 4: Stop old clients outside any lock ──────────────────
+    for _, cs := range stoppedClients {
+        cs.K8sClient.Stop(cs.Name)
+    }
+
     return nil
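Reviewer note: syncClusters now follows a read-copy-update discipline: snapshot under RLock, rebuild without any lock, swap the maps under a brief write Lock, and run the slow Stop calls after unlocking so readers are never blocked by teardown. One assumption worth stating: this relies on a single writer; two overlapping syncClusters runs could each snapshot, rebuild, and then overwrite the other's swap. The general shape, reduced to its essentials (sketch under that single-writer assumption; unlike the PR, it stops everything in the previous map rather than only replaced or removed entries):

```go
package main

import "sync"

type Client struct{ name string }

func (c *Client) Stop() { /* release watches, connections, etc. */ }

type Manager struct {
	mu    sync.RWMutex
	state map[string]*Client
}

// refresh: build off-lock, swap under a short write lock, tear down off-lock.
func (m *Manager) refresh(build func() map[string]*Client) {
	next := build() // Phases 1-2: slow work, no lock held

	m.mu.Lock() // Phase 3: pointer assignments only
	prev := m.state
	m.state = next
	m.mu.Unlock()

	for _, c := range prev { // Phase 4: Stop outside the lock
		c.Stop()
	}
}

func main() {
	m := &Manager{state: map[string]*Client{}}
	m.refresh(func() map[string]*Client {
		return map[string]*Client{"prod": {name: "prod"}}
	})
}
```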