From 0ae55e84bfa6ea90664f369d7c604a250e2f4e58 Mon Sep 17 00:00:00 2001 From: DioCrafts Date: Sat, 21 Mar 2026 15:01:19 +0100 Subject: [PATCH] fix(cluster): make ClusterManager concurrency-safe with sync.RWMutex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finding 2.1: ClusterManager.clusters and .errors maps were read from every HTTP request (via GetClientSet, GetClusters, GetClusterList) and written by the background syncClusters goroutine without any synchronization. This is a data race that causes 'concurrent map read and map write' panics in production. Solution A — sync.RWMutex: - Added sync.RWMutex to ClusterManager struct - GetClientSet() now holds RLock while reading maps - Readers (HTTP requests) can execute concurrently with each other - Only syncClusters takes an exclusive Lock, and only briefly Solution D — Encapsulated access methods: - Snapshot() returns shallow copies of both maps + defaultContext under RLock so callers can iterate safely without holding the lock - ClusterVersion(name) and ClusterError(name) provide single-key lookups under RLock for GetClusterList - GetClusters and GetClusterList no longer touch cm.clusters/errors directly — impossible to forget the lock in future changes Solution E — Minimal write-lock duration in syncClusters: - Phase 1: RLock — snapshot current state (microseconds) - Phase 2: No lock — build all new ClientSets (slow I/O, seconds) - Phase 3: Lock — swap 3 pointers (microseconds) - Phase 4: No lock — stop old clients - The exclusive Lock is held for only ~microseconds instead of the entire duration of building new K8s clients (potentially seconds) Dead code removed: - Replaced map[string]interface{} with map[string]struct{} for dbClusterMap (no value needed, just set membership) - Eliminated delete-during-iterate pattern that was unsafe with concurrent readers --- pkg/cluster/cluster_handler.go | 15 +++-- pkg/cluster/cluster_manager.go | 116 +++++++++++++++++++++++++++------ 2 files changed, 104 insertions(+), 27 deletions(-) diff --git a/pkg/cluster/cluster_handler.go b/pkg/cluster/cluster_handler.go index 45417a8b..2d563fca 100644 --- a/pkg/cluster/cluster_handler.go +++ b/pkg/cluster/cluster_handler.go @@ -17,19 +17,20 @@ import ( ) func (cm *ClusterManager) GetClusters(c *gin.Context) { - result := make([]common.ClusterInfo, 0, len(cm.clusters)) + clusters, errs, defaultCtx := cm.Snapshot() + result := make([]common.ClusterInfo, 0, len(clusters)+len(errs)) user := c.MustGet("user").(model.User) - for name, cluster := range cm.clusters { + for name, cluster := range clusters { if !rbac.CanAccessCluster(user, name) { continue } result = append(result, common.ClusterInfo{ Name: name, Version: cluster.Version, - IsDefault: name == cm.defaultContext, + IsDefault: name == defaultCtx, }) } - for name, errMsg := range cm.errors { + for name, errMsg := range errs { if !rbac.CanAccessCluster(user, name) { continue } @@ -66,10 +67,10 @@ func (cm *ClusterManager) GetClusterList(c *gin.Context) { "config": "", } - if clientSet, exists := cm.clusters[cluster.Name]; exists { - clusterInfo["version"] = clientSet.Version + if version, ok := cm.ClusterVersion(cluster.Name); ok { + clusterInfo["version"] = version } - if errMsg, exists := cm.errors[cluster.Name]; exists { + if errMsg, ok := cm.ClusterError(cluster.Name); ok { clusterInfo["error"] = errMsg } diff --git a/pkg/cluster/cluster_manager.go b/pkg/cluster/cluster_manager.go index fb45030e..23ea05f1 100644 --- a/pkg/cluster/cluster_manager.go +++ b/pkg/cluster/cluster_manager.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/zxh326/kite/pkg/kube" @@ -30,11 +31,47 @@ type ClientSet struct { } type ClusterManager struct { + mu sync.RWMutex clusters map[string]*ClientSet errors map[string]string defaultContext string } +// Snapshot returns a shallow copy of the cluster and error maps plus the +// default context. Callers may iterate the returned maps without holding +// a lock because they are independent copies. +func (cm *ClusterManager) Snapshot() (clusters map[string]*ClientSet, errs map[string]string, defaultCtx string) { + cm.mu.RLock() + defer cm.mu.RUnlock() + clusters = make(map[string]*ClientSet, len(cm.clusters)) + for k, v := range cm.clusters { + clusters[k] = v + } + errs = make(map[string]string, len(cm.errors)) + for k, v := range cm.errors { + errs[k] = v + } + return clusters, errs, cm.defaultContext +} + +// ClusterVersion returns the version string for a cluster, if it is loaded. +func (cm *ClusterManager) ClusterVersion(name string) (version string, ok bool) { + cm.mu.RLock() + defer cm.mu.RUnlock() + if cs, exists := cm.clusters[name]; exists { + return cs.Version, true + } + return "", false +} + +// ClusterError returns the error string for a cluster, if one is recorded. +func (cm *ClusterManager) ClusterError(name string) (errMsg string, ok bool) { + cm.mu.RLock() + defer cm.mu.RUnlock() + errMsg, ok = cm.errors[name] + return +} + func createClientSetInCluster(name, prometheusURL string) (*ClientSet, error) { config, err := rest.InClusterConfig() if err != nil { @@ -168,6 +205,13 @@ func (t *k8sProxyTransport) RoundTrip(req *http.Request) (*http.Response, error) } func (cm *ClusterManager) GetClientSet(clusterName string) (*ClientSet, error) { + cm.mu.RLock() + defer cm.mu.RUnlock() + return cm.getClientSetLocked(clusterName) +} + +// getClientSetLocked assumes cm.mu is already held (at least RLock). +func (cm *ClusterManager) getClientSetLocked(clusterName string) (*ClientSet, error) { if len(cm.clusters) == 0 { return nil, fmt.Errorf("no clusters available") } @@ -178,7 +222,7 @@ func (cm *ClusterManager) GetClientSet(clusterName string) (*ClientSet, error) { return cs, nil } } - return cm.GetClientSet(cm.defaultContext) + return cm.getClientSetLocked(cm.defaultContext) } if cluster, ok := cm.clusters[clusterName]; ok { return cluster, nil @@ -238,42 +282,74 @@ func syncClusters(cm *ClusterManager) error { time.Sleep(5 * time.Second) return err } - dbClusterMap := make(map[string]interface{}) + + // ── Phase 1: Read current state under RLock (fast) ────────────── + cm.mu.RLock() + oldClusters := make(map[string]*ClientSet, len(cm.clusters)) + for k, v := range cm.clusters { + oldClusters[k] = v + } + oldErrors := make(map[string]string, len(cm.errors)) + for k, v := range cm.errors { + oldErrors[k] = v + } + cm.mu.RUnlock() + + // ── Phase 2: Build new state WITHOUT holding any lock (slow I/O) ─ + newClusters := make(map[string]*ClientSet, len(clusters)) + newErrors := make(map[string]string) + newDefault := "" + var stoppedClients []*ClientSet // defer Stop calls until after unlock + + dbClusterMap := make(map[string]struct{}, len(clusters)) for _, cluster := range clusters { - dbClusterMap[cluster.Name] = cluster + dbClusterMap[cluster.Name] = struct{}{} if cluster.IsDefault { - cm.defaultContext = cluster.Name + newDefault = cluster.Name } - current, currentExist := cm.clusters[cluster.Name] + current := oldClusters[cluster.Name] if shouldUpdateCluster(current, cluster) { - if currentExist { - delete(cm.clusters, cluster.Name) - current.K8sClient.Stop(cluster.Name) + if current != nil { + stoppedClients = append(stoppedClients, current) } if cluster.Enable { clientSet, err := buildClientSet(cluster) if err != nil { klog.Errorf("Failed to build k8s client for cluster %s, in cluster: %t, err: %v", cluster.Name, cluster.InCluster, err) - cm.errors[cluster.Name] = err.Error() + newErrors[cluster.Name] = err.Error() continue } - delete(cm.errors, cluster.Name) - cm.clusters[cluster.Name] = clientSet - } else { - delete(cm.errors, cluster.Name) + newClusters[cluster.Name] = clientSet + } + // If !cluster.Enable we simply omit it from both maps + } else { + // No update needed — carry forward current state + if current != nil { + newClusters[cluster.Name] = current + } + if errMsg, ok := oldErrors[cluster.Name]; ok { + newErrors[cluster.Name] = errMsg } } } - for name, clientSet := range cm.clusters { + + // Clusters removed from the DB: stop their clients + for name, clientSet := range oldClusters { if _, ok := dbClusterMap[name]; !ok { - delete(cm.clusters, name) - clientSet.K8sClient.Stop(name) + stoppedClients = append(stoppedClients, clientSet) } } - for name := range cm.errors { - if _, ok := dbClusterMap[name]; !ok { - delete(cm.errors, name) - } + + // ── Phase 3: Swap under Lock (microseconds — only pointer assignments) ─ + cm.mu.Lock() + cm.clusters = newClusters + cm.errors = newErrors + cm.defaultContext = newDefault + cm.mu.Unlock() + + // ── Phase 4: Stop old clients outside any lock ────────────────── + for _, cs := range stoppedClients { + cs.K8sClient.Stop(cs.Name) } return nil