Skip to content

Commit

Permalink
Merge pull request #67 from kubescape/bugfix/watcher-restart
Browse files Browse the repository at this point in the history
Adding restart backoff and return in case of an error
  • Loading branch information
amitschendel authored Feb 20, 2024
2 parents d4cffaa + 2a82b73 commit 9b8ba37
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 4 deletions.
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func main() {
IgnoreMounts: ignoreMounts,
IgnorePrefixes: ignorePrefixes,
StoreNamespace: storeNamespace,
OnError: func(err error) {
log.Fatalf("Error in collector manager watcher: %v", err)
},
}
cm, err := collector.StartCollectorManager(collectorManagerConfig)
if err != nil {
Expand All @@ -137,7 +140,9 @@ func main() {
defer tracer.Stop()

// Start AppProfile controller
appProfileController := controller.NewController(k8sConfig, storeNamespace)
appProfileController := controller.NewController(k8sConfig, storeNamespace, func(err error) {
log.Fatalf("AppProfile controller failed: %v\n", err)
})
appProfileController.StartController()
defer appProfileController.StopController()

Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type CollectorManagerConfig struct {
IgnorePrefixes []string
// Should store profiles in the same namespace
StoreNamespace string
// Called with the error when a watcher reports a failure through its OnError hook; when nil, the error is only logged
OnError func(err error)
}

type TotalEvents struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/collector/pod_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func (cm *CollectorManager) StartFinalizerWatcher() {
DeleteFunc: func(obj *unstructured.Unstructured) {
cm.handlePodDeleteEvent(obj)
},
OnError: func(err error) {
if cm.config.OnError != nil {
cm.config.OnError(err)
} else {
log.Printf("Error in pod finalizer watcher: %v", err)
}
},
}, schema.GroupVersionResource{
Group: "",
Version: "v1",
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type Controller struct {
appProfileGvr schema.GroupVersionResource
watcher watcher.WatcherInterface
storeNamespace string
onError func(err error)
}

// Create a new controller based on given config
func NewController(config *rest.Config, storeNamespace string) *Controller {
func NewController(config *rest.Config, storeNamespace string, onError func(error)) *Controller {

// Initialize clients and channels
staticClient, _ := kubernetes.NewForConfig(config)
Expand All @@ -47,6 +48,7 @@ func NewController(config *rest.Config, storeNamespace string) *Controller {
dynamicClient: dynamicClient,
appProfileGvr: collector.AppProfileGvr,
storeNamespace: storeNamespace,
onError: onError,
}
}

Expand All @@ -70,6 +72,13 @@ func (c *Controller) StartController() {
DeleteFunc: func(obj *unstructured.Unstructured) {
c.handleApplicationProfile(obj)
},
OnError: func(err error) {
if c.onError != nil {
c.onError(err)
} else {
log.Printf("Error in AppProfile watcher: %v", err)
}
},
}, collector.AppProfileGvr, metav1.ListOptions{})

if err != nil {
Expand Down
22 changes: 20 additions & 2 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"strconv"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -21,6 +22,7 @@ type WatchNotifyFunctions struct {
AddFunc func(obj *unstructured.Unstructured)
UpdateFunc func(obj *unstructured.Unstructured)
DeleteFunc func(obj *unstructured.Unstructured)
OnError func(err error)
}

type WatcherInterface interface {
Expand Down Expand Up @@ -104,9 +106,25 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
w.watcher.Stop()
w.watcher = nil
listOptions.ResourceVersion = w.lastResourceVersion
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.TODO(), listOptions)
// Retry up to 5 times, waiting i*2 seconds after each failed attempt (linear backoff: 0s, 2s, 4s, 6s, 8s).
for i := 0; i < 5; i++ {
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.TODO(), listOptions)
if err == nil {
break
}
log.Printf("watcher restart error: %v, on object: %+v, retrying...", err, gvr)
time.Sleep(time.Second * time.Duration(i*2)) // Linear backoff: the delay grows by 2s per attempt, starting at 0s
}

if err != nil {
log.Printf("watcher restart error: %v, on object: %+v", err, gvr)
// If the watcher restart still fails after 5 attempts, hand the error to OnError when one is set; otherwise just log it. In both cases stop watching by returning.
if notifyF.OnError != nil {
notifyF.OnError(err)
} else {
log.Printf("Final watcher restart error: %v, on object: %+v", err, gvr)
log.Println("Closing watcher")
}
return
}
continue
} else {
Expand Down

0 comments on commit 9b8ba37

Please sign in to comment.