Skip to content

Commit c96e7cb

Browse files
committed
Wire up canary ping/pong components in module
Integrate all canary components into the module: - Create SpectatorPeerChooser and manage its lifecycle - Provide canary client using the dispatcher - Create and start the pinger component - Register ping handler as a YARPC server - Wire spectators to peer chooser on startup This connects all the pieces needed for executor-to-executor canary testing via ping/pong requests. Dependencies: - Requires SpectatorPeerChooser - Requires PingHandler - Requires Pinger component Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 75d6de8 commit c96e7cb

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

service/sharddistributor/canary/module.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package canary
22

33
import (
4+
"context"
5+
46
"go.uber.org/fx"
7+
"go.uber.org/yarpc"
58

69
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
710
"github.com/uber/cadence/service/sharddistributor/canary/executors"
811
"github.com/uber/cadence/service/sharddistributor/canary/factory"
12+
"github.com/uber/cadence/service/sharddistributor/canary/handler"
13+
"github.com/uber/cadence/service/sharddistributor/canary/pinger"
914
"github.com/uber/cadence/service/sharddistributor/canary/processor"
1015
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
1116
"github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient"
1217
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
18+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1319
)
1420

1521
type NamespacesNames struct {
@@ -49,5 +55,42 @@ func opts(names NamespacesNames) fx.Option {
4955
executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),
5056

5157
processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),
58+
59+
spectatorclient.Module(),
60+
fx.Provide(spectatorclient.NewSpectatorPeerChooser),
61+
fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, lc fx.Lifecycle) {
62+
lc.Append(fx.StartStopHook(chooser.Start, chooser.Stop))
63+
}),
64+
65+
// Create canary client using the dispatcher's client config
66+
fx.Provide(func(dispatcher *yarpc.Dispatcher) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient {
67+
config := dispatcher.ClientConfig("shard-distributor-canary")
68+
return sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(config)
69+
}),
70+
71+
fx.Provide(func(params pinger.Params) *pinger.Pinger {
72+
return pinger.NewPinger(params, names.FixedNamespace, 32)
73+
}),
74+
fx.Invoke(func(p *pinger.Pinger, lc fx.Lifecycle) {
75+
lc.Append(fx.StartStopHook(p.Start, p.Stop))
76+
}),
77+
78+
// Register canary ping handler to receive ping requests from other executors
79+
fx.Provide(handler.NewPingHandler),
80+
fx.Provide(fx.Annotate(
81+
func(h *handler.PingHandler) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer {
82+
return h
83+
},
84+
)),
85+
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()),
86+
87+
fx.Invoke(func(lc fx.Lifecycle, chooser spectatorclient.SpectatorPeerChooserInterface, spectators spectatorclient.Spectators) {
88+
lc.Append(fx.Hook{
89+
OnStart: func(ctx context.Context) error {
90+
chooser.SetSpectators(spectators)
91+
return nil
92+
},
93+
})
94+
}),
5295
)
5396
}

service/sharddistributor/canary/module_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ func TestModule(t *testing.T) {
2323
// Create mocks
2424
ctrl := gomock.NewController(t)
2525
mockClientConfig := transporttest.NewMockClientConfig(ctrl)
26-
outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList())
26+
transport := grpc.NewTransport()
27+
outbound := transport.NewOutbound(yarpctest.NewFakePeerList())
2728

2829
mockClientConfig.EXPECT().Caller().Return("test-executor").Times(2)
2930
mockClientConfig.EXPECT().Service().Return("shard-distributor").Times(2)
@@ -43,6 +44,11 @@ func TestModule(t *testing.T) {
4344
},
4445
}
4546

47+
// Create a mock dispatcher
48+
dispatcher := yarpc.NewDispatcher(yarpc.Config{
49+
Name: "test-canary",
50+
})
51+
4652
// Create a test app with the library, check that it starts and stops
4753
fxtest.New(t,
4854
fx.Supply(
@@ -52,6 +58,8 @@ func TestModule(t *testing.T) {
5258
fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))),
5359
zaptest.NewLogger(t),
5460
config,
61+
transport,
62+
dispatcher,
5563
),
5664
Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}),
5765
).RequireStart().RequireStop()

0 commit comments

Comments
 (0)