Skip to content
This repository has been archived by the owner on Mar 15, 2022. It is now read-only.

Commit

Permalink
Configuration flag to register last resource version position and sta…
Browse files Browse the repository at this point in the history
…rt from there next time to avoid duplicating events.

Signed-off-by: pepov <[email protected]>
  • Loading branch information
pepov committed Dec 20, 2019
1 parent eec9229 commit 21efc3d
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ _testmain.go
*.prof

eventrouter

.idea
31 changes: 24 additions & 7 deletions eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"github.com/heptiolabs/eventrouter/sinks"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cast"
"github.com/spf13/viper"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -90,10 +91,14 @@ type EventRouter struct {
// event sink
// TODO: Determine if we want to support multiple sinks.
eSink sinks.EventSinkInterface

lastSeenResourceVersion string
lastResourceVersionPosition func(string)
}

// NewEventRouter will create a new event router using the input params
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter {
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer,
lastSeenResourceVersion string, lastResourceVersionPosition func(rv string)) *EventRouter {
if viper.GetBool("enable-prometheus") {
prometheus.MustRegister(kubernetesWarningEventCounterVec)
prometheus.MustRegister(kubernetesNormalEventCounterVec)
Expand All @@ -102,8 +107,10 @@ func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformer
}

er := &EventRouter{
kubeClient: kubeClient,
eSink: sinks.ManufactureSink(),
kubeClient: kubeClient,
eSink: sinks.ManufactureSink(),
lastSeenResourceVersion: lastSeenResourceVersion,
lastResourceVersionPosition: lastResourceVersionPosition,
}
eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: er.addEvent,
Expand Down Expand Up @@ -133,16 +140,26 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
// addEvent is called when an event is created, or during the initial list
func (er *EventRouter) addEvent(obj interface{}) {
e := obj.(*v1.Event)
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) {
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
er.lastResourceVersionPosition(e.ResourceVersion)
} else {
glog.V(5).Infof("Event had already been processed:\n%v", e)
}
}

// updateEvent is called any time there is an update to an existing event
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
eOld := objOld.(*v1.Event)
eNew := objNew.(*v1.Event)
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) {
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
er.lastResourceVersionPosition(eNew.ResourceVersion)
} else {
glog.V(5).Infof("Event had already been processed:\n%v", eNew)
}
}

// prometheusEvent is called when an event is added or updated
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ require (
github.com/prometheus/client_golang v1.1.0
github.com/rockset/rockset-go-client v0.6.0
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
github.com/spf13/cast v1.3.0
github.com/spf13/viper v1.4.0
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
k8s.io/api v0.0.0-20190814101207-0772a1bdf941
k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v0.4.0
k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a // indirect
)
36 changes: 34 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"io/ioutil"
"net/http"
"os"
"os/signal"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cast"
"github.com/spf13/viper"

"k8s.io/client-go/informers"
Expand All @@ -49,7 +51,7 @@ func sigHandler() <-chan struct{} {
syscall.SIGSEGV, // FullDerp
syscall.SIGABRT, // Abnormal termination
syscall.SIGILL, // illegal instruction
syscall.SIGFPE) // floating point - this is why we can't have nice things
syscall.SIGFPE) // floating point - this is why we can't have nice things
sig := <-c
glog.Warningf("Signal (%v) Detected, Shutting Down", sig)
close(stop)
Expand Down Expand Up @@ -107,11 +109,41 @@ func main() {
var wg sync.WaitGroup

clientset := loadConfig()

var lastResourceVersionPosition string
var mostRecentResourceVersion *string

resourceVersionPositionPath := viper.GetString("lastResourceVersionPositionPath")
resourceVersionPositionFunc := func(resourceVersion string) {
if resourceVersionPositionPath != "" {
if cast.ToInt(resourceVersion) > cast.ToInt(mostRecentResourceVersion) {
err := ioutil.WriteFile(resourceVersionPositionPath, []byte(resourceVersion), 0644)
if err != nil {
glog.Errorf("failed to write lastResourceVersionPosition")
} else {
mostRecentResourceVersion = &resourceVersion
}
}
}
}

if resourceVersionPositionPath != "" {
_, err := os.Stat(resourceVersionPositionPath)
if !os.IsNotExist(err) {
resourceVersionBytes, err := ioutil.ReadFile(resourceVersionPositionPath)
if err != nil {
glog.Errorf("failed to read resource version bookmark from %s", resourceVersionPositionPath)
} else {
lastResourceVersionPosition = string(resourceVersionBytes)
}
}
}

sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval"))
eventsInformer := sharedInformers.Core().V1().Events()

// TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666
eventRouter := NewEventRouter(clientset, eventsInformer)
eventRouter := NewEventRouter(clientset, eventsInformer, lastResourceVersionPosition, resourceVersionPositionFunc)
stop := sigHandler()

// Startup the http listener for Prometheus Metrics endpoint.
Expand Down

0 comments on commit 21efc3d

Please sign in to comment.