Skip to content

Commit

Permalink
Merge pull request #6003 from mysteriumnetwork/fix/datarace
Browse files Browse the repository at this point in the history
Fix data races
  • Loading branch information
Zensey authored Apr 3, 2024
2 parents 39432b9 + bb20c95 commit 6720107
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 61 deletions.
12 changes: 9 additions & 3 deletions ci/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func TestWithCoverage() error {
return err
}
args := append([]string{"test", "-race", "-timeout", "5m", "-cover", "-coverprofile", "coverage.txt", "-covermode", "atomic"}, packages...)
return sh.RunV("go", args...)

env := make(map[string]string)
env["GORACE"] = "halt_on_error=1"
return sh.RunWith(env, "go", args...)
}

// Test runs unit tests
Expand All @@ -39,8 +42,11 @@ func Test() error {
if err != nil {
return err
}
args := append([]string{"test", "-race", "-count=1", "-timeout", "5m"}, packages...)
return sh.RunV("go", args...)
args := append([]string{"test", "-race", "-timeout", "5m"}, packages...)

env := make(map[string]string)
env["GORACE"] = "halt_on_error=1"
return sh.RunWith(env, "go", args...)
}

func unitTestPackages() ([]string, error) {
Expand Down
36 changes: 29 additions & 7 deletions core/service/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package service
import (
"sync"

"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"

Expand Down Expand Up @@ -149,13 +150,15 @@ func NewInstance(

// Instance represents a run service
type Instance struct {
ID ID
state servicestate.State
stateLock sync.RWMutex
ProviderID identity.Identity
Type string
Options Options
service Service
ID ID
state servicestate.State
stateLock sync.RWMutex
ProviderID identity.Identity
Type string
Options Options
service Service

muProposal sync.Mutex
Proposal market.ServiceProposal
policyProvider policy.Provider
discovery Discovery
Expand Down Expand Up @@ -189,7 +192,9 @@ func (i *Instance) proposalWithCurrentLocation() market.ServiceProposal {
return i.Proposal
}

i.muProposal.Lock()
i.Proposal.Location = *market.NewLocation(location)
i.muProposal.Unlock()

return i.Proposal
}
Expand Down Expand Up @@ -237,3 +242,20 @@ func (i *Instance) toEvent() servicestate.AppEventServiceStatus {
Status: string(i.state),
}
}

// CopyProposal returns a copy of Proposal
func (i *Instance) CopyProposal() market.ServiceProposal {
i.muProposal.Lock()
defer i.muProposal.Unlock()

var proposal market.ServiceProposal
if err := copier.CopyWithOption(&proposal, i.Proposal, copier.Option{DeepCopy: true}); err != nil {
panic(err)
}
// workaround b/c of copier bug: it make empty slice instead of nil
if i.Proposal.Contacts == nil {
proposal.Contacts = nil
}

return proposal
}
43 changes: 43 additions & 0 deletions core/service/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"errors"
"net"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/mysteriumnetwork/node/market"
"github.com/mysteriumnetwork/node/mocks"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -71,6 +74,46 @@ func Test_Pool_Add(t *testing.T) {
assert.Len(t, pool.instances, 1)
}

func Test_Pool_DataRace(t *testing.T) {
service := &Instance{
service: &mockService{},
eventPublisher: mocks.NewEventBus(),
location: mockLocationResolver{},
}

active := new(atomic.Bool)
active.Store(true)

var wg sync.WaitGroup

wg.Add(2)
go func() {
var location market.Location

defer wg.Done()
for i := 0; i < 100; i++ {
p := service.proposalWithCurrentLocation()
location = p.Location

time.Sleep(1 * time.Millisecond)
}
active.Store(false)
_ = location
}()
go func() {
var proposal market.ServiceProposal

defer wg.Done()
for active.Load() == true {
proposal = service.CopyProposal()

time.Sleep(1 * time.Millisecond)
}
_ = proposal
}()
wg.Wait()
}

func Test_Pool_StopAllSuccess(t *testing.T) {
instance := &Instance{
service: &mockService{},
Expand Down
2 changes: 1 addition & 1 deletion core/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NewSession(service *Instance, request *pb.SessionRequest, tracer *trace.Tra
ConsumerID: identity.FromAddress(request.GetConsumer().GetId()),
ConsumerLocation: consumerLocation,
HermesID: common.HexToAddress(request.GetConsumer().GetHermesID()),
Proposal: service.Proposal,
Proposal: service.CopyProposal(),
ServiceID: string(service.ID),
CreatedAt: time.Now().UTC(),
request: request,
Expand Down
65 changes: 65 additions & 0 deletions core/service/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2024 The "MysteriumNetwork/node" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package service

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/mysteriumnetwork/node/pb"
"github.com/mysteriumnetwork/node/trace"
)

func TestSession_DataRace(t *testing.T) {

service := &Instance{
location: mockLocationResolver{},
}

active := new(atomic.Bool)
active.Store(true)

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

for i := 0; i < 100; i++ {
session, _ := NewSession(service, &pb.SessionRequest{}, trace.NewTracer(""))
_ = session

time.Sleep(time.Millisecond)
}
active.Store(false)
}()

go func() {
defer wg.Done()

for active.Load() == true {
service.proposalWithCurrentLocation()

time.Sleep(time.Millisecond)
}
}()
wg.Wait()

}
12 changes: 6 additions & 6 deletions core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,22 @@ func (k *Keeper) fetchIdentities() []stateEvent.Identity {
log.Warn().Err(err).Msgf("Could not get registration status for %s", id.Address)
status = registry.Unknown
}

hermesID, err := k.deps.IdentityChannelCalculator.GetActiveHermes(k.deps.ChainID)
if err != nil {
log.Warn().Err(err).Msgf("Could not retrieve hermesID for %s", id.Address)
}

channelAddress, err := k.deps.IdentityChannelCalculator.GetActiveChannelAddress(k.deps.ChainID, id.ToCommonAddress())
if err != nil {
log.Warn().Err(err).Msgf("Could not calculate channel address for %s", id.Address)
}

earnings := k.deps.EarningsProvider.GetEarningsDetailed(k.deps.ChainID, id)
balance := k.deps.BalanceProvider.GetBalance(k.deps.ChainID, id)

stateIdentity := stateEvent.Identity{
Address: id.Address,
RegistrationStatus: status,
ChannelAddress: channelAddress,
Balance: k.deps.BalanceProvider.GetBalance(k.deps.ChainID, id),
Balance: balance,
Earnings: earnings.Total.UnsettledBalance,
EarningsTotal: earnings.Total.LifetimeBalance,
HermesID: hermesID,
Expand Down Expand Up @@ -241,9 +240,10 @@ func (k *Keeper) updateServices() {
// merge in the connection statistics
match, _ := k.getServiceByID(string(v.ID))

priced, err := k.deps.ProposalPricer.EnrichProposalWithPrice(v.Proposal)
proposal := v.CopyProposal()
priced, err := k.deps.ProposalPricer.EnrichProposalWithPrice(proposal)
if err != nil {
log.Warn().Msgf("could not load price for proposal %v(%v)", v.Proposal.ProviderID, v.Proposal.ServiceType)
log.Warn().Msgf("could not load price for proposal %v(%v)", proposal.ProviderID, proposal.ServiceType)
}

prop := contract.NewProposalDTO(priced)
Expand Down
4 changes: 3 additions & 1 deletion eventbus/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ func (b *simplifiedEventBus) Publish(topic string, data interface{}) {

b.mu.RLock()
ids := b.sub[topic]
idsCopy := make([]string, len(ids))
copy(idsCopy, ids)
b.mu.RUnlock()

for _, id := range ids {
for _, id := range idsCopy {
b.bus.Publish(topic+id, data)
}
}
Expand Down
34 changes: 34 additions & 0 deletions eventbus/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package eventbus

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -69,3 +72,34 @@ func TestUnsubscribeMethod(t *testing.T) {
assert.Equal(t, 1, h.val)
assert.Equal(t, 7, h2.val)
}

func Test_simplifiedEventBus_Publish_DataRace(t *testing.T) {
eventBus := New()

fn := func(data string) {}

active := new(atomic.Bool)
active.Store(true)

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

for i := 0; i < 100; i++ {
eventBus.SubscribeWithUID("topic", "1", fn)
eventBus.Publish("topic", "test data")
time.Sleep(time.Millisecond)
}
active.Store(false)
}()
go func() {
defer wg.Done()
for active.Load() == true {
eventBus.UnsubscribeWithUID("topic", "1", fn)
time.Sleep(time.Millisecond)
}
}()
wg.Wait()
}
11 changes: 9 additions & 2 deletions services/wireguard/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"

Expand Down Expand Up @@ -161,13 +162,13 @@ func (m *Manager) ProvideConfig(sessionID string, sessionConfig json.RawMessage,
destroy := func() {
log.Info().Msgf("Cleaning up session %s", sessionID)
m.sessionCleanupMu.Lock()
defer m.sessionCleanupMu.Unlock()
_, ok := m.sessionCleanup[sessionID]
if !ok {
log.Info().Msgf("Session '%s' was already cleaned up, returning without changes", sessionID)
return
}
delete(m.sessionCleanup, sessionID)
m.sessionCleanupMu.Unlock()

statsPublisher.stop()

Expand Down Expand Up @@ -273,7 +274,13 @@ func (m *Manager) Stop() error {
defer m.startStopMu.Unlock()

cleanupWg := sync.WaitGroup{}
for k, v := range m.sessionCleanup {

// prevent concurrent iteration and write
sessionCleanupCopy := make(map[string]func())
if err := copier.Copy(&sessionCleanupCopy, m.sessionCleanup); err != nil {
panic(err)
}
for k, v := range sessionCleanupCopy {
cleanupWg.Add(1)
go func(sessionID string, cleanup func()) {
defer cleanupWg.Done()
Expand Down
Loading

0 comments on commit 6720107

Please sign in to comment.