Skip to content

Commit

Permalink
Fixed data races (#807)
Browse files Browse the repository at this point in the history
* fix(notifier): Fixed race in notifier test

Race condition was when accessing to SelfCheckWorker.Heartbeats on read because creation of Heartbeats was in worker goroutine (Start -> selfStateChecker). Fixed race condition by moving heartbeats creation to constructor of SelfCheckWorker. Made heartbeats internal to pay attention that it is not public by design. Created error variables to indicate that self state monitor is not started.

* fix(notifier): Fixed race in index test

Race condition was when accessing to trigger count without atomic on read. Fixed race by adding atomic load.

* feature: Added race flag to tests

Now tests will run with ``-race` flag to find data races at testing time.

* Log only. Also change to Info log level because it is just information about self checker state, it is expected behavior if checker is off

* remove unnecessary `.Error()` call

* don't log twice

* remove useless .Error() call

* godoc

* refactor: Remove dead code

Seems `Stop` method works without these checks. It is strange to check `Enabled` and check config in Stop method, I think checks in `Start` method is enough.
Also remove test, it is meaningless to test Stop method without Start method.

* refactor: SelfStateChecker system design

As I understand It is bad system design - each method must check Enabled flag. I rewrite it to new design, in which SelfCheckWorker instance will be created (initialized) in main func only if Enabled is true.

* Log if disabled

* fix log

* CI: upd ubuntu version

* linter version
  • Loading branch information
Dimedrolity authored Apr 6, 2023
1 parent 6175cb3 commit fe53343
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 108 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ on:
jobs:
golangci:
name: lint
runs-on: ubuntu-18.04
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run linter
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v3
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.50.1
Expand Down
4 changes: 2 additions & 2 deletions .run/go test moira.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
<configuration default="false" name="go test moira" type="GoTestRunConfiguration" factoryName="Go Test" nameIsGenerated="true">
<module name="moira" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="-p 1" />
<parameters value="-p 1 -race" />
<kind value="DIRECTORY" />
<package value="github.com/moira-alert/moira" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$" />
<framework value="gotest" />
<method v="2" />
</configuration>
</component>
</component>
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mock:

.PHONY: test
test:
echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -v -bench=. -covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp
echo 'mode: atomic' > coverage.txt && go list ./... | xargs -n1 -I{} sh -c 'go test -race -v -bench=. -covermode=atomic -coverprofile=coverage.tmp {} && tail -n +2 coverage.tmp >> coverage.txt' && rm coverage.tmp

.PHONY: build
build:
Expand Down
21 changes: 10 additions & 11 deletions cmd/notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,17 @@ func main() {
}

// Start moira self state checker
selfState := &selfstate.SelfCheckWorker{
Logger: logger,
Database: database,
Config: config.Notifier.SelfState.getSettings(),
Notifier: sender,
}
if err := selfState.Start(); err != nil {
logger.Fatal().
Error(err).
Msg("SelfState failed")
if config.Notifier.SelfState.getSettings().Enabled {
selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings())
if err := selfState.Start(); err != nil {
logger.Fatal().
Error(err).
Msg("SelfState failed")
}
defer stopSelfStateChecker(selfState)
} else {
logger.Debug().Msg("Moira Self State Monitoring disabled")
}
defer stopSelfStateChecker(selfState)

// Start moira notification fetcher
fetchNotificationsWorker := &notifications.FetchNotificationsWorker{
Expand Down
7 changes: 4 additions & 3 deletions index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (index *Index) getTriggerChecksWithRetries(batch []string) ([]*moira.Trigge
return nil, fmt.Errorf("cannot get trigger checks from DB after %d tries, last error: %s", triesCount, err.Error())
}

func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, toIndex int) error {
func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, triggersTotal int) error {
indexErrors := make(chan error)
wg := &sync.WaitGroup{}
defer wg.Wait()
Expand All @@ -83,8 +83,9 @@ func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.Trigger
return
}
index.logger.Debug().
Int64("batch_size", count).
Int("triggers_total", toIndex).
Int("batch_size", len(batch)).
Int64("count", atomic.LoadInt64(&count)).
Int("triggers_total", triggersTotal).
Msg("Batch of triggers added to index")
}(batch)
case err, ok := <-getTriggersErrors:
Expand Down
8 changes: 5 additions & 3 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ func TestIndex_CreateAndFill(t *testing.T) {
})

Convey("Test add Triggers to index, batch size is less than number of triggers", t, func() {
const batchSize = 20
dataBase.EXPECT().GetTriggerChecks(triggerIDs[:batchSize]).Return(triggerChecksPointers[:batchSize], nil)
dataBase.EXPECT().GetTriggerChecks(triggerIDs[batchSize:]).Return(triggerChecksPointers[batchSize:], nil)

index := NewSearchIndex(logger, dataBase, metrics.NewDummyRegistry())
dataBase.EXPECT().GetTriggerChecks(triggerIDs[:20]).Return(triggerChecksPointers[:20], nil)
dataBase.EXPECT().GetTriggerChecks(triggerIDs[20:]).Return(triggerChecksPointers[20:], nil)
err := index.writeByBatches(triggerIDs, 20)
err := index.writeByBatches(triggerIDs, batchSize)
So(err, ShouldBeNil)
docCount, _ := index.triggerIndex.GetCount()
So(docCount, ShouldEqual, int64(32))
Expand Down
24 changes: 1 addition & 23 deletions notifier/selfstate/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/notifier"
"github.com/moira-alert/moira/notifier/selfstate/heartbeat"
)

func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error {
Expand All @@ -18,27 +17,6 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error {

nextSendErrorMessage := time.Now().Unix()

selfCheck.Heartbeats = make([]heartbeat.Heartbeater, 0, 5)
if heartbeat := heartbeat.GetDatabase(selfCheck.Config.RedisDisconnectDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil {
selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat)
}

if heartbeat := heartbeat.GetFilter(selfCheck.Config.LastMetricReceivedDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil {
selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat)
}

if heartbeat := heartbeat.GetLocalChecker(selfCheck.Config.LastCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() {
selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat)
}

if heartbeat := heartbeat.GetRemoteChecker(selfCheck.Config.LastRemoteCheckDelaySeconds, selfCheck.Logger, selfCheck.Database); heartbeat != nil && heartbeat.NeedToCheckOthers() {
selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat)
}

if heartbeat := heartbeat.GetNotifier(selfCheck.Logger, selfCheck.Database); heartbeat != nil {
selfCheck.Heartbeats = append(selfCheck.Heartbeats, heartbeat)
}

for {
select {
case <-stop:
Expand All @@ -53,7 +31,7 @@ func (selfCheck *SelfCheckWorker) selfStateChecker(stop <-chan struct{}) error {
func (selfCheck *SelfCheckWorker) handleCheckServices(nowTS int64) []moira.NotificationEvent {
var events []moira.NotificationEvent //nolint

for _, heartbeat := range selfCheck.Heartbeats {
for _, heartbeat := range selfCheck.heartbeats {
currentValue, needSend, err := heartbeat.Check(nowTS)
if err != nil {
selfCheck.Logger.Error().
Expand Down
49 changes: 33 additions & 16 deletions notifier/selfstate/selfstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@ type SelfCheckWorker struct {
Notifier notifier.Notifier
Config Config
tomb tomb.Tomb
Heartbeats []heartbeat.Heartbeater
heartbeats []heartbeat.Heartbeater
}

// NewSelfCheckWorker creates SelfCheckWorker.
func NewSelfCheckWorker(logger moira.Logger, database moira.Database, notifier notifier.Notifier, config Config) *SelfCheckWorker {
heartbeats := createStandardHeartbeats(logger, database, config)
return &SelfCheckWorker{Logger: logger, Database: database, Notifier: notifier, Config: config, heartbeats: heartbeats}
}

// Start self check worker
func (selfCheck *SelfCheckWorker) Start() error {
if !selfCheck.Config.Enabled {
selfCheck.Logger.Debug().Msg("Moira Self State Monitoring disabled")
return nil
}
senders := selfCheck.Notifier.GetSenders()
if err := selfCheck.Config.checkConfig(senders); err != nil {
selfCheck.Logger.Error().
Error(err).
Msg("Can't configure Moira Self State Monitoring")
return nil
return err
}

selfCheck.tomb.Go(func() error {
Expand All @@ -56,14 +55,32 @@ func (selfCheck *SelfCheckWorker) Start() error {

// Stop self check worker and wait for finish
func (selfCheck *SelfCheckWorker) Stop() error {
if !selfCheck.Config.Enabled {
return nil
selfCheck.tomb.Kill(nil)
return selfCheck.tomb.Wait()
}

func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf Config) []heartbeat.Heartbeater {
heartbeats := make([]heartbeat.Heartbeater, 0)

if hb := heartbeat.GetDatabase(conf.RedisDisconnectDelaySeconds, logger, database); hb != nil {
heartbeats = append(heartbeats, hb)
}
senders := selfCheck.Notifier.GetSenders()
if err := selfCheck.Config.checkConfig(senders); err != nil {
return nil

if hb := heartbeat.GetFilter(conf.LastMetricReceivedDelaySeconds, logger, database); hb != nil {
heartbeats = append(heartbeats, hb)
}

selfCheck.tomb.Kill(nil)
return selfCheck.tomb.Wait()
if hb := heartbeat.GetLocalChecker(conf.LastCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() {
heartbeats = append(heartbeats, hb)
}

if hb := heartbeat.GetRemoteChecker(conf.LastRemoteCheckDelaySeconds, logger, database); hb != nil && hb.NeedToCheckOthers() {
heartbeats = append(heartbeats, hb)
}

if hb := heartbeat.GetNotifier(logger, database); hb != nil {
heartbeats = append(heartbeats, hb)
}

return heartbeats
}
89 changes: 42 additions & 47 deletions notifier/selfstate/selfstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,58 +27,57 @@ type selfCheckWorkerMock struct {

func TestSelfCheckWorker_selfStateChecker(t *testing.T) {
mock := configureWorker(t, true)
mock.selfCheckWorker.Start() //nolint
Convey("Test creation all heartbeat", t, func() {
var nextSendErrorMessage int64
var events []moira.NotificationEvent
Convey("SelfCheckWorker should call all heartbeats checks", t, func() {
mock.database.EXPECT().GetChecksUpdatesCount().Return(int64(1), nil).Times(2)
mock.database.EXPECT().GetMetricsUpdatesCount().Return(int64(1), nil)
mock.database.EXPECT().GetRemoteChecksUpdatesCount().Return(int64(1), nil)
mock.database.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil)
mock.database.EXPECT().GetRemoteTriggersToCheckCount().Return(int64(1), nil)
mock.database.EXPECT().GetLocalTriggersToCheckCount().Return(int64(1), nil).Times(2)

// Start worker after configuring Mock to avoid race conditions
err := mock.selfCheckWorker.Start()
So(err, ShouldBeNil)

So(len(mock.selfCheckWorker.heartbeats), ShouldEqual, 5)

const oneTickDelay = time.Millisecond * 1500
time.Sleep(oneTickDelay) // wait for one tick of worker

err = mock.selfCheckWorker.Stop()
So(err, ShouldBeNil)
})

mock.mockCtrl.Finish()
}

func TestSelfCheckWorker_sendErrorMessages(t *testing.T) {
mock := configureWorker(t, true)

Convey("Should call notifier send", t, func() {
err := mock.selfCheckWorker.Start()
So(err, ShouldBeNil)

mock.notif.EXPECT().Send(gomock.Any(), gomock.Any())

var events []moira.NotificationEvent
mock.selfCheckWorker.sendErrorMessages(events)
time.Sleep(time.Second / 2)
mock.selfCheckWorker.check(time.Now().Unix(), nextSendErrorMessage)

So(len(mock.selfCheckWorker.Heartbeats), ShouldEqual, 5)
err = mock.selfCheckWorker.Stop()
So(err, ShouldBeNil)
})

mock.selfCheckWorker.Stop() //nolint
mock.mockCtrl.Finish()
}

func TestSelfCheckWorker_Start(t *testing.T) {
mock := configureWorker(t, false)
Convey("When Contact not corresponds to any Sender", t, func() {
mock.notif.EXPECT().GetSenders().Return(nil)

Convey("Test start selfCheckWorkerMock", t, func() {
Convey("Test enabled is false", func() {
mock.selfCheckWorker.Config.Enabled = false
mock.selfCheckWorker.Start() //nolint
So(mock.selfCheckWorker.Heartbeats, ShouldBeNil)
})
Convey("Check for error from checkConfig", func() {
mock.selfCheckWorker.Config.Enabled = true
mock.notif.EXPECT().GetSenders().Return(nil)
mock.selfCheckWorker.Start() //nolint
So(mock.selfCheckWorker.Heartbeats, ShouldBeNil)
})
})
}

func TestSelfCheckWorker_Stop(t *testing.T) {
Convey("Test stop selfCheckWorkerMock", t, func() {
mock := configureWorker(t, false)
Convey("Test enabled is false", func() {
mock.selfCheckWorker.Config.Enabled = false
So(mock.selfCheckWorker.Stop(), ShouldBeNil)
})
Convey("Check for error from checkConfig", func() {
mock.selfCheckWorker.Config.Enabled = true
mock.notif.EXPECT().GetSenders().Return(nil)
So(mock.selfCheckWorker.Stop(), ShouldBeNil)
Convey("Start should return error", func() {
err := mock.selfCheckWorker.Start()
So(err, ShouldNotBeNil)
})
})
}
Expand All @@ -92,7 +91,7 @@ func TestSelfCheckWorker(t *testing.T) {

Convey("Test handle error and no needed send events", func() {
check := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl)
mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{check}
mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{check}

check.EXPECT().Check(now).Return(int64(0), false, err)

Expand All @@ -104,7 +103,7 @@ func TestSelfCheckWorker(t *testing.T) {
first := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl)
second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl)

mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second}
mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second}

first.EXPECT().NeedTurnOffNotifier().Return(true)
first.EXPECT().NeedToCheckOthers().Return(false)
Expand All @@ -121,7 +120,7 @@ func TestSelfCheckWorker(t *testing.T) {
first := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl)
second := mock_heartbeat.NewMockHeartbeater(mock.mockCtrl)

mock.selfCheckWorker.Heartbeats = []heartbeat.Heartbeater{first, second}
mock.selfCheckWorker.heartbeats = []heartbeat.Heartbeater{first, second}
nextSendErrorMessage := time.Now().Unix() - time.Hour.Milliseconds()

first.EXPECT().Check(now).Return(int64(0), true, nil)
Expand Down Expand Up @@ -174,15 +173,11 @@ func configureWorker(t *testing.T, isStart bool) *selfCheckWorkerMock {
}

return &selfCheckWorkerMock{
selfCheckWorker: &SelfCheckWorker{
Logger: logger,
Database: database,
Config: conf,
Notifier: notif,
},
database: database,
notif: notif,
conf: conf,
mockCtrl: mockCtrl,

selfCheckWorker: NewSelfCheckWorker(logger, database, notif, conf),
database: database,
notif: notif,
conf: conf,
mockCtrl: mockCtrl,
}
}

0 comments on commit fe53343

Please sign in to comment.