|
1 | 1 | package canary |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "context" |
| 5 | + |
4 | 6 | "go.uber.org/fx" |
| 7 | + "go.uber.org/yarpc" |
5 | 8 |
|
6 | 9 | sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" |
7 | 10 | "github.com/uber/cadence/service/sharddistributor/canary/executors" |
8 | 11 | "github.com/uber/cadence/service/sharddistributor/canary/factory" |
| 12 | + "github.com/uber/cadence/service/sharddistributor/canary/handler" |
| 13 | + "github.com/uber/cadence/service/sharddistributor/canary/pinger" |
9 | 14 | "github.com/uber/cadence/service/sharddistributor/canary/processor" |
10 | 15 | "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" |
11 | 16 | "github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient" |
12 | 17 | "github.com/uber/cadence/service/sharddistributor/client/executorclient" |
| 18 | + "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" |
13 | 19 | ) |
14 | 20 |
|
15 | 21 | type NamespacesNames struct { |
@@ -49,5 +55,42 @@ func opts(names NamespacesNames) fx.Option { |
49 | 55 | executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace), |
50 | 56 |
|
51 | 57 | processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}), |
| 58 | + |
| 59 | + spectatorclient.Module(), |
| 60 | + fx.Provide(spectatorclient.NewSpectatorPeerChooser), |
| 61 | + fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, lc fx.Lifecycle) { |
| 62 | + lc.Append(fx.StartStopHook(chooser.Start, chooser.Stop)) |
| 63 | + }), |
| 64 | + |
| 65 | + // Create canary client using the dispatcher's client config |
| 66 | + fx.Provide(func(dispatcher *yarpc.Dispatcher) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient { |
| 67 | + config := dispatcher.ClientConfig("shard-distributor-canary") |
| 68 | + return sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(config) |
| 69 | + }), |
| 70 | + |
| 71 | + fx.Provide(func(params pinger.Params) *pinger.Pinger { |
| 72 | + return pinger.NewPinger(params, names.FixedNamespace, 32) |
| 73 | + }), |
| 74 | + fx.Invoke(func(p *pinger.Pinger, lc fx.Lifecycle) { |
| 75 | + lc.Append(fx.StartStopHook(p.Start, p.Stop)) |
| 76 | + }), |
| 77 | + |
| 78 | + // Register canary ping handler to receive ping requests from other executors |
| 79 | + fx.Provide(handler.NewPingHandler), |
| 80 | + fx.Provide(fx.Annotate( |
| 81 | + func(h *handler.PingHandler) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer { |
| 82 | + return h |
| 83 | + }, |
| 84 | + )), |
| 85 | + fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()), |
| 86 | + |
| 87 | + fx.Invoke(func(lc fx.Lifecycle, chooser spectatorclient.SpectatorPeerChooserInterface, spectators spectatorclient.Spectators) { |
| 88 | + lc.Append(fx.Hook{ |
| 89 | + OnStart: func(ctx context.Context) error { |
| 90 | + chooser.SetSpectators(spectators) |
| 91 | + return nil |
| 92 | + }, |
| 93 | + }) |
| 94 | + }), |
52 | 95 | ) |
53 | 96 | } |
0 commit comments