Skip to content

Commit 30fc04e

Browse files
committed
Cleaned up the pinger and it's tests
Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 47fc464 commit 30fc04e

File tree

2 files changed

+67
-109
lines changed

2 files changed

+67
-109
lines changed

service/sharddistributor/canary/pinger/pinger.go

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ func (p *Pinger) pingLoop() {
7979
ticker := p.timeSource.NewTicker(backoff.JitDuration(pingInterval, pingJitterCoeff))
8080
defer ticker.Stop()
8181

82-
p.logger.Info("Starting canary pinger",
83-
zap.String("namespace", p.namespace),
84-
zap.Int("num_shards", p.numShards))
85-
8682
for {
8783
select {
8884
case <-p.ctx.Done():
@@ -95,48 +91,28 @@ func (p *Pinger) pingLoop() {
9591
}
9692
}
9793

94+
// Pings a random shard in the namespace and logs the results
9895
func (p *Pinger) pingRandomShard() {
99-
// Pick a random shard number
10096
shardNum := rand.Intn(p.numShards)
10197
shardKey := fmt.Sprintf("%d", shardNum)
10298

103-
if err := p.pingShard(shardKey); err != nil {
104-
p.logger.Error("Failed to ping shard",
105-
zap.String("namespace", p.namespace),
106-
zap.String("shard_key", shardKey),
107-
zap.Error(err))
108-
}
109-
}
110-
111-
func (p *Pinger) pingShard(shardKey string) error {
112-
// Create ping request
11399
request := &sharddistributorv1.PingRequest{
114100
ShardKey: shardKey,
115101
Namespace: p.namespace,
116102
}
117103

118-
// Create context with deadline for the RPC call
119104
ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
120105
defer cancel()
121106

122107
response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
123108
if err != nil {
124-
return fmt.Errorf("ping rpc failed: %w", err)
109+
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
125110
}
126111

127112
// Verify response
128113
if !response.GetOwnsShard() {
129-
p.logger.Warn("Executor does not own shard",
130-
zap.String("namespace", p.namespace),
131-
zap.String("shard_key", shardKey),
132-
zap.String("executor_id", response.GetExecutorId()))
133-
return fmt.Errorf("executor %s does not own shard %s", response.GetExecutorId(), shardKey)
114+
p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
134115
}
135116

136-
p.logger.Info("Successfully pinged shard owner",
137-
zap.String("namespace", p.namespace),
138-
zap.String("shard_key", shardKey),
139-
zap.String("executor_id", response.GetExecutorId()))
140-
141-
return nil
117+
p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
142118
}

service/sharddistributor/canary/pinger/pinger_test.go

Lines changed: 63 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/assert"
9-
"github.com/stretchr/testify/require"
109
"go.uber.org/goleak"
1110
"go.uber.org/mock/gomock"
1211
"go.uber.org/zap"
12+
"go.uber.org/zap/zaptest/observer"
1313

1414
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1515
"github.com/uber/cadence/common/clock"
@@ -31,85 +31,67 @@ func TestPingerStartStop(t *testing.T) {
3131
pinger.Stop()
3232
}
3333

34-
func TestPingShard_Success(t *testing.T) {
35-
ctrl := gomock.NewController(t)
36-
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
37-
38-
pinger := NewPinger(Params{
39-
Logger: zap.NewNop(),
40-
TimeSource: clock.NewRealTimeSource(),
41-
CanaryClient: mockClient,
42-
}, "test-ns", 10)
43-
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
44-
defer pinger.cancel()
45-
46-
mockClient.EXPECT().
47-
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
48-
Return(&sharddistributorv1.PingResponse{
49-
OwnsShard: true,
50-
ExecutorId: "127.0.0.1:7953",
51-
}, nil)
52-
53-
err := pinger.pingShard("5")
54-
assert.NoError(t, err)
55-
}
56-
57-
func TestPingShard_DoesNotOwnShard(t *testing.T) {
58-
ctrl := gomock.NewController(t)
59-
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
60-
61-
pinger := NewPinger(Params{
62-
Logger: zap.NewNop(),
63-
TimeSource: clock.NewRealTimeSource(),
64-
CanaryClient: mockClient,
65-
}, "test-ns", 10)
66-
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
67-
defer pinger.cancel()
68-
69-
mockClient.EXPECT().
70-
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
71-
Return(&sharddistributorv1.PingResponse{
72-
OwnsShard: false,
73-
ExecutorId: "127.0.0.1:7953",
74-
}, nil)
75-
76-
err := pinger.pingShard("5")
77-
assert.Error(t, err)
78-
assert.Contains(t, err.Error(), "does not own shard")
79-
}
80-
81-
func TestPingShard_RPCError(t *testing.T) {
82-
ctrl := gomock.NewController(t)
83-
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
84-
85-
pinger := NewPinger(Params{
86-
Logger: zap.NewNop(),
87-
TimeSource: clock.NewRealTimeSource(),
88-
CanaryClient: mockClient,
89-
}, "test-ns", 10)
90-
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
91-
defer pinger.cancel()
92-
93-
mockClient.EXPECT().
94-
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
95-
Return(nil, errors.New("network error"))
96-
97-
err := pinger.pingShard("5")
98-
assert.Error(t, err)
99-
assert.Contains(t, err.Error(), "ping rpc failed")
100-
}
101-
102-
func TestNewPinger(t *testing.T) {
103-
ctrl := gomock.NewController(t)
104-
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
105-
106-
pinger := NewPinger(Params{
107-
Logger: zap.NewNop(),
108-
TimeSource: clock.NewRealTimeSource(),
109-
CanaryClient: mockClient,
110-
}, "test-ns", 100)
34+
func TestPingerPingRandomShard(t *testing.T) {
35+
defer goleak.VerifyNone(t)
11136

112-
require.NotNil(t, pinger)
113-
assert.Equal(t, "test-ns", pinger.namespace)
114-
assert.Equal(t, 100, pinger.numShards)
37+
cases := []struct {
38+
name string
39+
setupClientMock func(*MockShardDistributorExecutorCanaryAPIYARPCClient)
40+
expectedLog string
41+
}{
42+
{
43+
name: "owns shard",
44+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
45+
mockClient.EXPECT().Ping(gomock.Any(), gomock.Any(), gomock.Any()).
46+
Return(&sharddistributorv1.PingResponse{
47+
OwnsShard: true,
48+
ExecutorId: "127.0.0.1:7953",
49+
}, nil)
50+
},
51+
expectedLog: "Successfully pinged shard owner",
52+
},
53+
{
54+
name: "does not own shard",
55+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
56+
mockClient.EXPECT().
57+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
58+
Return(&sharddistributorv1.PingResponse{
59+
OwnsShard: false,
60+
ExecutorId: "127.0.0.1:7953",
61+
}, nil)
62+
},
63+
expectedLog: "Executor does not own shard",
64+
},
65+
{
66+
name: "RPC error",
67+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
68+
mockClient.EXPECT().
69+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
70+
Return(nil, errors.New("network error"))
71+
},
72+
expectedLog: "Failed to ping shard",
73+
},
74+
}
75+
76+
for _, tt := range cases {
77+
t.Run(tt.name, func(t *testing.T) {
78+
ctrl := gomock.NewController(t)
79+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
80+
zapCore, logs := observer.New(zap.InfoLevel)
81+
logger := zap.New(zapCore)
82+
83+
pinger := NewPinger(Params{
84+
Logger: logger,
85+
TimeSource: clock.NewRealTimeSource(),
86+
CanaryClient: mockClient,
87+
}, "test-ns", 10)
88+
pinger.ctx = context.Background()
89+
90+
tt.setupClientMock(mockClient)
91+
92+
pinger.pingRandomShard()
93+
94+
assert.Equal(t, 1, logs.FilterMessage(tt.expectedLog).Len())
95+
})
96+
}
11597
}

0 commit comments

Comments
 (0)