From fe53343e3098289be59714201926ba14122ef149 Mon Sep 17 00:00:00 2001 From: Dmitry Redkin <48379797+Dimedrolity@users.noreply.github.com> Date: Thu, 6 Apr 2023 14:50:42 +0500 Subject: [PATCH] Fixed data races (#807) * fix(notifier): Fixed race in notifier test The race condition occurred when reading SelfCheckWorker.Heartbeats, because Heartbeats was created in the worker goroutine (Start -> selfStateChecker). Fixed the race condition by moving heartbeats creation to the constructor of SelfCheckWorker. Made heartbeats unexported to signal that it is not public by design. Created error variables to indicate that self state monitor is not started. * fix(notifier): Fixed race in index test The race condition occurred when reading the trigger count without an atomic load. Fixed the race by adding an atomic load. * feature: Added race flag to tests Now tests will run with the `-race` flag to find data races at testing time. * Log only. Also change to Info log level because it is just information about self checker state; it is expected behavior if the checker is off * remove unnecessary `.Error()` call * don't log twice * remove useless .Error() call * godoc * refactor: Remove dead code It seems the `Stop` method works without these checks. It is strange to check `Enabled` and check the config in the Stop method; I think the checks in the `Start` method are enough. Also remove the test, since it is meaningless to test the Stop method without the Start method. * refactor: SelfStateChecker system design As I understand it, it is bad system design for each method to have to check the Enabled flag. I rewrote it to a new design, in which a SelfCheckWorker instance will be created (initialized) in the main func only if Enabled is true. 
* Log if disabled * fix log * CI: upd ubuntu version * linter version --- .github/workflows/lint.yml | 4 +- .run/go test moira.run.xml | 4 +- Makefile | 2 +- cmd/notifier/main.go | 21 ++++--- index/batch.go | 7 ++- index/index_test.go | 8 ++- notifier/selfstate/check.go | 24 +------- notifier/selfstate/selfstate.go | 49 ++++++++++----- notifier/selfstate/selfstate_test.go | 89 +++++++++++++--------------- 9 files changed, 100 insertions(+), 108 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d434c19ba..55154a94f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -4,11 +4,11 @@ on: jobs: golangci: name: lint - runs-on: ubuntu-18.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Run linter - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version version: v1.50.1 diff --git a/.run/go test moira.run.xml b/.run/go test moira.run.xml index b97239289..ed6c05db7 100644 --- a/.run/go test moira.run.xml +++ b/.run/go test moira.run.xml @@ -2,7 +2,7 @@ - + @@ -10,4 +10,4 @@ - + \ No newline at end of file diff --git a/Makefile b/Makefile index c38e7ad41..63c4eec96 100644 --- a/Makefile +++ b/Makefile @@ -41,7 +41,7 @@ mock: .PHONY: test test: - echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -v -bench=. -covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp + echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -race -v -bench=. 
-covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp .PHONY: build build: diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 1202010de..c28737b28 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -102,18 +102,17 @@ func main() { } // Start moira self state checker - selfState := &selfstate.SelfCheckWorker{ - Logger: logger, - Database: database, - Config: config.Notifier.SelfState.getSettings(), - Notifier: sender, - } - if err := selfState.Start(); err != nil { - logger.Fatal(). - Error(err). - Msg("SelfState failed") + if config.Notifier.SelfState.getSettings().Enabled { + selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings()) + if err := selfState.Start(); err != nil { + logger.Fatal(). + Error(err). + Msg("SelfState failed") + } + defer stopSelfStateChecker(selfState) + } else { + logger.Debug().Msg("Moira Self State Monitoring disabled") } - defer stopSelfStateChecker(selfState) // Start moira notification fetcher fetchNotificationsWorker := ¬ifications.FetchNotificationsWorker{ diff --git a/index/batch.go b/index/batch.go index 7255056c8..1fbdc3502 100644 --- a/index/batch.go +++ b/index/batch.go @@ -61,7 +61,7 @@ func (index *Index) getTriggerChecksWithRetries(batch []string) ([]*moira.Trigge return nil, fmt.Errorf("cannot get trigger checks from DB after %d tries, last error: %s", triesCount, err.Error()) } -func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, toIndex int) error { +func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, triggersTotal int) error { indexErrors := make(chan error) wg := &sync.WaitGroup{} defer wg.Wait() @@ -83,8 +83,9 @@ func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.Trigger return } index.logger.Debug(). - Int64("batch_size", count). 
- Int("triggers_total", toIndex). + Int("batch_size", len(batch)). + Int64("count", atomic.LoadInt64(&count)). + Int("triggers_total", triggersTotal). Msg("Batch of triggers added to index") }(batch) case err, ok := <-getTriggersErrors: diff --git a/index/index_test.go b/index/index_test.go index 4c05c7c31..038d56398 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -88,10 +88,12 @@ func TestIndex_CreateAndFill(t *testing.T) { }) Convey("Test add Triggers to index, batch size is less than number of triggers", t, func() { + const batchSize = 20 + dataBase.EXPECT().GetTriggerChecks(triggerIDs[:batchSize]).Return(triggerChecksPointers[:batchSize], nil) + dataBase.EXPECT().GetTriggerChecks(triggerIDs[batchSize:]).Return(triggerChecksPointers[batchSize:], nil) + index := NewSearchIndex(logger, dataBase, metrics.NewDummyRegistry()) - dataBase.EXPECT().GetTriggerChecks(triggerIDs[:20]).Return(triggerChecksPointers[:20], nil) - dataBase.EXPECT().GetTriggerChecks(triggerIDs[20:]).Return(triggerChecksPointers[20:], nil) - err := index.writeByBatches(triggerIDs, 20) + err := index.writeByBatches(triggerIDs, batchSize) So(err, ShouldBeNil) docCount, _ := index.triggerIndex.GetCount() So(docCount, ShouldEqual, int64(32)) diff --git a/notifier/selfstate/check.go b/notifier/selfstate/check.go index 9aefd0350..489b7f667 100644 --- a/notifier/selfstate/check.go +++ b/notifier/selfstate/check.go @@ -7,7 +7,6 @@ import ( "github.com/moira-alert/moira" "github.com/moira-alert/moira/notifier" - "github.com/moira-alert/moira/notifier/selfstate/heartbeat" ) func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { @@ -18,27 +17,6 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { nextSendErrorMessage := time.Now().Unix() - selfCheck.Heartbeats = make([]heartbeat.Heartbeater, 0, 5) - if heartbeat := heartbeat.GetDatabase(selfCheck.Config.RedisDisconnectDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil 
{ - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetFilter(selfCheck.Config.LastMetricReceivedDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetLocalChecker(selfCheck.Config.LastCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetRemoteChecker(selfCheck.Config.LastRemoteCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - - if heartbeat := heartbeat.GetNotifier(selfCheck.Logger, selfCheck.Database); heartbeat != nil { - selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat) - } - for { select { case <-stop: @@ -53,7 +31,7 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error { func (selfCheck *SelfCheckWorker) handleCheckServices(nowTS int64) []moira.NotificationEvent { var events []moira.NotificationEvent //nolint - for _, heartbeat := range selfCheck.Heartbeats { + for _, heartbeat := range selfCheck.heartbeats { currentValue, needSend, err := heartbeat.Check(nowTS) if err != nil { selfCheck.Logger.Error(). diff --git a/notifier/selfstate/selfstate.go b/notifier/selfstate/selfstate.go index 4a6bd89ba..98af88c66 100644 --- a/notifier/selfstate/selfstate.go +++ b/notifier/selfstate/selfstate.go @@ -24,21 +24,20 @@ type SelfCheckWorker struct { Notifier notifier.Notifier Config Config tomb tomb.Tomb - Heartbeats []heartbeat.Heartbeater + heartbeats []heartbeat.Heartbeater +} + +// NewSelfCheckWorker creates SelfCheckWorker. 
+func NewSelfCheckWorker(logger moira.Logger, database moira.Database, notifier notifier.Notifier, config Config) *SelfCheckWorker { + heartbeats := createStandardHeartbeats(logger, database, config) + return &SelfCheckWorker{Logger: logger, Database: database, Notifier: notifier, Config: config, heartbeats: heartbeats} } // Start self check worker func (selfCheck *SelfCheckWorker) Start() error { - if !selfCheck.Config.Enabled { - selfCheck.Logger.Debug().Msg("Moira Self State Monitoring disabled") - return nil - } senders := selfCheck.Notifier.GetSenders() if err := selfCheck.Config.checkConfig(senders); err != nil { - selfCheck.Logger.Error(). - Error(err). - Msg("Can't configure Moira Self State Monitoring") - return nil + return err } selfCheck.tomb.Go(func() error { @@ -56,14 +55,32 @@ func (selfCheck *SelfCheckWorker) Start() error { // Stop self check worker and wait for finish func (selfCheck *SelfCheckWorker) Stop() error { - if !selfCheck.Config.Enabled { - return nil + selfCheck.tomb.Kill(nil) + return selfCheck.tomb.Wait() +} + +func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf Config) []heartbeat.Heartbeater { + heartbeats := make([]heartbeat.Heartbeater, 0) + + if hb := heartbeat.GetDatabase(conf.RedisDisconnectDelaySeconds, logger, database); hb != nil { + heartbeats = append(heartbeats, hb) } - senders := selfCheck.Notifier.GetSenders() - if err := selfCheck.Config.checkConfig(senders); err != nil { - return nil + + if hb := heartbeat.GetFilter(conf.LastMetricReceivedDelaySeconds, logger, database); hb != nil { + heartbeats = append(heartbeats, hb) } - selfCheck.tomb.Kill(nil) - return selfCheck.tomb.Wait() + if hb := heartbeat.GetLocalChecker(conf.LastCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() { + heartbeats = append(heartbeats, hb) + } + + if hb := heartbeat.GetRemoteChecker(conf.LastRemoteCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() { + heartbeats = 
append(heartbeats, hb) + } + + if hb := heartbeat.GetNotifier(logger, database); hb != nil { + heartbeats = append(heartbeats, hb) + } + + return heartbeats } diff --git a/notifier/selfstate/selfstate_test.go b/notifier/selfstate/selfstate_test.go index 7bdcbb282..c3c6ad335 100644 --- a/notifier/selfstate/selfstate_test.go +++ b/notifier/selfstate/selfstate_test.go @@ -27,58 +27,57 @@ type selfCheckWorkerMock struct { func TestSelfCheckWorker_selfStateChecker(t *testing.T) { mock := configureWorker(t, true) - mock.selfCheckWorker.Start() //nolint - Convey("Test creation all heartbeat", t, func() { - var nextSendErrorMessage int64 - var events []moira.NotificationEvent + Convey("SelfCheckWorker should call all heartbeats checks", t, func() { mock.database.EXPECT().GetChecksUpdatesCount().Return(int64(1), nil).Times(2) mock.database.EXPECT().GetMetricsUpdatesCount().Return(int64(1), nil) mock.database.EXPECT().GetRemoteChecksUpdatesCount().Return(int64(1), nil) mock.database.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil) mock.database.EXPECT().GetRemoteTriggersToCheckCount().Return(int64(1), nil) mock.database.EXPECT().GetLocalTriggersToCheckCount().Return(int64(1), nil).Times(2) + + // Start worker after configuring Mock to avoid race conditions + err := mock.selfCheckWorker.Start() + So(err, ShouldBeNil) + + So(len(mock.selfCheckWorker.heartbeats), ShouldEqual, 5) + + const oneTickDelay = time.Millisecond * 1500 + time.Sleep(oneTickDelay) // wait for one tick of worker + + err = mock.selfCheckWorker.Stop() + So(err, ShouldBeNil) + }) + + mock.mockCtrl.Finish() +} + +func TestSelfCheckWorker_sendErrorMessages(t *testing.T) { + mock := configureWorker(t, true) + + Convey("Should call notifier send", t, func() { + err := mock.selfCheckWorker.Start() + So(err, ShouldBeNil) + mock.notif.EXPECT().Send(gomock.Any(), gomock.Any()) + var events []moira.NotificationEvent mock.selfCheckWorker.sendErrorMessages(events) - time.Sleep(time.Second / 2) - 
mock.selfCheckWorker.check(time.Now().Unix(), nextSendErrorMessage) - So(len(mock.selfCheckWorker.Heartbeats), ShouldEqual, 5) + err = mock.selfCheckWorker.Stop() + So(err, ShouldBeNil) }) - mock.selfCheckWorker.Stop() //nolint mock.mockCtrl.Finish() } func TestSelfCheckWorker_Start(t *testing.T) { mock := configureWorker(t, false) + Convey("When Contact not corresponds to any Sender", t, func() { + mock.notif.EXPECT().GetSenders().Return(nil) - Convey("Test start selfCheckWorkerMock", t, func() { - Convey("Test enabled is false", func() { - mock.selfCheckWorker.Config.Enabled = false - mock.selfCheckWorker.Start() //nolint - So(mock.selfCheckWorker.Heartbeats, ShouldBeNil) - }) - Convey("Check for error from checkConfig", func() { - mock.selfCheckWorker.Config.Enabled = true - mock.notif.EXPECT().GetSenders().Return(nil) - mock.selfCheckWorker.Start() //nolint - So(mock.selfCheckWorker.Heartbeats, ShouldBeNil) - }) - }) -} - -func TestSelfCheckWorker_Stop(t *testing.T) { - Convey("Test stop selfCheckWorkerMock", t, func() { - mock := configureWorker(t, false) - Convey("Test enabled is false", func() { - mock.selfCheckWorker.Config.Enabled = false - So(mock.selfCheckWorker.Stop(), ShouldBeNil) - }) - Convey("Check for error from checkConfig", func() { - mock.selfCheckWorker.Config.Enabled = true - mock.notif.EXPECT().GetSenders().Return(nil) - So(mock.selfCheckWorker.Stop(), ShouldBeNil) + Convey("Start should return error", func() { + err := mock.selfCheckWorker.Start() + So(err, ShouldNotBeNil) }) }) } @@ -92,7 +91,7 @@ func TestSelfCheckWorker(t *testing.T) { Convey("Test handle error and no needed send events", func() { check := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{check} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{check} check.EXPECT().Check(now).Return(int64(0), false, err) @@ -104,7 +103,7 @@ func TestSelfCheckWorker(t *testing.T) { first := 
mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second} first.EXPECT().NeedTurnOffNotifier().Return(true) first.EXPECT().NeedToCheckOthers().Return(false) @@ -121,7 +120,7 @@ func TestSelfCheckWorker(t *testing.T) { first := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl) - mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second} + mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second} nextSendErrorMessage := time.Now().Unix() - time.Hour.Milliseconds() first.EXPECT().Check(now).Return(int64(0), true, nil) @@ -174,15 +173,11 @@ func configureWorker(t *testing.T, isStart bool) *selfCheckWorkerMock { } return &selfCheckWorkerMock{ - selfCheckWorker: &SelfCheckWorker{ - Logger: logger, - Database: database, - Config: conf, - Notifier: notif, - }, - database: database, - notif: notif, - conf: conf, - mockCtrl: mockCtrl, + + selfCheckWorker: NewSelfCheckWorker(logger, database, notif, conf), + database: database, + notif: notif, + conf: conf, + mockCtrl: mockCtrl, } }