Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Add a timeout to sending events to sinks #200

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package main
import (
"context"
"flag"
"github.com/opsgenie/kubernetes-event-exporter/pkg/exporter"
"github.com/opsgenie/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"os/signal"
"syscall"
"time"

"github.com/opsgenie/kubernetes-event-exporter/pkg/exporter"
"github.com/opsgenie/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
)

// How long to wait for an event to be received by a sink channel
const defaultSendTimeoutMillis = 10000

var (
conf = flag.String("conf", "config.yaml", "The config path file")
)
Expand Down Expand Up @@ -61,12 +65,16 @@ func main() {
cfg.ThrottlePeriod = 5
}

if cfg.SendTimeoutMillis == 0 {
cfg.SendTimeoutMillis = defaultSendTimeoutMillis
}

kubeconfig, err := kube.GetKubernetesConfig()
if err != nil {
log.Fatal().Err(err).Msg("cannot get kubeconfig")
}

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{})
engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{SendTimeoutMillis: cfg.SendTimeoutMillis})
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.ThrottlePeriod, engine.OnEvent)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
14 changes: 10 additions & 4 deletions pkg/exporter/channel_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package exporter
import (
"context"
"sync"
"time"

"github.com/opsgenie/kubernetes-event-exporter/pkg/kube"
"github.com/opsgenie/kubernetes-event-exporter/pkg/sinks"
Expand All @@ -15,9 +16,10 @@ import (
// and we might need a mechanism to drop the vents
// On closing, the registry sends a signal on all exit channels, and then waits for all to complete.
type ChannelBasedReceiverRegistry struct {
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
SendTimeoutMillis int64
}

func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent) {
Expand All @@ -27,7 +29,11 @@ func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.Enhanc
}

go func() {
ch <- *event
select {
case ch <- *event:
case <-time.After(time.Duration(r.SendTimeoutMillis) * time.Millisecond):
// drop event
}
}()
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ type Config struct {
// Route is the top route that the events will match
// TODO: There is currently a tight coupling with route and config, but not with receiver config and sink so
// TODO: I am not sure what to do here.
LogLevel string `yaml:"logLevel"`
LogFormat string `yaml:"logFormat"`
ThrottlePeriod int64 `yaml:"throttlePeriod"`
Namespace string `yaml:"namespace"`
LeaderElection kube.LeaderElectionConfig `yaml:"leaderElection"`
Route Route `yaml:"route"`
Receivers []sinks.ReceiverConfig `yaml:"receivers"`
LogLevel string `yaml:"logLevel"`
LogFormat string `yaml:"logFormat"`
ThrottlePeriod int64 `yaml:"throttlePeriod"`
Namespace string `yaml:"namespace"`
LeaderElection kube.LeaderElectionConfig `yaml:"leaderElection"`
Route Route `yaml:"route"`
Receivers []sinks.ReceiverConfig `yaml:"receivers"`
SendTimeoutMillis int64 `yaml:"sendTimeoutMillis"`
}

func (c *Config) Validate() error {
Expand Down