fix: detect and report cluster connection errors #559

Open · wants to merge 7 commits into base: master
184 changes: 170 additions & 14 deletions pkg/cache/cluster.go
@@ -57,6 +57,9 @@ const (
// Limit is required to avoid memory spikes during cache initialization.
// The default limit of 50 is chosen based on experiments.
defaultListSemaphoreWeight = 50

// The default interval for monitoring the cluster connection status.
defaultClusterConnectionInterval = 10 * time.Second
)

const (
@@ -89,6 +92,8 @@ type ClusterInfo struct {
SyncError error
// APIResources holds list of API resources supported by the cluster
APIResources []kube.APIResourceInfo
// ConnectionStatus indicates the status of the connection with the cluster.
ConnectionStatus ConnectionStatus
}

// OnEventHandler is a function that handles Kubernetes event
@@ -132,6 +137,8 @@ type ClusterCache interface {
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
// OnEvent register event handler that is executed every time when new K8S event received
OnEvent(handler OnEventHandler) Unsubscribe
// StartClusterConnectionStatusMonitoring starts a goroutine that periodically checks for watch errors and updates the cluster connection status.
StartClusterConnectionStatusMonitoring(ctx context.Context)
}

type WeightedSemaphore interface {
@@ -140,7 +147,7 @@ type WeightedSemaphore interface {
Release(n int64)
}

-type ListRetryFunc func(err error) bool
+type RetryFunc func(err error) bool

// NewClusterCache creates new instance of cluster cache
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
@@ -162,24 +169,48 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
resyncTimeout: defaultClusterResyncTimeout,
syncTime: nil,
},
-watchResyncTimeout: defaultWatchResyncTimeout,
-clusterSyncRetryTimeout: ClusterRetryTimeout,
-resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
-eventHandlers: map[uint64]OnEventHandler{},
-log: log,
-listRetryLimit: 1,
-listRetryUseBackoff: false,
-listRetryFunc: ListRetryFuncNever,
+watchResyncTimeout: defaultWatchResyncTimeout,
+clusterSyncRetryTimeout: ClusterRetryTimeout,
+resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
+eventHandlers: map[uint64]OnEventHandler{},
+log: log,
+listRetryLimit: 1,
+listRetryUseBackoff: false,
+listRetryFunc: RetryFuncNever,
+connectionStatus: ConnectionStatusUnknown,
+watchFails: newWatchFailures(),
+clusterStatusRetryFunc: RetryFuncNever,
+clusterConnectionInterval: defaultClusterConnectionInterval,
}
for i := range opts {
opts[i](cache)
}
return cache
}

// ConnectionStatus indicates the status of the connection with the cluster.
type ConnectionStatus string

const (
ConnectionStatusSuccessful ConnectionStatus = "Successful"
ConnectionStatusFailed ConnectionStatus = "Failed"
ConnectionStatusUnknown ConnectionStatus = "Unknown"
)

type clusterCache struct {
syncStatus clusterCacheSync

// connectionStatus indicates the status of the connection with the cluster.
connectionStatus ConnectionStatus

// clusterConnectionInterval is the interval used to monitor the cluster connection status.
clusterConnectionInterval time.Duration

// watchFails is used to keep track of the failures while watching resources.
watchFails *watchFailures

clusterStatusRetryFunc RetryFunc

apisMeta map[schema.GroupKind]*apiMeta
serverVersion string
apiResources []kube.APIResourceInfo
@@ -200,7 +231,7 @@ type clusterCache struct {
// retry options for list operations
listRetryLimit int32
listRetryUseBackoff bool
-listRetryFunc ListRetryFunc
+listRetryFunc RetryFunc

// lock is a rw lock which protects the fields of clusterInfo
lock sync.RWMutex
@@ -236,13 +267,13 @@ type clusterCacheSync struct {
resyncTimeout time.Duration
}

-// ListRetryFuncNever never retries on errors
-func ListRetryFuncNever(err error) bool {
+// RetryFuncNever never retries on errors
+func RetryFuncNever(err error) bool {
return false
}

-// ListRetryFuncAlways always retries on errors
-func ListRetryFuncAlways(err error) bool {
+// RetryFuncAlways always retries on errors
+func RetryFuncAlways(err error) bool {
return true
}

@@ -595,6 +626,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
watchKey := api.GroupKind.String()
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
if r := recover(); r != nil {
@@ -615,7 +647,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
res, err := resClient.Watch(ctx, options)
if errors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
c.watchFails.remove(watchKey)
return res, err
}

if err != nil {
c.watchFails.add(watchKey)
} else {
c.watchFails.remove(watchKey)
}

return res, err
},
})
@@ -810,8 +851,14 @@ func (c *clusterCache) sync() error {
version, err := c.kubectl.GetServerVersion(config)

if err != nil {
if c.connectionStatus != ConnectionStatusFailed {
c.log.Info("unable to access cluster", "cluster", c.config.Host, "reason", err.Error())
c.connectionStatus = ConnectionStatusFailed
}
return err
}

c.connectionStatus = ConnectionStatusSuccessful
c.serverVersion = version
apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
if err != nil {
@@ -1186,6 +1233,7 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo {
LastCacheSyncTime: c.syncStatus.syncTime,
SyncError: c.syncStatus.syncError,
APIResources: c.apiResources,
ConnectionStatus: c.connectionStatus,
}
}

@@ -1194,3 +1242,111 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo {
func skipAppRequeuing(key kube.ResourceKey) bool {
return ignoredRefreshResources[key.Group+"/"+key.Kind]
}

// StartClusterConnectionStatusMonitoring starts a goroutine that checks for watch failures.
// If there are any watch errors, it will periodically ping the remote cluster
// and update the cluster connection status.
func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Context) {
go c.clusterConnectionService(ctx)
}

func (c *clusterCache) clusterConnectionService(ctx context.Context) {
if c.clusterConnectionInterval <= 0 {
return
}

ticker := time.NewTicker(c.clusterConnectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
watchErrors := c.watchFails.len()
// Ping the cluster for connection verification if there are watch failures or
// if the cluster has recovered from previous watch failures.
watchesRecovered := false
if watchErrors == 0 {
// If there are no watch failures check if the status needs to be updated.
c.lock.RLock()
if c.connectionStatus == ConnectionStatusFailed {
watchesRecovered = true
}
c.lock.RUnlock()
}

if watchErrors > 0 || watchesRecovered {
c.log.V(1).Info("verifying cluster connection", "server", c.config.Host)
// Retry fetching the server version to avoid invalidating the cache due to transient errors.
err := retry.OnError(retry.DefaultBackoff, c.clusterStatusRetryFunc, func() error {
_, err := c.kubectl.GetServerVersion(c.config)
if err != nil && c.clusterStatusRetryFunc(err) {
c.log.V(1).Info("Error while fetching server version", "error", err.Error())
}
return err
})
if err != nil {
c.updateConnectionStatus(ConnectionStatusFailed)
} else {
c.updateConnectionStatus(ConnectionStatusSuccessful)
}
}
case <-ctx.Done():
c.log.V(1).Info("Stopping cluster connection status monitoring", "server", c.config.Host)
ticker.Stop()
return
}
}
}

func (c *clusterCache) updateConnectionStatus(status ConnectionStatus) {
invalidateCache := false
c.lock.Lock()
if c.connectionStatus != status {
c.connectionStatus = status
invalidateCache = true
}
c.lock.Unlock()

if !invalidateCache {
return
}

c.log.V(1).Info("updated cluster connection status", "server", c.config.Host, "status", status)

c.Invalidate()
if err := c.EnsureSynced(); err != nil {
c.log.Error(err, "failed to sync cache state after updating cluster connection status", "server", c.config.Host)
}
}

// watchFailures is used to keep track of the failures while watching resources. It is updated
// whenever an error occurs during a watch or when a watch recovers from a failure.
type watchFailures struct {
watches map[string]bool
mu sync.RWMutex
}

func newWatchFailures() *watchFailures {
return &watchFailures{
watches: make(map[string]bool),
}
}

func (w *watchFailures) add(key string) {
w.mu.Lock()
defer w.mu.Unlock()
w.watches[key] = true
}

func (w *watchFailures) remove(key string) {
w.mu.Lock()
defer w.mu.Unlock()
delete(w.watches, key)
}

func (w *watchFailures) len() int {
w.mu.RLock()
defer w.mu.RUnlock()
return len(w.watches)
}
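A minimal sketch of how a consumer could wire up the new connection monitoring. The github.com/argoproj/gitops-engine import path and the kubeconfig path are assumptions, not fixed by this diff, and the interval and retry policy shown are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/argoproj/gitops-engine/pkg/cache" // assumed import path
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig path; any *rest.Config works here.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// Poll the connection every 30s instead of the 10s default, and let the
	// status check retry on any error before flipping the status to Failed.
	clusterCache := cache.NewClusterCache(config,
		cache.SetClusterConnectionInterval(30*time.Second),
		cache.SetClusterStatusRetryFunc(cache.RetryFuncAlways),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Populate the cache first; sync() also sets the initial connection status.
	if err := clusterCache.EnsureSynced(); err != nil {
		panic(err)
	}

	// Start the goroutine that pings the cluster while watches are failing.
	clusterCache.StartClusterConnectionStatusMonitoring(ctx)

	// The new ConnectionStatus field is surfaced through GetClusterInfo.
	fmt.Println(clusterCache.GetClusterInfo().ConnectionStatus)
}

Note that StartClusterConnectionStatusMonitoring only spawns the ticker goroutine; the monitor stays idle until watchEvents records a failure, or until the status needs to recover from Failed.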
49 changes: 26 additions & 23 deletions pkg/cache/cluster_test.go
@@ -3,12 +3,13 @@ package cache
import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"

"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
@@ -148,6 +149,7 @@ func TestEnsureSynced(t *testing.T) {
}

cluster := newCluster(t, obj1, obj2)
assert.Equal(t, cluster.connectionStatus, ConnectionStatusUnknown)
err := cluster.EnsureSynced()
require.NoError(t, err)

@@ -160,6 +162,7 @@ func TestEnsureSynced(t *testing.T) {
names = append(names, k.Name)
}
assert.ElementsMatch(t, []string{"helm-guestbook1", "helm-guestbook2"}, names)
assert.Equal(t, cluster.connectionStatus, ConnectionStatusSuccessful)
}

func TestStatefulSetOwnershipInferred(t *testing.T) {
@@ -492,23 +495,23 @@ metadata:
func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
cronTabGroup := "stable.example.com"

testCases := []struct {
name string
localConvertFails bool
expectConvertToVersionCalled bool
expectGetResourceCalled bool
}{
{
name: "local convert fails, so GetResource is called",
localConvertFails: true,
expectConvertToVersionCalled: true,
expectGetResourceCalled: true,
},
{
name: "local convert succeeds, so GetResource is not called",
localConvertFails: false,
expectConvertToVersionCalled: true,
expectGetResourceCalled: false,
},
}

@@ -557,7 +560,6 @@ metadata:
return testCronTab(), nil
})

managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool {
return true
})
@@ -716,9 +718,10 @@ func TestGetClusterInfo(t *testing.T) {
cluster.serverVersion = "v1.16"
info := cluster.GetClusterInfo()
assert.Equal(t, ClusterInfo{
Server: cluster.config.Host,
APIResources: cluster.apiResources,
K8SVersion: cluster.serverVersion,
ConnectionStatus: ConnectionStatusUnknown,
}, info)
}

@@ -816,25 +819,25 @@ func testPod() *corev1.Pod {

func testCRD() *apiextensions.CustomResourceDefinition {
return &apiextensions.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "crontabs.stable.example.com",
},
Spec: apiextensions.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Versions: []apiextensions.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensions.CustomResourceValidation{
OpenAPIV3Schema: &apiextensions.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensions.JSONSchemaProps{
"cronSpec": {Type: "string"},
"image": {Type: "string"},
"replicas": {Type: "integer"},
},
},
@@ -855,14 +858,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
func testCronTab() *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "test-crontab",
"namespace": "default",
},
"spec": map[string]interface{}{
"cronSpec": "* * * * */5",
"image": "my-awesome-cron-image",
},
}}
}
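A hypothetical package-internal test, not part of this PR, sketching the watchFailures bookkeeping that feeds the monitor. Written as it would sit in cluster_test.go (package cache, testify already imported); the key format follows watchEvents, which uses api.GroupKind.String() values such as "Deployment.apps":

func TestWatchFailuresBookkeeping(t *testing.T) {
	wf := newWatchFailures()
	assert.Equal(t, 0, wf.len())

	// A failing watch registers its GroupKind key; re-adding the same key
	// is idempotent because the tracker is backed by a map.
	wf.add("Deployment.apps")
	wf.add("Deployment.apps")
	assert.Equal(t, 1, wf.len())

	// A successful watch attempt clears the key again, so len() reflects
	// only the watches that are currently failing.
	wf.remove("Deployment.apps")
	assert.Equal(t, 0, wf.len())
}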
78 changes: 75 additions & 3 deletions pkg/cache/mocks/ClusterCache.go
16 changes: 15 additions & 1 deletion pkg/cache/settings.go
@@ -147,7 +147,7 @@ func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc {
}

// SetRetryOptions sets cluster list retry options
-func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) UpdateSettingsFunc {
+func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc RetryFunc) UpdateSettingsFunc {
return func(cache *clusterCache) {
// Max retries must be at least one
if maxRetries < 1 {
@@ -170,3 +170,17 @@ func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc {
}
}
}

// SetClusterConnectionInterval sets the interval for monitoring the cluster connection status.
func SetClusterConnectionInterval(interval time.Duration) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.clusterConnectionInterval = interval
}
}

// SetClusterStatusRetryFunc sets the retry function for monitoring the cluster connection status.
func SetClusterStatusRetryFunc(retryFunc RetryFunc) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.clusterStatusRetryFunc = retryFunc
}
}
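The built-in policies are all-or-nothing: RetryFuncNever gives up on the first error and RetryFuncAlways retries everything. A sketch of a middle-ground policy for SetClusterStatusRetryFunc, using apimachinery's error helpers; isTransient is a hypothetical name, not part of this PR:

package main

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	utilnet "k8s.io/apimachinery/pkg/util/net"
)

// isTransient retries only errors that typically resolve on their own, so a
// single dropped connection does not immediately mark the cluster Failed and
// trigger a cache invalidation. Hypothetical helper, not part of this PR.
func isTransient(err error) bool {
	return apierrors.IsServerTimeout(err) ||
		apierrors.IsTimeout(err) ||
		apierrors.IsTooManyRequests(err) ||
		utilnet.IsConnectionReset(err)
}

Wired in with cache.SetClusterStatusRetryFunc(isTransient), retry.OnError keeps polling through brief API-server hiccups, while genuine outages still flip the status to Failed.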