Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions service/sharddistributor/canary/pinger/canary_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

118 changes: 118 additions & 0 deletions service/sharddistributor/canary/pinger/pinger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package pinger

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"go.uber.org/fx"
"go.uber.org/yarpc"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient

const (
pingInterval = 1 * time.Second
pingJitterCoeff = 0.1 // 10% jitter
pingTimeout = 5 * time.Second
)

// Pinger periodically pings shard owners in the fixed namespace
type Pinger struct {
logger *zap.Logger
timeSource clock.TimeSource
canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
namespace string
numShards int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// Params are the parameters for creating a Pinger
type Params struct {
fx.In

Logger *zap.Logger
TimeSource clock.TimeSource
CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
}

// NewPinger creates a new Pinger for the fixed namespace
func NewPinger(params Params, namespace string, numShards int) *Pinger {
return &Pinger{
logger: params.Logger,
timeSource: params.TimeSource,
canaryClient: params.CanaryClient,
namespace: namespace,
numShards: numShards,
}
}

// Start begins the periodic ping loop
func (p *Pinger) Start(ctx context.Context) {
p.logger.Info("Starting canary pinger", zap.String("namespace", p.namespace), zap.Int("num_shards", p.numShards))
p.ctx, p.cancel = context.WithCancel(context.WithoutCancel(ctx))
p.wg.Add(1)
go p.pingLoop()
}

// Stop stops the ping loop
func (p *Pinger) Stop() {
if p.cancel != nil {
p.cancel()
}
p.wg.Wait()
}

func (p *Pinger) pingLoop() {
defer p.wg.Done()

ticker := p.timeSource.NewTicker(backoff.JitDuration(pingInterval, pingJitterCoeff))
defer ticker.Stop()

for {
select {
case <-p.ctx.Done():
p.logger.Info("Pinger context done, stopping")
return
case <-ticker.Chan():
p.pingRandomShard()
ticker.Reset(backoff.JitDuration(pingInterval, pingJitterCoeff))
}
}
}

// Pings a random shard in the namespace and logs the results
func (p *Pinger) pingRandomShard() {
shardNum := rand.Intn(p.numShards)
shardKey := fmt.Sprintf("%d", shardNum)

request := &sharddistributorv1.PingRequest{
ShardKey: shardKey,
Namespace: p.namespace,
}

ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
defer cancel()

response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
if err != nil {
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
}

// Verify response
if !response.GetOwnsShard() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we expect this? is it in case spectator client in the peerchooser has outdated info?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes - it's also a test. This should rarely happen, so if canary sees this a lot we have problems.

p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
}

p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
}
97 changes: 97 additions & 0 deletions service/sharddistributor/canary/pinger/pinger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package pinger

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/common/clock"
)

func TestPingerStartStop(t *testing.T) {
defer goleak.VerifyNone(t)

ctrl := gomock.NewController(t)
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)

pinger := NewPinger(Params{
Logger: zap.NewNop(),
TimeSource: clock.NewRealTimeSource(),
CanaryClient: mockClient,
}, "test-ns", 10)

pinger.Start(context.Background())
pinger.Stop()
}

func TestPingerPingRandomShard(t *testing.T) {
defer goleak.VerifyNone(t)

cases := []struct {
name string
setupClientMock func(*MockShardDistributorExecutorCanaryAPIYARPCClient)
expectedLog string
}{
{
name: "owns shard",
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
mockClient.EXPECT().Ping(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&sharddistributorv1.PingResponse{
OwnsShard: true,
ExecutorId: "127.0.0.1:7953",
}, nil)
},
expectedLog: "Successfully pinged shard owner",
},
{
name: "does not own shard",
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
mockClient.EXPECT().
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&sharddistributorv1.PingResponse{
OwnsShard: false,
ExecutorId: "127.0.0.1:7953",
}, nil)
},
expectedLog: "Executor does not own shard",
},
{
name: "RPC error",
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
mockClient.EXPECT().
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, errors.New("network error"))
},
expectedLog: "Failed to ping shard",
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
zapCore, logs := observer.New(zap.InfoLevel)
logger := zap.New(zapCore)

pinger := NewPinger(Params{
Logger: logger,
TimeSource: clock.NewRealTimeSource(),
CanaryClient: mockClient,
}, "test-ns", 10)
pinger.ctx = context.Background()

tt.setupClientMock(mockClient)

pinger.pingRandomShard()

assert.Equal(t, 1, logs.FilterMessage(tt.expectedLog).Len())
})
}
}
Loading