Skip to content

Commit 094a538

Browse files
nurzhan-saktaganovKaymeKaydex
authored andcommitted
Router: get rid of nameToReplicasetMutex, use atomic instead
1 parent 0975367 commit 094a538

File tree

5 files changed

+59
-56
lines changed

5 files changed

+59
-56
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGES:
44
* Slog provider moved to providers directory.
55
* More strict check of vshard.storage.call response.
66
* Bump go-tarantool from v2.3.0 to v2.3.1.
7+
* Get rid of nameToReplicasetMutex, use atomic instead.
78

89
## v2.0.5
910

api_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,6 @@ import (
99
"github.com/vmihailenco/msgpack/v5/msgpcode"
1010
)
1111

12-
var emptyRouter = &Router{
13-
cfg: Config{
14-
TotalBucketCount: uint64(10),
15-
Loggerf: emptyLogfProvider,
16-
Metrics: emptyMetricsProvider,
17-
},
18-
}
19-
2012
func TestVshardMode_String_NotEmpty(t *testing.T) {
2113
t.Parallel()
2214
require.NotEmpty(t, ReadMode.String())
@@ -25,6 +17,14 @@ func TestVshardMode_String_NotEmpty(t *testing.T) {
2517

2618
func TestRouter_RouterRouteAll(t *testing.T) {
2719
t.Parallel()
20+
var emptyRouter = &Router{
21+
cfg: Config{
22+
TotalBucketCount: uint64(10),
23+
Loggerf: emptyLogfProvider,
24+
Metrics: emptyMetricsProvider,
25+
},
26+
}
27+
emptyRouter.setEmptyNameToReplicaset()
2828
m := emptyRouter.RouteAll()
2929
require.Empty(t, m)
3030
}

topology.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
var (
1212
ErrReplicasetExists = fmt.Errorf("replicaset already exists")
1313
ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
14+
15+
ErrConcurrentTopologyChangeDetected = fmt.Errorf("concurrent topology change detected")
1416
)
1517

1618
// TopologyController is an entity that allows you to interact with the topology.
@@ -33,18 +35,21 @@ func copyMap[K comparable, V any](m map[K]V) map[K]V {
3335
return copy
3436
}
3537

36-
func (r *Router) getNameToReplicaset() map[string]*Replicaset {
37-
r.nameToReplicasetMutex.RLock()
38-
nameToReplicasetRef := r.nameToReplicaset
39-
r.nameToReplicasetMutex.RUnlock()
38+
func (r *Router) setEmptyNameToReplicaset() {
39+
var nameToReplicasetRef map[string]*Replicaset
40+
_ = r.swapNameToReplicaset(nil, &nameToReplicasetRef)
41+
}
4042

41-
return nameToReplicasetRef
43+
func (r *Router) swapNameToReplicaset(old, new *map[string]*Replicaset) error {
44+
if swapped := r.nameToReplicaset.CompareAndSwap(old, new); !swapped {
45+
return ErrConcurrentTopologyChangeDetected
46+
}
47+
return nil
4248
}
4349

44-
func (r *Router) setNameToReplicaset(nameToReplicasetNew map[string]*Replicaset) {
45-
r.nameToReplicasetMutex.Lock()
46-
r.nameToReplicaset = nameToReplicasetNew
47-
r.nameToReplicasetMutex.Unlock()
50+
func (r *Router) getNameToReplicaset() map[string]*Replicaset {
51+
ptr := r.nameToReplicaset.Load()
52+
return *ptr
4853
}
4954

5055
func (r *Router) Topology() TopologyController {
@@ -121,9 +126,8 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
121126
return err
122127
}
123128

124-
nameToReplicasetOld := r.getNameToReplicaset()
125-
126-
if _, ok := nameToReplicasetOld[rsInfo.Name]; ok {
129+
nameToReplicasetOldPtr := r.nameToReplicaset.Load()
130+
if _, ok := (*nameToReplicasetOldPtr)[rsInfo.Name]; ok {
127131
return ErrReplicasetExists
128132
}
129133

@@ -168,14 +172,14 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
168172
}
169173

170174
// Create an entirely new map object
171-
nameToReplicasetNew := copyMap(nameToReplicasetOld)
175+
nameToReplicasetNew := copyMap(*nameToReplicasetOldPtr)
172176
nameToReplicasetNew[rsInfo.Name] = replicaset // add when conn is ready
173177

174-
// We could detect concurrent access to the TopologyController interface
175-
// by comparing references to r.idToReplicaset and idToReplicasetOld.
176-
// But it requires reflection which I prefer to avoid.
177-
// See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data.
178-
r.setNameToReplicaset(nameToReplicasetNew)
178+
if err = r.swapNameToReplicaset(nameToReplicasetOldPtr, &nameToReplicasetNew); err != nil {
179+
// replicaset has not added, so just close it
180+
_ = replicaset.conn.Close()
181+
return err
182+
}
179183

180184
return nil
181185
}
@@ -198,18 +202,19 @@ func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetI
198202
func (r *Router) RemoveReplicaset(ctx context.Context, rsName string) []error {
199203
r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsName)
200204

201-
nameToReplicasetOld := r.getNameToReplicaset()
202-
203-
rs := nameToReplicasetOld[rsName]
205+
nameToReplicasetOldPtr := r.nameToReplicaset.Load()
206+
rs := (*nameToReplicasetOldPtr)[rsName]
204207
if rs == nil {
205208
return []error{ErrReplicasetNotExists}
206209
}
207210

208211
// Create an entirely new map object
209-
nameToReplicasetNew := copyMap(nameToReplicasetOld)
212+
nameToReplicasetNew := copyMap(*nameToReplicasetOldPtr)
210213
delete(nameToReplicasetNew, rsName)
211214

212-
r.setNameToReplicaset(nameToReplicasetNew)
215+
if err := r.swapNameToReplicaset(nameToReplicasetOldPtr, &nameToReplicasetNew); err != nil {
216+
return []error{err}
217+
}
213218

214219
return rs.conn.CloseGraceful()
215220
}

topology_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ func TestController_AddInstance(t *testing.T) {
2323

2424
t.Run("no such replicaset", func(t *testing.T) {
2525
router := Router{
26-
nameToReplicaset: map[string]*Replicaset{},
2726
cfg: Config{
2827
Loggerf: emptyLogfProvider,
2928
},
3029
}
30+
router.setEmptyNameToReplicaset()
3131

3232
err := router.Topology().AddInstance(ctx, uuid.New().String(), InstanceInfo{
3333
Addr: "127.0.0.1:8060",
@@ -38,11 +38,11 @@ func TestController_AddInstance(t *testing.T) {
3838

3939
t.Run("invalid instance info", func(t *testing.T) {
4040
router := Router{
41-
nameToReplicaset: map[string]*Replicaset{},
4241
cfg: Config{
4342
Loggerf: emptyLogfProvider,
4443
},
4544
}
45+
router.setEmptyNameToReplicaset()
4646

4747
err := router.Topology().AddInstance(ctx, uuid.New().String(), InstanceInfo{})
4848
require.True(t, errors.Is(err, ErrInvalidInstanceInfo))
@@ -54,11 +54,11 @@ func TestController_RemoveInstance_NoSuchReplicaset(t *testing.T) {
5454
ctx := context.Background()
5555

5656
router := Router{
57-
nameToReplicaset: map[string]*Replicaset{},
5857
cfg: Config{
5958
Loggerf: emptyLogfProvider,
6059
},
6160
}
61+
router.setEmptyNameToReplicaset()
6262

6363
err := router.Topology().RemoveInstance(ctx, uuid.New().String(), "")
6464
require.True(t, errors.Is(err, ErrReplicasetNotExists))
@@ -81,15 +81,15 @@ func TestController_RemoveInstance_NoReplicasetNameProvided(t *testing.T) {
8181
mp.On("Remove", mock.Anything).Return(nil)
8282

8383
router := Router{
84-
nameToReplicaset: map[string]*Replicaset{
85-
"replicaset_1": {
86-
conn: mp,
87-
},
88-
},
8984
cfg: Config{
9085
Loggerf: emptyLogfProvider,
9186
},
9287
}
88+
_ = router.swapNameToReplicaset(nil, &map[string]*Replicaset{
89+
"replicaset_1": {
90+
conn: mp,
91+
},
92+
})
9393

9494
err := router.Topology().RemoveInstance(ctx, "", instanceName)
9595
require.NoError(t, err)
@@ -106,13 +106,13 @@ func TestController_RemoveReplicaset(t *testing.T) {
106106
mPool.On("CloseGraceful").Return(nil)
107107

108108
router := Router{
109-
nameToReplicaset: map[string]*Replicaset{
110-
uuidToRemove.String(): {conn: mPool},
111-
},
112109
cfg: Config{
113110
Loggerf: emptyLogfProvider,
114111
},
115112
}
113+
_ = router.swapNameToReplicaset(nil, &map[string]*Replicaset{
114+
uuidToRemove.String(): {conn: mPool},
115+
})
116116

117117
t.Run("no such replicaset", func(t *testing.T) {
118118
t.Parallel()
@@ -132,13 +132,13 @@ func TestRouter_AddReplicaset_AlreadyExists(t *testing.T) {
132132
alreadyExistingRsName := uuid.New().String()
133133

134134
router := Router{
135-
nameToReplicaset: map[string]*Replicaset{
136-
alreadyExistingRsName: {conn: nil},
137-
},
138135
cfg: Config{
139136
Loggerf: emptyLogfProvider,
140137
},
141138
}
139+
_ = router.swapNameToReplicaset(nil, &map[string]*Replicaset{
140+
alreadyExistingRsName: {conn: nil},
141+
})
142142

143143
// Test that such replicaset already exists
144144
err := router.AddReplicaset(ctx, ReplicasetInfo{Name: alreadyExistingRsName}, []InstanceInfo{})
@@ -153,13 +153,13 @@ func TestRouter_AddReplicaset_InvalidReplicaset(t *testing.T) {
153153
alreadyExistingRsName := uuid.New().String()
154154

155155
router := Router{
156-
nameToReplicaset: map[string]*Replicaset{
157-
alreadyExistingRsName: {conn: nil},
158-
},
159156
cfg: Config{
160157
Loggerf: emptyLogfProvider,
161158
},
162159
}
160+
_ = router.swapNameToReplicaset(nil, &map[string]*Replicaset{
161+
alreadyExistingRsName: {conn: nil},
162+
})
163163

164164
// Test that such replicaset already exists
165165
rsInfo := ReplicasetInfo{}

vshard.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package vshard_router //nolint:revive
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"sync/atomic"
87
"time"
98

@@ -30,15 +29,13 @@ type routeMap = []atomic.Pointer[Replicaset]
3029
type Router struct {
3130
cfg Config
3231

33-
// nameToReplicasetMutex guards not the map itself, but the variable idToReplicaset.
34-
// nameToReplicaset is an immutable object by our convention.
32+
// nameToReplicaset is an atomic pointer, that points to an immutable object by our convention.
3533
// Whenever we add or remove a replicaset, we create a new map object.
36-
// nameToReplicaset can be modified only by TopologyController methods.
34+
// Object under nameToReplicaset can be replaced to a new one only by TopologyController methods.
3735
// Assuming that we rarely add or remove some replicaset,
3836
// it should be the simplest and most efficient way of handling concurrent access.
3937
// Additionally, we can safely iterate over a map because it never changes.
40-
nameToReplicasetMutex sync.RWMutex
41-
nameToReplicaset map[string]*Replicaset
38+
nameToReplicaset atomic.Pointer[map[string]*Replicaset]
4239

4340
routeMap atomic.Pointer[routeMap]
4441

@@ -204,10 +201,10 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
204201
}
205202

206203
router := &Router{
207-
cfg: cfg,
208-
nameToReplicaset: make(map[string]*Replicaset),
204+
cfg: cfg,
209205
}
210206
router.setEmptyRouteMap()
207+
router.setEmptyNameToReplicaset()
211208

212209
err = cfg.TopologyProvider.Init(router.Topology())
213210
if err != nil {

0 commit comments

Comments
 (0)