Skip to content

Commit 9859787

Browse files
committed
feat(kubernetes): cluster state and kubeconfig watchers
Signed-off-by: Marc Nuri <[email protected]>
1 parent b706778 commit 9859787

File tree

15 files changed: 974 additions and 321 deletions.

pkg/kubernetes/kubernetes_derived_test.go

Lines changed: 0 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,6 @@ users:
5050
s.Run("without authorization header returns original clientset", func() {
5151
testManager, err := NewKubeconfigManager(testStaticConfig, "")
5252
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
53-
s.T().Cleanup(testManager.Close)
5453

5554
derived, err := testManager.Derived(s.T().Context())
5655
s.Require().NoErrorf(err, "failed to create derived kubernetes: %v", err)
@@ -61,7 +60,6 @@ users:
6160
s.Run("with invalid authorization header returns original clientset", func() {
6261
testManager, err := NewKubeconfigManager(testStaticConfig, "")
6362
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
64-
s.T().Cleanup(testManager.Close)
6563

6664
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "invalid-token")
6765
derived, err := testManager.Derived(ctx)
@@ -73,7 +71,6 @@ users:
7371
s.Run("with valid bearer token creates derived kubernetes with correct configuration", func() {
7472
testManager, err := NewKubeconfigManager(testStaticConfig, "")
7573
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
76-
s.T().Cleanup(testManager.Close)
7774

7875
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "Bearer aiTana-julIA")
7976
derived, err := testManager.Derived(ctx)
@@ -150,7 +147,6 @@ users:
150147
s.Run("with bearer token but RawConfig fails returns original clientset", func() {
151148
testManager, err := NewKubeconfigManager(testStaticConfig, "")
152149
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
153-
s.T().Cleanup(testManager.Close)
154150

155151
// Corrupt the clientCmdConfig by setting it to a config that will fail on RawConfig()
156152
// We'll do this by creating a config with an invalid file path
@@ -191,7 +187,6 @@ users:
191187
`)))
192188
testManager, err := NewKubeconfigManager(workingConfig, "")
193189
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
194-
s.T().Cleanup(testManager.Close)
195190

196191
// Now create a bad manager with RequireOAuth=true
197192
badManager, _ := NewManager(testStaticConfig, testManager.accessControlClientset.cfg, testManager.accessControlClientset.clientCmdConfig)
@@ -219,7 +214,6 @@ users:
219214
s.Run("with bearer token but invalid rest config returns original clientset", func() {
220215
testManager, err := NewKubeconfigManager(testStaticConfig, "")
221216
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
222-
s.T().Cleanup(testManager.Close)
223217

224218
// Corrupt the rest config to make NewAccessControlClientset fail
225219
// Setting an invalid Host URL should cause client creation to fail
@@ -241,7 +235,6 @@ users:
241235
s.Run("with bearer token but invalid rest config returns error", func() {
242236
testManager, err := NewKubeconfigManager(testStaticConfig, "")
243237
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
244-
s.T().Cleanup(testManager.Close)
245238

246239
// Corrupt the rest config to make NewAccessControlClientset fail
247240
testManager.accessControlClientset.cfg.Host = "://invalid-url"
@@ -263,7 +256,6 @@ users:
263256
s.Run("with no authorization header returns oauth token required error", func() {
264257
testManager, err := NewKubeconfigManager(testStaticConfig, "")
265258
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
266-
s.T().Cleanup(testManager.Close)
267259

268260
derived, err := testManager.Derived(s.T().Context())
269261
s.Require().Error(err, "expected error for missing oauth token, got nil")
@@ -274,7 +266,6 @@ users:
274266
s.Run("with invalid authorization header returns oauth token required error", func() {
275267
testManager, err := NewKubeconfigManager(testStaticConfig, "")
276268
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
277-
s.T().Cleanup(testManager.Close)
278269

279270
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "invalid-token")
280271
derived, err := testManager.Derived(ctx)
@@ -286,7 +277,6 @@ users:
286277
s.Run("with valid bearer token creates derived kubernetes", func() {
287278
testManager, err := NewKubeconfigManager(testStaticConfig, "")
288279
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
289-
s.T().Cleanup(testManager.Close)
290280

291281
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "Bearer aiTana-julIA")
292282
derived, err := testManager.Derived(ctx)

pkg/kubernetes/manager.go

Lines changed: 6 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"os"
8-
"sort"
98
"strconv"
109
"strings"
11-
"sync"
12-
"time"
1310

1411
"github.com/containers/kubernetes-mcp-server/pkg/config"
15-
"github.com/fsnotify/fsnotify"
1612
authenticationv1api "k8s.io/api/authentication/v1"
1713
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1814
"k8s.io/client-go/rest"
@@ -24,40 +20,11 @@ import (
2420
type Manager struct {
2521
accessControlClientset *AccessControlClientset
2622

27-
staticConfig *config.StaticConfig
28-
CloseWatchKubeConfig CloseWatchKubeConfig
29-
30-
clusterWatcher *clusterStateWatcher
31-
}
32-
33-
// clusterState represents the cached state of the cluster
34-
type clusterState struct {
35-
apiGroups []string
36-
isOpenShift bool
37-
}
38-
39-
// clusterStateWatcher monitors cluster state changes and triggers debounced reloads
40-
type clusterStateWatcher struct {
41-
manager *Manager
42-
pollInterval time.Duration
43-
debounceWindow time.Duration
44-
lastKnownState clusterState
45-
reloadCallback func() error
46-
debounceTimer *time.Timer
47-
mu sync.Mutex
48-
stopCh chan struct{}
49-
stoppedCh chan struct{}
23+
staticConfig *config.StaticConfig
5024
}
5125

5226
var _ Openshift = (*Manager)(nil)
5327

54-
const (
55-
// DefaultClusterStatePollInterval is the default interval for polling cluster state changes
56-
DefaultClusterStatePollInterval = 30 * time.Second
57-
// DefaultClusterStateDebounceWindow is the default debounce window for cluster state changes
58-
DefaultClusterStateDebounceWindow = 5 * time.Second
59-
)
60-
6128
var (
6229
ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments")
6330
ErrorInClusterNotInCluster = errors.New("in-cluster manager cannot be used outside of a cluster")
@@ -148,48 +115,6 @@ func NewManager(config *config.StaticConfig, restConfig *rest.Config, clientCmdC
148115
return k8s, nil
149116
}
150117

151-
func (m *Manager) WatchKubeConfig(onKubeConfigChange func() error) {
152-
kubeConfigFiles := m.accessControlClientset.ToRawKubeConfigLoader().ConfigAccess().GetLoadingPrecedence()
153-
if len(kubeConfigFiles) == 0 {
154-
return
155-
}
156-
watcher, err := fsnotify.NewWatcher()
157-
if err != nil {
158-
return
159-
}
160-
for _, file := range kubeConfigFiles {
161-
_ = watcher.Add(file)
162-
}
163-
go func() {
164-
for {
165-
select {
166-
case _, ok := <-watcher.Events:
167-
if !ok {
168-
return
169-
}
170-
_ = onKubeConfigChange()
171-
case _, ok := <-watcher.Errors:
172-
if !ok {
173-
return
174-
}
175-
}
176-
}
177-
}()
178-
if m.CloseWatchKubeConfig != nil {
179-
_ = m.CloseWatchKubeConfig()
180-
}
181-
m.CloseWatchKubeConfig = watcher.Close
182-
}
183-
184-
func (m *Manager) Close() {
185-
if m.CloseWatchKubeConfig != nil {
186-
_ = m.CloseWatchKubeConfig()
187-
}
188-
if m.clusterWatcher != nil {
189-
m.clusterWatcher.stop()
190-
}
191-
}
192-
193118
func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
194119
tokenReviewClient := m.accessControlClientset.AuthenticationV1().TokenReviews()
195120
tokenReview := &authenticationv1api.TokenReview{
@@ -266,6 +191,11 @@ func (m *Manager) Derived(ctx context.Context) (*Kubernetes, error) {
266191
return &Kubernetes{derived}, nil
267192
}
268193

194+
// Invalidate invalidates the cached discovery information.
195+
func (m *Manager) Invalidate() {
196+
m.accessControlClientset.DiscoveryClient().Invalidate()
197+
}
198+
269199
// applyRateLimitFromEnv applies QPS and Burst rate limits from environment variables if set.
270200
// This is primarily useful for tests to avoid client-side rate limiting.
271201
// Environment variables:
@@ -283,117 +213,3 @@ func applyRateLimitFromEnv(cfg *rest.Config) {
283213
}
284214
}
285215
}
286-
287-
// WatchClusterState starts a background watcher that periodically polls for cluster state changes
288-
// and triggers a debounced reload when changes are detected.
289-
func (m *Manager) WatchClusterState(pollInterval, debounceWindow time.Duration, onClusterStateChange func() error) {
290-
if m.clusterWatcher != nil {
291-
m.clusterWatcher.stop()
292-
}
293-
294-
watcher := &clusterStateWatcher{
295-
manager: m,
296-
pollInterval: pollInterval,
297-
debounceWindow: debounceWindow,
298-
reloadCallback: onClusterStateChange,
299-
stopCh: make(chan struct{}),
300-
stoppedCh: make(chan struct{}),
301-
}
302-
303-
captureState := func() clusterState {
304-
state := clusterState{apiGroups: []string{}}
305-
if groups, err := m.accessControlClientset.DiscoveryClient().ServerGroups(); err == nil {
306-
for _, group := range groups.Groups {
307-
state.apiGroups = append(state.apiGroups, group.Name)
308-
}
309-
sort.Strings(state.apiGroups)
310-
}
311-
state.isOpenShift = m.IsOpenShift(context.Background())
312-
return state
313-
}
314-
watcher.lastKnownState = captureState()
315-
316-
m.clusterWatcher = watcher
317-
318-
// Start background monitoring
319-
go func() {
320-
defer close(watcher.stoppedCh)
321-
ticker := time.NewTicker(pollInterval)
322-
defer ticker.Stop()
323-
324-
klog.V(2).Infof("Started cluster state watcher (poll interval: %v, debounce: %v)", pollInterval, debounceWindow)
325-
326-
for {
327-
select {
328-
case <-watcher.stopCh:
329-
klog.V(2).Info("Stopping cluster state watcher")
330-
return
331-
case <-ticker.C:
332-
// Invalidate discovery cache to get fresh API groups
333-
m.accessControlClientset.DiscoveryClient().Invalidate()
334-
335-
watcher.mu.Lock()
336-
current := captureState()
337-
klog.V(3).Infof("Polled cluster state: %d API groups, OpenShift=%v", len(current.apiGroups), current.isOpenShift)
338-
339-
changed := current.isOpenShift != watcher.lastKnownState.isOpenShift ||
340-
len(current.apiGroups) != len(watcher.lastKnownState.apiGroups)
341-
342-
if !changed {
343-
for i := range current.apiGroups {
344-
if current.apiGroups[i] != watcher.lastKnownState.apiGroups[i] {
345-
changed = true
346-
break
347-
}
348-
}
349-
}
350-
351-
if changed {
352-
klog.V(2).Info("Cluster state changed, scheduling debounced reload")
353-
if watcher.debounceTimer != nil {
354-
watcher.debounceTimer.Stop()
355-
}
356-
watcher.debounceTimer = time.AfterFunc(debounceWindow, func() {
357-
klog.V(2).Info("Debounce window expired, triggering reload")
358-
if err := onClusterStateChange(); err != nil {
359-
klog.Errorf("Failed to reload: %v", err)
360-
} else {
361-
watcher.mu.Lock()
362-
watcher.lastKnownState = captureState()
363-
watcher.mu.Unlock()
364-
klog.V(2).Info("Reload completed")
365-
}
366-
})
367-
}
368-
watcher.mu.Unlock()
369-
}
370-
}
371-
}()
372-
}
373-
374-
// stop stops the cluster state watcher
375-
func (w *clusterStateWatcher) stop() {
376-
if w == nil {
377-
return
378-
}
379-
380-
w.mu.Lock()
381-
defer w.mu.Unlock()
382-
383-
if w.debounceTimer != nil {
384-
w.debounceTimer.Stop()
385-
}
386-
387-
if w.stopCh == nil || w.stoppedCh == nil {
388-
return
389-
}
390-
391-
select {
392-
case <-w.stopCh:
393-
// Already closed or stopped
394-
return
395-
default:
396-
close(w.stopCh)
397-
<-w.stoppedCh
398-
}
399-
}

pkg/kubernetes/manager_test.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -228,49 +228,6 @@ func (s *ManagerTestSuite) TestNewManager() {
228228
})
229229
}
230230

231-
func (s *ManagerTestSuite) TestClusterStateWatcherStop() {
232-
s.Run("stop() on nil watcher", func() {
233-
var watcher *clusterStateWatcher
234-
// Should not panic
235-
watcher.stop()
236-
})
237-
238-
s.Run("stop() on uninitialized watcher (nil channels)", func() {
239-
watcher := &clusterStateWatcher{}
240-
// Should not panic even with nil channels
241-
watcher.stop()
242-
})
243-
244-
s.Run("stop() on initialized watcher", func() {
245-
watcher := &clusterStateWatcher{
246-
stopCh: make(chan struct{}),
247-
stoppedCh: make(chan struct{}),
248-
}
249-
// Close the stoppedCh to simulate a running goroutine
250-
go func() {
251-
<-watcher.stopCh
252-
close(watcher.stoppedCh)
253-
}()
254-
// Should not panic and should stop cleanly
255-
watcher.stop()
256-
})
257-
258-
s.Run("stop() called multiple times", func() {
259-
watcher := &clusterStateWatcher{
260-
stopCh: make(chan struct{}),
261-
stoppedCh: make(chan struct{}),
262-
}
263-
go func() {
264-
<-watcher.stopCh
265-
close(watcher.stoppedCh)
266-
}()
267-
// First stop
268-
watcher.stop()
269-
// Second stop should not panic
270-
watcher.stop()
271-
})
272-
}
273-
274231
func TestManager(t *testing.T) {
275232
suite.Run(t, new(ManagerTestSuite))
276233
}

pkg/kubernetes/openshift.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package kubernetes
33
import (
44
"context"
55

6-
"k8s.io/apimachinery/pkg/runtime/schema"
6+
"github.com/containers/kubernetes-mcp-server/pkg/openshift"
77
)
88

99
type Openshift interface {
@@ -16,9 +16,5 @@ func (m *Manager) IsOpenShift(ctx context.Context) bool {
1616
if err != nil {
1717
return false
1818
}
19-
_, err = k.AccessControlClientset().DiscoveryClient().ServerResourcesForGroupVersion(schema.GroupVersion{
20-
Group: "project.openshift.io",
21-
Version: "v1",
22-
}.String())
23-
return err == nil
19+
return openshift.IsOpenshift(k.AccessControlClientset().DiscoveryClient())
2420
}

0 commit comments

Comments
 (0)