diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d434c19ba..55154a94f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -4,11 +4,11 @@ on: jobs: golangci: name: lint - runs-on: ubuntu-18.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Run linter - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version version: v1.50.1 diff --git a/.run/go test moira.run.xml b/.run/go test moira.run.xml index b97239289..ed6c05db7 100644 --- a/.run/go test moira.run.xml +++ b/.run/go test moira.run.xml @@ -2,7 +2,7 @@ - + @@ -10,4 +10,4 @@ - + \ No newline at end of file diff --git a/Makefile b/Makefile index c38e7ad41..63c4eec96 100644 --- a/Makefile +++ b/Makefile @@ -41,7 +41,7 @@ mock: .PHONY: test test: - echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -v -bench=. -covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp + echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -race -v -bench=. -covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp .PHONY: build build: diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 1202010de..c28737b28 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -102,18 +102,17 @@ func main() { } // Start moira self state checker - selfState := &selfstate.SelfCheckWorker{ - Logger: logger, - Database: database, - Config: config.Notifier.SelfState.getSettings(), - Notifier: sender, - } - if err := selfState.Start(); err != nil { - logger.Fatal(). - Error(err). - Msg("SelfState failed") + if config.Notifier.SelfState.getSettings().Enabled { + selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings()) + if err := selfState.Start(); err != nil { + logger.Fatal(). + Error(err). + Msg("SelfState failed") + } + defer stopSelfStateChecker(selfState) + } else { + logger.Debug().Msg("Moira Self State Monitoring disabled") } - defer stopSelfStateChecker(selfState) // Start moira notification fetcher fetchNotificationsWorker := ¬ifications.FetchNotificationsWorker{ diff --git a/index/batch.go b/index/batch.go index 7255056c8..1fbdc3502 100644 --- a/index/batch.go +++ b/index/batch.go @@ -61,7 +61,7 @@ func (index *Index) getTriggerChecksWithRetries(batch []string) ([]*moira.Trigge return nil, fmt.Errorf("cannot get trigger checks from DB after %d tries, last error: %s", triesCount, err.Error()) } -func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, toIndex int) error { +func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, triggersTotal int) error { indexErrors := make(chan error) wg := &sync.WaitGroup{} defer wg.Wait() @@ -83,8 +83,9 @@ func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.Trigger return } index.logger.Debug(). - Int64("batch_size", count). - Int("triggers_total", toIndex). + Int("batch_size", len(batch)). + Int64("count", atomic.LoadInt64(&count)). + Int("triggers_total", triggersTotal). Msg("Batch of triggers added to index") }(batch) case err, ok := <-getTriggersErrors: diff --git a/index/index_test.go b/index/index_test.go index 4c05c7c31..038d56398 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -88,10 +88,12 @@ func TestIndex_CreateAndFill(t *testing.T) { }) Convey("Test add Triggers to index, batch size is less than number of triggers", t, func() { + const batchSize = 20 + dataBase.EXPECT().GetTriggerChecks(triggerIDs[:batchSize]).Return(triggerChecksPointers[:batchSize], nil) + dataBase.EXPECT().GetTriggerChecks(triggerIDs[batchSize:]).Return(triggerChecksPointers[batchSize:], nil) + index := NewSearchIndex(logger, dataBase, metrics.NewDummyRegistry()) - dataBase.EXPECT().GetTriggerChecks(triggerIDs[:20]).Return(triggerChecksPointers[:20], nil) - dataBase.EXPECT().GetTriggerChecks(triggerIDs[20:]).Return(triggerChecksPointers[20:], nil) - err := index.writeByBatches(triggerIDs, 20) + err := index.writeByBatches(triggerIDs, batchSize) So(err, ShouldBeNil) docCount, _ := index.triggerIndex.GetCount() So(docCount, ShouldEqual, int64(32)) diff --git a/notifier/selfstate/check.go b/notifier/selfstate/check.go index 9aefd0350..489b7f667 100644 --- a/notifier/selfstate/check.go +++ b/notifier/selfstate/check.go @@ -7,7 +7,6 @@ import ( "github.com/moira-alert/moira" "github.com/moira-alert/moira/notifier" - "github.com/moira-alert/moira/notifier/selfstate/heartbeat" ) func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { @@ -18,27 +17,6 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { nextSendErrorMessage := time.Now().Unix() - selfCheck.Heartbeats = make([]heartbeat.Heartbeater, 0, 5) - if heartbeat := heartbeat.GetDatabase(selfCheck.Config.RedisDisconnectDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetFilter(selfCheck.Config.LastMetricReceivedDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetLocalChecker(selfCheck.Config.LastCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetRemoteChecker(selfCheck.Config.LastRemoteCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetNotifier(selfCheck.Logger, selfCheck.Database); heartbeat != nil { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - for { select { case <-stop: @@ -53,7 +31,7 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { func (selfCheck *SelfCheckWorker) handleCheckServices(nowTS int64) []moira.NotificationEvent { var events []moira.NotificationEvent //nolint - for _, heartbeat := range selfCheck.Heartbeats { + for _, heartbeat := range selfCheck.heartbeats { currentValue, needSend, err := heartbeat.Check(nowTS) if err != nil { selfCheck.Logger.Error(). diff --git a/notifier/selfstate/selfstate.go b/notifier/selfstate/selfstate.go index 4a6bd89ba..98af88c66 100644 --- a/notifier/selfstate/selfstate.go +++ b/notifier/selfstate/selfstate.go @@ -24,21 +24,20 @@ type SelfCheckWorker struct { Notifier notifier.Notifier Config Config tomb tomb.Tomb - Heartbeats []heartbeat.Heartbeater + heartbeats []heartbeat.Heartbeater +} + +// NewSelfCheckWorker creates SelfCheckWorker. +func NewSelfCheckWorker(logger moira.Logger, database moira.Database, notifier notifier.Notifier, config Config) *SelfCheckWorker { + heartbeats := createStandardHeartbeats(logger, database, config) + return &SelfCheckWorker{Logger: logger, Database: database, Notifier: notifier, Config: config, heartbeats: heartbeats} } // Start self check worker func (selfCheck *SelfCheckWorker) Start() error { - if !selfCheck.Config.Enabled { - selfCheck.Logger.Debug().Msg("Moira Self State Monitoring disabled") - return nil - } senders := selfCheck.Notifier.GetSenders() if err := selfCheck.Config.checkConfig(senders); err != nil { - selfCheck.Logger.Error(). - Error(err). - Msg("Can't configure Moira Self State Monitoring") - return nil + return err } selfCheck.tomb.Go(func() error { @@ -56,14 +55,32 @@ func (selfCheck *SelfCheckWorker) Start() error { // Stop self check worker and wait for finish func (selfCheck *SelfCheckWorker) Stop() error { - if !selfCheck.Config.Enabled { - return nil + selfCheck.tomb.Kill(nil) + return selfCheck.tomb.Wait() +} + +func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf Config) []heartbeat.Heartbeater { + heartbeats := make([]heartbeat.Heartbeater, 0) + + if hb := heartbeat.GetDatabase(conf.RedisDisconnectDelaySeconds, logger, database); hb != nil { + heartbeats = append(heartbeats, hb) } - senders := selfCheck.Notifier.GetSenders() - if err := selfCheck.Config.checkConfig(senders); err != nil { - return nil + + if hb := heartbeat.GetFilter(conf.LastMetricReceivedDelaySeconds, logger, database); hb != nil { + heartbeats = append(heartbeats, hb) } - selfCheck.tomb.Kill(nil) - return selfCheck.tomb.Wait() + if hb := heartbeat.GetLocalChecker(conf.LastCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() { + heartbeats = append(heartbeats, hb) + } + + if hb := heartbeat.GetRemoteChecker(conf.LastRemoteCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() { + heartbeats = append(heartbeats, hb) + } + + if hb := heartbeat.GetNotifier(logger, database); hb != nil { + heartbeats = append(heartbeats, hb) + } + + return heartbeats } diff --git a/notifier/selfstate/selfstate_test.go b/notifier/selfstate/selfstate_test.go index 7bdcbb282..c3c6ad335 100644 --- a/notifier/selfstate/selfstate_test.go +++ b/notifier/selfstate/selfstate_test.go @@ -27,58 +27,57 @@ type selfCheckWorkerMock struct { func TestSelfCheckWorker_selfStateChecker(t *testing.T) { mock := configureWorker(t, true) - mock.selfCheckWorker.Start() //nolint - Convey("Test creation all heartbeat", t, func() { - var nextSendErrorMessage int64 - var events []moira.NotificationEvent + Convey("SelfCheckWorker should call all heartbeats checks", t, func() { mock.database.EXPECT().GetChecksUpdatesCount().Return(int64(1), nil).Times(2) mock.database.EXPECT().GetMetricsUpdatesCount().Return(int64(1), nil) mock.database.EXPECT().GetRemoteChecksUpdatesCount().Return(int64(1), nil) mock.database.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil) mock.database.EXPECT().GetRemoteTriggersToCheckCount().Return(int64(1), nil) mock.database.EXPECT().GetLocalTriggersToCheckCount().Return(int64(1), nil).Times(2) + + // Start worker after configuring Mock to avoid race conditions + err := mock.selfCheckWorker.Start() + So(err, ShouldBeNil) + + So(len(mock.selfCheckWorker.heartbeats), ShouldEqual, 5) + + const oneTickDelay = time.Millisecond * 1500 + time.Sleep(oneTickDelay) // wait for one tick of worker + + err = mock.selfCheckWorker.Stop() + So(err, ShouldBeNil) + }) + + mock.mockCtrl.Finish() +} + +func TestSelfCheckWorker_sendErrorMessages(t *testing.T) { + mock := configureWorker(t, true) + + Convey("Should call notifier send", t, func() { + err := mock.selfCheckWorker.Start() + So(err, ShouldBeNil) + mock.notif.EXPECT().Send(gomock.Any(), gomock.Any()) + var events []moira.NotificationEvent mock.selfCheckWorker.sendErrorMessages(events) - time.Sleep(time.Second / 2) - mock.selfCheckWorker.check(time.Now().Unix(), nextSendErrorMessage) - So(len(mock.selfCheckWorker.Heartbeats), ShouldEqual, 5) + err = mock.selfCheckWorker.Stop() + So(err, ShouldBeNil) }) - mock.selfCheckWorker.Stop() //nolint mock.mockCtrl.Finish() } func TestSelfCheckWorker_Start(t *testing.T) { mock := configureWorker(t, false) + Convey("When Contact not corresponds to any Sender", t, func() { + mock.notif.EXPECT().GetSenders().Return(nil) - Convey("Test start selfCheckWorkerMock", t, func() { - Convey("Test enabled is false", func() { - mock.selfCheckWorker.Config.Enabled = false - mock.selfCheckWorker.Start() //nolint - So(mock.selfCheckWorker.Heartbeats, ShouldBeNil) - }) - Convey("Check for error from checkConfig", func() { - mock.selfCheckWorker.Config.Enabled = true - mock.notif.EXPECT().GetSenders().Return(nil) - mock.selfCheckWorker.Start() //nolint - So(mock.selfCheckWorker.Heartbeats, ShouldBeNil) - }) - }) -} - -func TestSelfCheckWorker_Stop(t *testing.T) { - Convey("Test stop selfCheckWorkerMock", t, func() { - mock := configureWorker(t, false) - Convey("Test enabled is false", func() { - mock.selfCheckWorker.Config.Enabled = false - So(mock.selfCheckWorker.Stop(), ShouldBeNil) - }) - Convey("Check for error from checkConfig", func() { - mock.selfCheckWorker.Config.Enabled = true - mock.notif.EXPECT().GetSenders().Return(nil) - So(mock.selfCheckWorker.Stop(), ShouldBeNil) + Convey("Start should return error", func() { + err := mock.selfCheckWorker.Start() + So(err, ShouldNotBeNil) }) }) } @@ -92,7 +91,7 @@ func TestSelfCheckWorker(t *testing.T) { Convey("Test handle error and no needed send events", func() { check := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{check} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{check} check.EXPECT().Check(now).Return(int64(0), false, err) @@ -104,7 +103,7 @@ func TestSelfCheckWorker(t *testing.T) { first := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second} first.EXPECT().NeedTurnOffNotifier().Return(true) first.EXPECT().NeedToCheckOthers().Return(false) @@ -121,7 +120,7 @@ func TestSelfCheckWorker(t *testing.T) { first := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second} nextSendErrorMessage := time.Now().Unix() - time.Hour.Milliseconds() first.EXPECT().Check(now).Return(int64(0), true, nil) @@ -174,15 +173,11 @@ func configureWorker(t *testing.T, isStart bool) *selfCheckWorkerMock { } return &selfCheckWorkerMock{ - selfCheckWorker: &SelfCheckWorker{ - Logger: logger, - Database: database, - Config: conf, - Notifier: notif, - }, - database: database, - notif: notif, - conf: conf, - mockCtrl: mockCtrl, + + selfCheckWorker: NewSelfCheckWorker(logger, database, notif, conf), + database: database, + notif: notif, + conf: conf, + mockCtrl: mockCtrl, } }