Skip to content

Commit 34ba5cf

Browse files
terutrueian
andcommitted
feat: support connection lifetime for single client (#727)
* feat: add connection lifetime option to single client * Remove mutex and timer flag for connection lifetime timer * Retry wire accquition when failed to stop connection lifetime timer * Add timer test to pipe * Add test for reseting timer and stopping timer when using pool * Remove p.StopTimer() from p.Close() Co-authored-by: Rueian <[email protected]> * Forced to retry when errConnExpired * Remove hasConnLftm and check resps[0] to retry for multi cmds * Recover connection lifetime error in the middle of calls * Fix the handling of connection lifetime error of DoMultiCache * perf: apply fieldaligments Signed-off-by: Rueian <[email protected]> --------- Signed-off-by: Rueian <[email protected]> Co-authored-by: Rueian <[email protected]>
1 parent ee3375d commit 34ba5cf

File tree

11 files changed

+374
-10
lines changed

11 files changed

+374
-10
lines changed

client.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type singleClient struct {
1515
stop uint32
1616
cmd Builder
1717
retry bool
18+
hasLftm bool
1819
DisableCache bool
1920
}
2021

@@ -32,11 +33,11 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn, retryer retryH
3233
if err := conn.Dial(); err != nil {
3334
return nil, err
3435
}
35-
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer), nil
36+
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer, opt.ConnLifetime > 0), nil
3637
}
3738

38-
func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool, retryer retryHandler) *singleClient {
39-
return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, DisableCache: disableCache}
39+
func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool, retryer retryHandler, hasLftm bool) *singleClient {
40+
return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, hasLftm: hasLftm, DisableCache: disableCache}
4041
}
4142

4243
func (c *singleClient) B() Builder {
@@ -47,6 +48,9 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult)
4748
attempts := 1
4849
retry:
4950
resp = c.conn.Do(ctx, cmd)
51+
if resp.Error() == errConnExpired {
52+
goto retry
53+
}
5054
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
5155
shouldRetry := c.retryHandler.WaitOrSkipRetry(
5256
ctx, attempts, cmd, resp.Error(),
@@ -86,6 +90,22 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [
8690
attempts := 1
8791
retry:
8892
resps = c.conn.DoMulti(ctx, multi...).s
93+
if c.hasLftm {
94+
var ml []Completed
95+
recover:
96+
ml = ml[:0]
97+
for i, resp := range resps {
98+
if resp.Error() == errConnExpired {
99+
ml = multi[i:]
100+
break
101+
}
102+
}
103+
if len(ml) > 0 {
104+
rs := c.conn.DoMulti(ctx, ml...).s
105+
resps = append(resps[:len(resps)-len(rs)], rs...)
106+
goto recover
107+
}
108+
}
89109
if c.retry && allReadOnly(multi) {
90110
for i, resp := range resps {
91111
if c.isRetryable(resp.Error(), ctx) {
@@ -114,6 +134,22 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
114134
attempts := 1
115135
retry:
116136
resps = c.conn.DoMultiCache(ctx, multi...).s
137+
if c.hasLftm {
138+
var ml []CacheableTTL
139+
recover:
140+
ml = ml[:0]
141+
for i, resp := range resps {
142+
if resp.Error() == errConnExpired {
143+
ml = multi[i:]
144+
break
145+
}
146+
}
147+
if len(ml) > 0 {
148+
rs := c.conn.DoMultiCache(ctx, ml...).s
149+
resps = append(resps[:len(resps)-len(rs)], rs...)
150+
goto recover
151+
}
152+
}
117153
if c.retry {
118154
for i, resp := range resps {
119155
if c.isRetryable(resp.Error(), ctx) {
@@ -139,6 +175,9 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
139175
attempts := 1
140176
retry:
141177
resp = c.conn.DoCache(ctx, cmd, ttl)
178+
if resp.Error() == errConnExpired {
179+
goto retry
180+
}
142181
if c.retry && c.isRetryable(resp.Error(), ctx) {
143182
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
144183
if shouldRetry {
@@ -156,6 +195,9 @@ func (c *singleClient) Receive(ctx context.Context, subscribe Completed, fn func
156195
attempts := 1
157196
retry:
158197
err = c.conn.Receive(ctx, subscribe, fn)
198+
if err == errConnExpired {
199+
goto retry
200+
}
159201
if c.retry {
160202
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
161203
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)

client_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,130 @@ func TestSingleClientLoadingRetry(t *testing.T) {
14241424
})
14251425
}
14261426

1427+
func TestSingleClientConnLifetime(t *testing.T) {
1428+
defer ShouldNotLeaked(SetupLeakDetection())
1429+
1430+
setup := func() (*singleClient, *mockConn) {
1431+
m := &mockConn{}
1432+
client, err := newSingleClient(
1433+
&ClientOption{InitAddress: []string{""}, ConnLifetime: 5 * time.Second},
1434+
m,
1435+
func(dst string, opt *ClientOption) conn { return m },
1436+
newRetryer(defaultRetryDelayFn),
1437+
)
1438+
if err != nil {
1439+
t.Fatalf("unexpected err %v", err)
1440+
}
1441+
return client, m
1442+
}
1443+
1444+
t.Run("Do", func(t *testing.T) {
1445+
client, m := setup()
1446+
m.DoFn = func(cmd Completed) RedisResult {
1447+
return newResult(strmsg('+', "OK"), nil)
1448+
}
1449+
if v, err := client.Do(context.Background(), client.B().Get().Key("Do").Build()).ToString(); err != nil || v != "OK" {
1450+
t.Fatalf("unexpected response %v %v", v, err)
1451+
}
1452+
})
1453+
1454+
t.Run("DoMulti", func(t *testing.T) {
1455+
client, m := setup()
1456+
m.DoMultiFn = func(multi ...Completed) *redisresults {
1457+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1458+
}
1459+
if v, err := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "OK" {
1460+
t.Fatalf("unexpected response %v %v", v, err)
1461+
}
1462+
})
1463+
1464+
t.Run("DoMulti ConnLifetime - at the head of processing", func(t *testing.T) {
1465+
client, m := setup()
1466+
attempts := 0
1467+
m.DoMultiFn = func(multi ...Completed) *redisresults {
1468+
attempts++
1469+
if attempts == 1 {
1470+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
1471+
}
1472+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1473+
}
1474+
if v, err := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "OK" {
1475+
t.Fatalf("unexpected response %v %v", v, err)
1476+
}
1477+
})
1478+
1479+
t.Run("DoMulti ConnLifetime in the middle of processing", func(t *testing.T) {
1480+
client, m := setup()
1481+
attempts := 0
1482+
m.DoMultiFn = func(multi ...Completed) *redisresults {
1483+
attempts++
1484+
if attempts == 1 {
1485+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
1486+
}
1487+
// recover the failure of the first call
1488+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1489+
}
1490+
resps := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build(), client.B().Get().Key("Do").Build())
1491+
if len(resps) != 2 {
1492+
t.Errorf("unexpected response length %v", len(resps))
1493+
}
1494+
for _, resp := range resps {
1495+
if v, err := resp.ToString(); err != nil || v != "OK" {
1496+
t.Fatalf("unexpected response %v %v", v, err)
1497+
}
1498+
}
1499+
})
1500+
1501+
t.Run("DoMultiCache", func(t *testing.T) {
1502+
client, m := setup()
1503+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
1504+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1505+
}
1506+
cmd := client.B().Get().Key("Do").Cache()
1507+
if v, err := client.DoMultiCache(context.Background(), CT(cmd, 0))[0].ToString(); err != nil || v != "OK" {
1508+
t.Fatalf("unexpected response %v %v", v, err)
1509+
}
1510+
})
1511+
1512+
t.Run("DoMultiCache ConnLifetime - at the head of processing", func(t *testing.T) {
1513+
client, m := setup()
1514+
attempts := 0
1515+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
1516+
attempts++
1517+
if attempts == 1 {
1518+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
1519+
}
1520+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1521+
}
1522+
cmd := client.B().Get().Key("Do").Cache()
1523+
if v, err := client.DoMultiCache(context.Background(), CT(cmd, 0))[0].ToString(); err != nil || v != "OK" {
1524+
t.Fatalf("unexpected response %v %v", v, err)
1525+
}
1526+
})
1527+
1528+
t.Run("DoMultiCache ConnLifetime in the middle of processing", func(t *testing.T) {
1529+
client, m := setup()
1530+
attempts := 0
1531+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
1532+
attempts++
1533+
if attempts == 1 {
1534+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
1535+
}
1536+
// recover the failure of the first call
1537+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1538+
}
1539+
resps := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("Do").Cache(), 0), CT(client.B().Get().Key("Do").Cache(), 0))
1540+
if len(resps) != 2 {
1541+
t.Errorf("unexpected response length %v", len(resps))
1542+
}
1543+
for _, resp := range resps {
1544+
if v, err := resp.ToString(); err != nil || v != "OK" {
1545+
t.Fatalf("unexpected response %v %v", v, err)
1546+
}
1547+
}
1548+
})
1549+
}
1550+
14271551
func BenchmarkSingleClient_DoCache(b *testing.B) {
14281552
ctx := context.Background()
14291553
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}, Dialer: net.Dialer{KeepAlive: -1}})

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,7 @@ func (c *clusterClient) Nodes() map[string]Client {
11991199
disableCache := c.opt != nil && c.opt.DisableCache
12001200
for addr, cc := range c.conns {
12011201
if !cc.hidden {
1202-
_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
1202+
_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler, false)
12031203
}
12041204
}
12051205
c.mu.RUnlock()

mux_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,8 @@ type mockWire struct {
11311131
VersionFn func() int
11321132
ErrorFn func() error
11331133
CloseFn func()
1134+
StopTimerFn func() bool
1135+
ResetTimerFn func() bool
11341136

11351137
CleanSubscriptionsFn func()
11361138
SetPubSubHooksFn func(hooks PubSubHooks) <-chan error
@@ -1205,6 +1207,20 @@ func (m *mockWire) SetOnCloseHook(fn func(error)) {
12051207
}
12061208
}
12071209

1210+
func (m *mockWire) StopTimer() bool {
1211+
if m.StopTimerFn != nil {
1212+
return m.StopTimerFn()
1213+
}
1214+
return true
1215+
}
1216+
1217+
func (m *mockWire) ResetTimer() bool {
1218+
if m.ResetTimerFn != nil {
1219+
return m.ResetTimerFn()
1220+
}
1221+
return true
1222+
}
1223+
12081224
func (m *mockWire) Info() map[string]RedisMessage {
12091225
if m.InfoFn != nil {
12101226
return m.InfoFn()

pipe.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type wire interface {
5555
CleanSubscriptions()
5656
SetPubSubHooks(hooks PubSubHooks) <-chan error
5757
SetOnCloseHook(fn func(error))
58+
StopTimer() bool
59+
ResetTimer() bool
5860
}
5961

6062
var _ wire = (*pipe)(nil)
@@ -77,11 +79,13 @@ type pipe struct {
7779
psubs *subs // pubsub pmessage subscriptions
7880
pingTimer *time.Timer // timer for background ping
7981
info map[string]RedisMessage
82+
lftmTimer *time.Timer // lifetime timer
8083
timeout time.Duration
8184
pinggap time.Duration
8285
maxFlushDelay time.Duration
83-
r2mu sync.Mutex
8486
wrCounter atomic.Uint64
87+
lftm time.Duration // lifetime
88+
r2mu sync.Mutex
8589
version int32
8690
blcksig int32
8791
state int32
@@ -328,6 +332,10 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
328332
p.backgroundPing()
329333
}
330334
}
335+
if option.ConnLifetime > 0 {
336+
p.lftm = option.ConnLifetime
337+
p.lftmTimer = time.AfterFunc(option.ConnLifetime, p.expired)
338+
}
331339
return p, nil
332340
}
333341

@@ -344,6 +352,7 @@ func (p *pipe) _exit(err error) {
344352
p.error.CompareAndSwap(nil, &errs{error: err})
345353
atomic.CompareAndSwapInt32(&p.state, 1, 2) // stop accepting new requests
346354
_ = p.conn.Close() // force both read & write goroutine to exit
355+
p.StopTimer()
347356
p.clhks.Load().(func(error))(err)
348357
}
349358

@@ -495,6 +504,9 @@ func (p *pipe) _backgroundRead() (err error) {
495504

496505
defer func() {
497506
resp := newErrResult(err)
507+
if e := p.Error(); e == errConnExpired {
508+
resp = newErrResult(e)
509+
}
498510
if err != nil && ff < len(multi) {
499511
for ; ff < len(resps); ff++ {
500512
resps[ff] = resp
@@ -1633,6 +1645,25 @@ func (p *pipe) Close() {
16331645
p.r2mu.Unlock()
16341646
}
16351647

1648+
func (p *pipe) StopTimer() bool {
1649+
if p.lftmTimer == nil {
1650+
return true
1651+
}
1652+
return p.lftmTimer.Stop()
1653+
}
1654+
1655+
func (p *pipe) ResetTimer() bool {
1656+
if p.lftmTimer == nil || p.Error() != nil {
1657+
return true
1658+
}
1659+
return p.lftmTimer.Reset(p.lftm)
1660+
}
1661+
1662+
func (p *pipe) expired() {
1663+
p.error.CompareAndSwap(nil, errExpired)
1664+
p.Close()
1665+
}
1666+
16361667
type pshks struct {
16371668
hooks PubSubHooks
16381669
close chan error
@@ -1672,6 +1703,9 @@ const (
16721703
)
16731704

16741705
var cacheMark = &(RedisMessage{})
1675-
var errClosing = &errs{error: ErrClosing}
1706+
var (
1707+
errClosing = &errs{error: ErrClosing}
1708+
errExpired = &errs{error: errConnExpired}
1709+
)
16761710

16771711
type errs struct{ error }

0 commit comments

Comments
 (0)