Skip to content

Commit ae1cf2b

Browse files
committed
We now use the interface correctly to create and ping shards in one go
Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent c96e7cb commit ae1cf2b

File tree

4 files changed

+42
-138
lines changed

4 files changed

+42
-138
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
8+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
9+
"go.uber.org/yarpc"
10+
"go.uber.org/zap"
11+
)
12+
13+
const (
14+
pingTimeout = 5 * time.Second
15+
)
16+
17+
func PingShard(ctx context.Context, canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient, logger *zap.Logger, namespace, shardKey string) {
18+
request := &sharddistributorv1.PingRequest{
19+
ShardKey: shardKey,
20+
Namespace: namespace,
21+
}
22+
23+
ctx, cancel := context.WithTimeout(ctx, pingTimeout)
24+
defer cancel()
25+
26+
response, err := canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace))
27+
if err != nil {
28+
logger.Error("Failed to ping shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.Error(err))
29+
}
30+
31+
// Verify response
32+
if !response.GetOwnsShard() {
33+
logger.Warn("Executor does not own shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
34+
}
35+
36+
logger.Info("Successfully pinged shard owner", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
37+
}

service/sharddistributor/canary/pinger/pinger.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@ import (
88
"time"
99

1010
"go.uber.org/fx"
11-
"go.uber.org/yarpc"
1211
"go.uber.org/zap"
1312

1413
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1514
"github.com/uber/cadence/common/backoff"
1615
"github.com/uber/cadence/common/clock"
17-
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1816
)
1917

2018
//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
2119

2220
const (
2321
pingInterval = 1 * time.Second
2422
pingJitterCoeff = 0.1 // 10% jitter
25-
pingTimeout = 5 * time.Second
2623
)
2724

2825
// Pinger periodically pings shard owners in the fixed namespace
@@ -96,23 +93,5 @@ func (p *Pinger) pingRandomShard() {
9693
shardNum := rand.Intn(p.numShards)
9794
shardKey := fmt.Sprintf("%d", shardNum)
9895

99-
request := &sharddistributorv1.PingRequest{
100-
ShardKey: shardKey,
101-
Namespace: p.namespace,
102-
}
103-
104-
ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
105-
defer cancel()
106-
107-
response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
108-
if err != nil {
109-
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
110-
}
111-
112-
// Verify response
113-
if !response.GetOwnsShard() {
114-
p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
115-
}
116-
117-
p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
96+
PingShard(p.ctx, p.canaryClient, p.logger, p.namespace, shardKey)
11897
}

service/sharddistributor/canary/processorephemeral/shardcreator.go

Lines changed: 2 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,16 @@ package processorephemeral
22

33
import (
44
"context"
5-
"fmt"
65
"sync"
76
"time"
87

98
"github.com/google/uuid"
109
"go.uber.org/fx"
11-
"go.uber.org/yarpc"
1210
"go.uber.org/zap"
1311

1412
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1513
"github.com/uber/cadence/common/clock"
16-
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
14+
"github.com/uber/cadence/service/sharddistributor/canary/pinger"
1715
)
1816

1917
//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock_test.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
@@ -26,7 +24,6 @@ const (
2624
type ShardCreator struct {
2725
logger *zap.Logger
2826
timeSource clock.TimeSource
29-
spectators *spectatorclient.Spectators
3027
canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
3128
namespaces []string
3229

@@ -40,7 +37,6 @@ type ShardCreatorParams struct {
4037

4138
Logger *zap.Logger
4239
TimeSource clock.TimeSource
43-
Spectators *spectatorclient.Spectators
4440
CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
4541
}
4642

@@ -49,7 +45,6 @@ func NewShardCreator(params ShardCreatorParams, namespaces []string) *ShardCreat
4945
return &ShardCreator{
5046
logger: params.Logger,
5147
timeSource: params.TimeSource,
52-
spectators: params.Spectators,
5348
canaryClient: params.CanaryClient,
5449
stopChan: make(chan struct{}),
5550
goRoutineWg: sync.WaitGroup{},
@@ -100,83 +95,8 @@ func (s *ShardCreator) process(ctx context.Context) {
10095
shardKey := uuid.New().String()
10196
s.logger.Info("Creating shard", zap.String("shardKey", shardKey), zap.String("namespace", namespace))
10297

103-
// Get spectator for this namespace
104-
spectator, err := s.spectators.ForNamespace(namespace)
105-
if err != nil {
106-
s.logger.Warn("No spectator for namespace, skipping shard creation",
107-
zap.String("namespace", namespace),
108-
zap.String("shardKey", shardKey),
109-
zap.Error(err))
110-
continue
111-
}
112-
113-
// Create shard and get owner via spectator (spectator will create if not exists)
114-
owner, err := spectator.GetShardOwner(ctx, shardKey)
115-
if err != nil {
116-
s.logger.Error("Failed to get/create shard owner",
117-
zap.Error(err),
118-
zap.String("namespace", namespace),
119-
zap.String("shardKey", shardKey))
120-
continue
121-
}
122-
123-
s.logger.Info("Shard created, got owner from spectator",
124-
zap.String("shardKey", shardKey),
125-
zap.String("namespace", namespace),
126-
zap.String("executor_id", owner.ExecutorID))
127-
128-
// Now ping the owner to verify
129-
if err := s.pingShardOwner(ctx, owner, namespace, shardKey); err != nil {
130-
s.logger.Error("Failed to ping shard owner",
131-
zap.Error(err),
132-
zap.String("namespace", namespace),
133-
zap.String("shardKey", shardKey))
134-
}
98+
pinger.PingShard(ctx, s.canaryClient, s.logger, namespace, shardKey)
13599
}
136100
}
137101
}
138102
}
139-
140-
func (s *ShardCreator) pingShardOwner(ctx context.Context, owner *spectatorclient.ShardOwner, namespace, shardKey string) error {
141-
s.logger.Debug("Pinging shard owner after creation",
142-
zap.String("namespace", namespace),
143-
zap.String("shardKey", shardKey),
144-
zap.String("expected_executor_id", owner.ExecutorID))
145-
146-
// Create ping request
147-
request := &sharddistributorv1.PingRequest{
148-
ShardKey: shardKey,
149-
Namespace: namespace,
150-
}
151-
152-
// SIMPLE CANARY CODE: Just pass the shard key and namespace!
153-
// The SpectatorPeerChooser library code handles routing to the right executor
154-
response, err := s.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace))
155-
if err != nil {
156-
return fmt.Errorf("ping rpc failed: %w", err)
157-
}
158-
159-
// Verify response matches the owner we got from spectator
160-
if response.GetExecutorId() != owner.ExecutorID {
161-
s.logger.Warn("Executor ID mismatch",
162-
zap.String("namespace", namespace),
163-
zap.String("shardKey", shardKey),
164-
zap.String("expected", owner.ExecutorID),
165-
zap.String("actual", response.GetExecutorId()))
166-
}
167-
168-
if !response.GetOwnsShard() {
169-
s.logger.Warn("Executor does not own shard",
170-
zap.String("namespace", namespace),
171-
zap.String("shardKey", shardKey),
172-
zap.String("executor_id", response.GetExecutorId()))
173-
return fmt.Errorf("executor %s does not own shard %s", response.GetExecutorId(), shardKey)
174-
}
175-
176-
s.logger.Info("Successfully verified shard owner after creation",
177-
zap.String("namespace", namespace),
178-
zap.String("shardKey", shardKey),
179-
zap.String("executor_id", response.GetExecutorId()))
180-
181-
return nil
182-
}

service/sharddistributor/canary/processorephemeral/shardcreator_test.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,18 @@ import (
1111

1212
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1313
"github.com/uber/cadence/common/clock"
14-
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1514
)
1615

17-
func TestShardCreator_Lifecycle(t *testing.T) {
16+
func TestShardCreator_PingsShards(t *testing.T) {
1817
goleak.VerifyNone(t)
1918

2019
logger := zaptest.NewLogger(t)
2120
timeSource := clock.NewMockedTimeSource()
2221
ctrl := gomock.NewController(t)
2322

2423
namespace := "test-namespace"
25-
26-
mockSpectator := spectatorclient.NewMockSpectator(ctrl)
2724
mockCanaryClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
2825

29-
// First call fails - no ping should happen
30-
firstCall := mockSpectator.EXPECT().
31-
GetShardOwner(gomock.Any(), gomock.Any()).
32-
DoAndReturn(func(ctx interface{}, shardKey string) (*spectatorclient.ShardOwner, error) {
33-
assert.NotEmpty(t, shardKey)
34-
return nil, assert.AnError
35-
})
36-
37-
// Second call succeeds - ping should happen
38-
mockSpectator.EXPECT().
39-
GetShardOwner(gomock.Any(), gomock.Any()).
40-
DoAndReturn(func(ctx interface{}, shardKey string) (*spectatorclient.ShardOwner, error) {
41-
assert.NotEmpty(t, shardKey)
42-
return &spectatorclient.ShardOwner{
43-
ExecutorID: "executor-1",
44-
}, nil
45-
}).
46-
After(firstCall)
47-
4826
// Ping is issued directly for each newly created shard key
4927
mockCanaryClient.EXPECT().
5028
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
@@ -57,27 +35,17 @@ func TestShardCreator_Lifecycle(t *testing.T) {
5735
}, nil
5836
})
5937

60-
spectators := map[string]spectatorclient.Spectator{
61-
namespace: mockSpectator,
62-
}
63-
6438
params := ShardCreatorParams{
6539
Logger: logger,
6640
TimeSource: timeSource,
67-
Spectators: spectators,
6841
CanaryClient: mockCanaryClient,
6942
}
7043

7144
creator := NewShardCreator(params, []string{namespace})
7245
creator.Start()
7346

47+
// Wait for the goroutine to start and do its ping
7448
timeSource.BlockUntil(1)
75-
76-
// First cycle - GetShardOwner fails, no ping
77-
timeSource.Advance(shardCreationInterval + 100*time.Millisecond)
78-
time.Sleep(10 * time.Millisecond)
79-
80-
// Second cycle - GetShardOwner succeeds, ping happens
8149
timeSource.Advance(shardCreationInterval + 100*time.Millisecond)
8250
time.Sleep(10 * time.Millisecond)
8351

0 commit comments

Comments
 (0)