Skip to content

Commit

Permalink
refactor: notifier metric alive count (#1130)
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksandrMatsko authored Dec 17, 2024
1 parent 83c63a0 commit b1aaf83
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 52 deletions.
4 changes: 4 additions & 0 deletions cmd/notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type notifierConfig struct {
MaxFailAttemptToSendAvailable int `yaml:"max_fail_attempt_to_send_available"`
// Specify log level by entities
SetLogLevel setLogLevelConfig `yaml:"set_log_level"`
// CheckNotifierStateTimeout is the timeout between marking *.alive.count metric based on notifier state.
CheckNotifierStateTimeout string `yaml:"check_notifier_state_timeout"`
}

type selfStateConfig struct {
Expand Down Expand Up @@ -116,6 +118,7 @@ func getDefault() config {
Timezone: "UTC",
ReadBatchSize: int(notifier.NotificationsLimitUnlimited),
MaxFailAttemptToSendAvailable: 3,
CheckNotifierStateTimeout: "10s",
},
Telemetry: cmd.TelemetryConfig{
Listen: ":8093",
Expand Down Expand Up @@ -202,6 +205,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config {
MaxFailAttemptToSendAvailable: config.MaxFailAttemptToSendAvailable,
LogContactsToLevel: contacts,
LogSubscriptionsToLevel: subscriptions,
CheckNotifierStateTimeout: to.Duration(config.CheckNotifierStateTimeout),
}
}

Expand Down
15 changes: 14 additions & 1 deletion cmd/notifier/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
}

notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName)

sender := notifier.NewNotifier(
database,
logger,
Expand All @@ -119,7 +121,7 @@ func main() {

// Start moira self state checker
if config.Notifier.SelfState.getSettings().Enabled {
selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings(), metrics.ConfigureHeartBeatMetrics(telemetry.Metrics))
selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings())
if err := selfState.Start(); err != nil {
logger.Fatal().
Error(err).
Expand Down Expand Up @@ -156,6 +158,17 @@ func main() {
fetchEventsWorker.Start()
defer stopFetchEvents(fetchEventsWorker)

aliveWatcher := notifier.NewAliveWatcher(
logger,
database,
notifierConfig.CheckNotifierStateTimeout,
notifierMetrics,
)
ctx, cancel := context.WithCancel(context.Background())

aliveWatcher.Start(ctx)
defer cancel()

logger.Info().
String("moira_version", MoiraVersion).
Msg("Moira Notifier Started")
Expand Down
22 changes: 0 additions & 22 deletions metrics/heartbeat.go

This file was deleted.

16 changes: 15 additions & 1 deletion metrics/notifier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import "time"
import (
"time"
)

// NotifierMetrics is a collection of metrics used in notifier.
type NotifierMetrics struct {
Expand All @@ -16,6 +18,7 @@ type NotifierMetrics struct {
PlotsBuildDurationMs Histogram
PlotsEvaluateTriggerDurationMs Histogram
fetchNotificationsDurationMs Histogram
notifierIsAlive Meter
}

// ConfigureNotifierMetrics is notifier metrics configurator.
Expand All @@ -33,6 +36,7 @@ func ConfigureNotifierMetrics(registry Registry, prefix string) *NotifierMetrics
PlotsBuildDurationMs: registry.NewHistogram("plots", "build", "duration", "ms"),
PlotsEvaluateTriggerDurationMs: registry.NewHistogram("plots", "evaluate", "trigger", "duration", "ms"),
fetchNotificationsDurationMs: registry.NewHistogram("fetch", "notifications", "duration", "ms"),
notifierIsAlive: registry.NewMeter("", "alive"),
}
}

Expand Down Expand Up @@ -66,3 +70,13 @@ func (metrics *NotifierMetrics) MarkSendersFailedMetrics(contactType string) {
func (metrics *NotifierMetrics) MarkSendingFailed() {
metrics.SendingFailed.Mark(1)
}

// MarkNotifierIsAlive marks metric value.
func (metrics *NotifierMetrics) MarkNotifierIsAlive(isAlive bool) {
if isAlive {
metrics.notifierIsAlive.Mark(1)
return
}

metrics.notifierIsAlive.Mark(0)
}
66 changes: 66 additions & 0 deletions notifier/alive_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package notifier

import (
"context"
"time"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
)

// AliveWatcher is responsible for checking notifier state and marking notifier.alive metrics.
type AliveWatcher struct {
logger moira.Logger
database moira.Database
checkNotifierStateTimeout time.Duration
notifierMetrics *metrics.NotifierMetrics
}

// NewAliveWatcher is an initializer for AliveWatcher.
func NewAliveWatcher(
logger moira.Logger,
database moira.Database,
checkNotifierStateTimeout time.Duration,
notifierMetrics *metrics.NotifierMetrics,
) *AliveWatcher {
return &AliveWatcher{
logger: logger,
database: database,
checkNotifierStateTimeout: checkNotifierStateTimeout,
notifierMetrics: notifierMetrics,
}
}

// Start starts the checking loop in separate goroutine.
// Use context.WithCancel, context.WithTimeout etc. to terminate check loop.
func (watcher *AliveWatcher) Start(ctx context.Context) {
go watcher.stateChecker(ctx)
}

func (watcher *AliveWatcher) stateChecker(ctx context.Context) {
watcher.logger.Info().
Interface("check_timeout_seconds", watcher.checkNotifierStateTimeout.Seconds()).
Msg("Moira Notifier alive watcher started")

ticker := time.NewTicker(watcher.checkNotifierStateTimeout)

for {
select {
case <-ctx.Done():
watcher.logger.Info().Msg("Moira Notifier alive watcher stopped")
return
case <-ticker.C:
watcher.checkNotifierState()
}
}
}

func (watcher *AliveWatcher) checkNotifierState() {
state, _ := watcher.database.GetNotifierState()
if state != moira.SelfStateOK {
watcher.notifierMetrics.MarkNotifierIsAlive(false)
return
}

watcher.notifierMetrics.MarkNotifierIsAlive(true)
}
112 changes: 112 additions & 0 deletions notifier/alive_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package notifier

import (
"context"
"errors"
"testing"
"time"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
. "github.com/smartystreets/goconvey/convey"
"go.uber.org/mock/gomock"

mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert"
mock_metrics "github.com/moira-alert/moira/mock/moira-alert/metrics"
)

func initAliveMeter(mockCtrl *gomock.Controller) (*mock_metrics.MockRegistry, *mock_metrics.MockMeter) {
mockRegistry := mock_metrics.NewMockRegistry(mockCtrl)
mockAliveMeter := mock_metrics.NewMockMeter(mockCtrl)

mockRegistry.EXPECT().NewMeter(gomock.Any()).Times(5)
mockRegistry.EXPECT().NewHistogram(gomock.Any()).Times(3)
mockRegistry.EXPECT().NewMeter("", "alive").Return(mockAliveMeter)

return mockRegistry, mockAliveMeter
}

func TestAliveWatcher_checkNotifierState(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

dataBase := mock_moira_alert.NewMockDatabase(mockCtrl)

mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl)
testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "")

aliveWatcher := NewAliveWatcher(nil, dataBase, 0, testNotifierMetrics)

Convey("checkNotifierState", t, func() {
Convey("when OK", func() {
dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil)
mockAliveMeter.EXPECT().Mark(int64(1))

aliveWatcher.checkNotifierState()
})

Convey("when not OK state and no errors", func() {
notOKStates := []string{moira.SelfStateERROR, "err", "bad", "", "1"}

for _, badState := range notOKStates {
dataBase.EXPECT().GetNotifierState().Return(badState, nil)
mockAliveMeter.EXPECT().Mark(int64(0))

aliveWatcher.checkNotifierState()
}
})

Convey("when not OK state and errors", func() {
notOKState := ""
givenErrors := []error{
errors.New("one error"),
errors.New("another error"),
}

for _, err := range givenErrors {
dataBase.EXPECT().GetNotifierState().Return(notOKState, err)
mockAliveMeter.EXPECT().Mark(int64(0))

aliveWatcher.checkNotifierState()
}
})
})
}

func TestAliveWatcher_Start(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

logger := mock_moira_alert.NewMockLogger(mockCtrl)
eventsBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl)
logger.EXPECT().Info().Return(eventsBuilder).AnyTimes()

dataBase := mock_moira_alert.NewMockDatabase(mockCtrl)

const (
testCheckNotifierStateTimeout = time.Second
)

mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl)
testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "")

aliveWatcher := NewAliveWatcher(logger, dataBase, testCheckNotifierStateTimeout, testNotifierMetrics)

Convey("AliveWatcher stops on cancel", t, func() {
eventsBuilder.EXPECT().
Interface("check_timeout_seconds", testCheckNotifierStateTimeout.Seconds()).
Return(eventsBuilder)
eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher started")
eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher stopped")

dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil).AnyTimes()
mockAliveMeter.EXPECT().Mark(int64(1)).AnyTimes()

ctx, cancel := context.WithCancel(context.Background())
aliveWatcher.Start(ctx)

time.Sleep(time.Second * 3)
cancel()
time.Sleep(time.Millisecond)
})
}
1 change: 1 addition & 0 deletions notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ type Config struct {
MaxFailAttemptToSendAvailable int
LogContactsToLevel map[string]string
LogSubscriptionsToLevel map[string]string
CheckNotifierStateTimeout time.Duration
}
17 changes: 5 additions & 12 deletions notifier/selfstate/heartbeat/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,30 @@ package heartbeat
import (
"fmt"

"github.com/moira-alert/moira/metrics"

"github.com/moira-alert/moira"
)

type notifier struct {
db moira.Database
log moira.Logger
metrics *metrics.HeartBeatMetrics
db moira.Database
log moira.Logger
}

func GetNotifier(logger moira.Logger, database moira.Database, metrics *metrics.HeartBeatMetrics) Heartbeater {
func GetNotifier(logger moira.Logger, database moira.Database) Heartbeater {
return &notifier{
db: database,
log: logger,
metrics: metrics,
db: database,
log: logger,
}
}

func (check notifier) Check(int64) (int64, bool, error) {
state, _ := check.db.GetNotifierState()
if state != moira.SelfStateOK {
check.metrics.MarkNotifierIsAlive(false)

check.log.Error().
String("error", check.GetErrorMessage()).
Msg("Notifier is not healthy")

return 0, true, nil
}
check.metrics.MarkNotifierIsAlive(true)

check.log.Debug().
String("state", state).
Expand Down
5 changes: 1 addition & 4 deletions notifier/selfstate/heartbeat/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"testing"
"time"

"github.com/moira-alert/moira/metrics"

"github.com/moira-alert/moira"
mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert"

Expand Down Expand Up @@ -47,7 +45,6 @@ func TestNotifierState(t *testing.T) {
func createNotifierStateTest(t *testing.T) *notifier {
mockCtrl := gomock.NewController(t)
logger, _ := logging.GetLogger("MetricDelay")
metric := metrics.ConfigureHeartBeatMetrics(metrics.NewDummyRegistry())

return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl), metric).(*notifier)
return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl)).(*notifier)
}
Loading

0 comments on commit b1aaf83

Please sign in to comment.