Skip to content

Commit 9d3dc8a

Browse files
authored
feat(shard-distributor): add canary ping handler for executor health checks (#7486)
Depends on #7475 being merged.

**What changed?** Added a server-side ping handler for canary health checks in the shard distributor. The handler responds to ping requests from other executors, verifying shard ownership.

**Why?** Enables executor-to-executor health verification in the canary system. The ping handler allows executors to check whether peer executors own specific shards and are functioning correctly.

**How did you test it?** Unit tests.

**Potential risks**

**Release notes**

**Documentation Changes**

---------

Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 7c89623 commit 9d3dc8a

File tree

2 files changed

+203
-0
lines changed

2 files changed

+203
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
6+
"go.uber.org/fx"
7+
"go.uber.org/zap"
8+
9+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
10+
"github.com/uber/cadence/service/sharddistributor/canary/processor"
11+
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
12+
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
13+
)
14+
15+
// PingHandler handles ping requests to verify executor ownership of shards
16+
type PingHandler struct {
17+
logger *zap.Logger
18+
executorsFixed map[string]executorclient.Executor[*processor.ShardProcessor] // namespace -> executor
19+
executorsEphemeral map[string]executorclient.Executor[*processorephemeral.ShardProcessor] // namespace -> executor
20+
}
21+
22+
// Params are the parameters for creating a PingHandler
23+
type Params struct {
24+
fx.In
25+
26+
Logger *zap.Logger
27+
28+
ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"`
29+
ExecutorsEphemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
30+
}
31+
32+
// NewPingHandler creates a new PingHandler
33+
func NewPingHandler(params Params) *PingHandler {
34+
// Create maps of executors for quick lookup
35+
executorsFixed := make(map[string]executorclient.Executor[*processor.ShardProcessor])
36+
for _, executor := range params.ExecutorsFixed {
37+
executorsFixed[executor.GetNamespace()] = executor
38+
}
39+
executorsEphemeral := make(map[string]executorclient.Executor[*processorephemeral.ShardProcessor])
40+
for _, executor := range params.ExecutorsEphemeral {
41+
executorsEphemeral[executor.GetNamespace()] = executor
42+
}
43+
44+
// Return the handler
45+
return &PingHandler{
46+
logger: params.Logger,
47+
executorsFixed: executorsFixed,
48+
executorsEphemeral: executorsEphemeral,
49+
}
50+
}
51+
52+
// Ping handles ping requests to check shard ownership
53+
func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) {
54+
h.logger.Info("Received ping request",
55+
zap.String("shard_key", request.GetShardKey()),
56+
zap.String("namespace", request.GetNamespace()))
57+
58+
namespace := request.GetNamespace()
59+
60+
// Check fixed executors
61+
if executor, found := h.executorsFixed[namespace]; found {
62+
return checkOwnerShipAndLog(ctx, executor, request, h), nil
63+
}
64+
65+
// Check ephemeral executors
66+
if executor, found := h.executorsEphemeral[namespace]; found {
67+
return checkOwnerShipAndLog(ctx, executor, request, h), nil
68+
}
69+
70+
// Namespace not found
71+
h.logger.Warn("Namespace executor not found",
72+
zap.String("namespace", namespace))
73+
74+
return &sharddistributorv1.PingResponse{
75+
ExecutorId: "",
76+
OwnsShard: false,
77+
ShardKey: request.GetShardKey(),
78+
}, nil
79+
}
80+
81+
func checkOwnerShipAndLog[T executorclient.ShardProcessor](ctx context.Context, executor executorclient.Executor[T], request *sharddistributorv1.PingRequest, h *PingHandler) *sharddistributorv1.PingResponse {
82+
// We just check that we have a processor for the shard
83+
_, err := executor.GetShardProcess(ctx, request.GetShardKey())
84+
ownshard := err == nil
85+
86+
metadata := executor.GetMetadata()
87+
executorID := getExecutorID(metadata)
88+
89+
response := &sharddistributorv1.PingResponse{
90+
ExecutorId: executorID,
91+
OwnsShard: ownshard,
92+
ShardKey: request.GetShardKey(),
93+
}
94+
95+
h.logger.Info("Responding to ping",
96+
zap.String("shard_key", request.GetShardKey()),
97+
zap.Bool("owns_shard", ownshard),
98+
zap.String("executor_id", executorID))
99+
100+
return response
101+
}
102+
103+
// getExecutorID returns the value stored under "grpc_address" in metadata.
// It returns "" when the key is absent, the value is empty, or metadata is nil.
func getExecutorID(metadata map[string]string) string {
	// Indexing a map (even a nil one) yields the zero value "" for a missing
	// key, so no explicit presence or emptiness check is needed.
	return metadata["grpc_address"]
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"go.uber.org/mock/gomock"
11+
"go.uber.org/zap"
12+
13+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
14+
"github.com/uber/cadence/service/sharddistributor/canary/processor"
15+
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
16+
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
17+
)
18+
19+
func TestPingHandler_Ping(t *testing.T) {
20+
tests := []struct {
21+
name string
22+
namespace string
23+
shardKey string
24+
setup func(*gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor])
25+
wantID string
26+
wantOwns bool
27+
}{
28+
{
29+
name: "fixed executor owns shard",
30+
namespace: "ns1",
31+
shardKey: "shard-1",
32+
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
33+
exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl)
34+
exec.EXPECT().GetNamespace().Return("ns1").AnyTimes()
35+
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-1").Return(&processor.ShardProcessor{}, nil)
36+
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7953"})
37+
return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil
38+
},
39+
wantID: "127.0.0.1:7953",
40+
wantOwns: true,
41+
},
42+
{
43+
name: "fixed executor does not own shard",
44+
namespace: "ns1",
45+
shardKey: "shard-2",
46+
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
47+
exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl)
48+
exec.EXPECT().GetNamespace().Return("ns1").AnyTimes()
49+
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-2").Return(nil, errors.New("not found"))
50+
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7954"})
51+
return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil
52+
},
53+
wantID: "127.0.0.1:7954",
54+
wantOwns: false,
55+
},
56+
{
57+
name: "ephemeral executor owns shard",
58+
namespace: "ns2",
59+
shardKey: "shard-3",
60+
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
61+
exec := executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl)
62+
exec.EXPECT().GetNamespace().Return("ns2").AnyTimes()
63+
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-3").Return(&processorephemeral.ShardProcessor{}, nil)
64+
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7955"})
65+
return nil, []executorclient.Executor[*processorephemeral.ShardProcessor]{exec}
66+
},
67+
wantID: "127.0.0.1:7955",
68+
wantOwns: true,
69+
},
70+
}
71+
72+
for _, tt := range tests {
73+
t.Run(tt.name, func(t *testing.T) {
74+
ctrl := gomock.NewController(t)
75+
defer ctrl.Finish()
76+
77+
fixed, ephemeral := tt.setup(ctrl)
78+
handler := NewPingHandler(Params{
79+
Logger: zap.NewNop(),
80+
ExecutorsFixed: fixed,
81+
ExecutorsEphemeral: ephemeral,
82+
})
83+
84+
resp, err := handler.Ping(context.Background(), &sharddistributorv1.PingRequest{
85+
Namespace: tt.namespace,
86+
ShardKey: tt.shardKey,
87+
})
88+
89+
require.NoError(t, err)
90+
assert.Equal(t, tt.wantID, resp.ExecutorId)
91+
assert.Equal(t, tt.wantOwns, resp.OwnsShard)
92+
assert.Equal(t, tt.shardKey, resp.ShardKey)
93+
})
94+
}
95+
}

0 commit comments

Comments
 (0)