@@ -2,51 +2,58 @@ package processorephemeral
22
33import (
44 "context"
5+ "fmt"
56 "sync"
67 "time"
78
89 "github.com/google/uuid"
910 "go.uber.org/fx"
11+ "go.uber.org/yarpc"
1012 "go.uber.org/zap"
1113
12- "github.com/uber/cadence/client/ sharddistributor"
14+ sharddistributorv1 "github.com/uber/cadence/.gen/proto/ sharddistributor/v1 "
1315 "github.com/uber/cadence/common/clock"
14- "github.com/uber/cadence/common/types "
16+ "github.com/uber/cadence/service/sharddistributor/client/spectatorclient "
1517)
1618
19+ //go:generate mockgen -package $GOPACKAGE -destination canary_client_mock_test.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
20+
// shardCreationInterval is a fixed one-second duration.
// NOTE(review): presumably the period between shard-creation rounds; the loop
// that consumes it is outside this view — confirm against the caller.
const shardCreationInterval = time.Second
2024
2125// ShardCreator creates shards at regular intervals for ephemeral canary testing
2226type ShardCreator struct {
23- logger * zap.Logger
24- timeSource clock.TimeSource
25- shardDistributor sharddistributor.Client
27+ logger * zap.Logger
28+ timeSource clock.TimeSource
29+ spectators map [string ]spectatorclient.Spectator // namespace -> spectator
30+ canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
31+ namespaces []string
2632
2733 stopChan chan struct {}
2834 goRoutineWg sync.WaitGroup
29- namespaces []string
3035}
3136
3237// ShardCreatorParams contains the dependencies needed to create a ShardCreator
3338type ShardCreatorParams struct {
3439 fx.In
3540
36- Logger * zap.Logger
37- TimeSource clock.TimeSource
38- ShardDistributor sharddistributor.Client
41+ Logger * zap.Logger
42+ TimeSource clock.TimeSource
43+ Spectators map [string ]spectatorclient.Spectator
44+ CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
3945}
4046
4147// NewShardCreator creates a new ShardCreator instance with the given parameters and namespace
4248func NewShardCreator (params ShardCreatorParams , namespaces []string ) * ShardCreator {
4349 return & ShardCreator {
44- logger : params .Logger ,
45- timeSource : params .TimeSource ,
46- shardDistributor : params .ShardDistributor ,
47- stopChan : make (chan struct {}),
48- goRoutineWg : sync.WaitGroup {},
49- namespaces : namespaces ,
50+ logger : params .Logger ,
51+ timeSource : params .TimeSource ,
52+ spectators : params .Spectators ,
53+ canaryClient : params .CanaryClient ,
54+ stopChan : make (chan struct {}),
55+ goRoutineWg : sync.WaitGroup {},
56+ namespaces : namespaces ,
5057 }
5158}
5259
@@ -92,16 +99,83 @@ func (s *ShardCreator) process(ctx context.Context) {
9299 for _ , namespace := range s .namespaces {
93100 shardKey := uuid .New ().String ()
94101 s .logger .Info ("Creating shard" , zap .String ("shardKey" , shardKey ), zap .String ("namespace" , namespace ))
95- response , err := s .shardDistributor .GetShardOwner (ctx , & types.GetShardOwnerRequest {
96- ShardKey : shardKey ,
97- Namespace : namespace ,
98- })
102+
103+ // Get spectator for this namespace
104+ spectator , ok := s .spectators [namespace ]
105+ if ! ok {
106+ s .logger .Warn ("No spectator for namespace, skipping shard creation" ,
107+ zap .String ("namespace" , namespace ),
108+ zap .String ("shardKey" , shardKey ))
109+ continue
110+ }
111+
112+ // Create shard and get owner via spectator (spectator will create if not exists)
113+ owner , err := spectator .GetShardOwner (ctx , shardKey )
99114 if err != nil {
100- s .logger .Error ("create shard failed" , zap .Error (err ))
115+ s .logger .Error ("Failed to get/create shard owner" ,
116+ zap .Error (err ),
117+ zap .String ("namespace" , namespace ),
118+ zap .String ("shardKey" , shardKey ))
101119 continue
102120 }
103- s .logger .Info ("shard created" , zap .String ("shardKey" , shardKey ), zap .String ("shardOwner" , response .Owner ), zap .String ("namespace" , response .Namespace ))
121+
122+ s .logger .Info ("Shard created, got owner from spectator" ,
123+ zap .String ("shardKey" , shardKey ),
124+ zap .String ("namespace" , namespace ),
125+ zap .String ("executor_id" , owner .ExecutorID ))
126+
127+ // Now ping the owner to verify
128+ if err := s .pingShardOwner (ctx , owner , namespace , shardKey ); err != nil {
129+ s .logger .Error ("Failed to ping shard owner" ,
130+ zap .Error (err ),
131+ zap .String ("namespace" , namespace ),
132+ zap .String ("shardKey" , shardKey ))
133+ }
104134 }
105135 }
106136 }
107137}
138+
139+ func (s * ShardCreator ) pingShardOwner (ctx context.Context , owner * spectatorclient.ShardOwner , namespace , shardKey string ) error {
140+ s .logger .Debug ("Pinging shard owner after creation" ,
141+ zap .String ("namespace" , namespace ),
142+ zap .String ("shardKey" , shardKey ),
143+ zap .String ("expected_executor_id" , owner .ExecutorID ))
144+
145+ // Create ping request
146+ request := & sharddistributorv1.PingRequest {
147+ ShardKey : shardKey ,
148+ Namespace : namespace ,
149+ }
150+
151+ // SIMPLE CANARY CODE: Just pass the shard key and namespace!
152+ // The SpectatorPeerChooser library code handles routing to the right executor
153+ response , err := s .canaryClient .Ping (ctx , request , yarpc .WithShardKey (shardKey ), yarpc .WithHeader (spectatorclient .NamespaceHeader , namespace ))
154+ if err != nil {
155+ return fmt .Errorf ("ping rpc failed: %w" , err )
156+ }
157+
158+ // Verify response matches the owner we got from spectator
159+ if response .GetExecutorId () != owner .ExecutorID {
160+ s .logger .Warn ("Executor ID mismatch" ,
161+ zap .String ("namespace" , namespace ),
162+ zap .String ("shardKey" , shardKey ),
163+ zap .String ("expected" , owner .ExecutorID ),
164+ zap .String ("actual" , response .GetExecutorId ()))
165+ }
166+
167+ if ! response .GetOwnsShard () {
168+ s .logger .Warn ("Executor does not own shard" ,
169+ zap .String ("namespace" , namespace ),
170+ zap .String ("shardKey" , shardKey ),
171+ zap .String ("executor_id" , response .GetExecutorId ()))
172+ return fmt .Errorf ("executor %s does not own shard %s" , response .GetExecutorId (), shardKey )
173+ }
174+
175+ s .logger .Info ("Successfully verified shard owner after creation" ,
176+ zap .String ("namespace" , namespace ),
177+ zap .String ("shardKey" , shardKey ),
178+ zap .String ("executor_id" , response .GetExecutorId ()))
179+
180+ return nil
181+ }
0 commit comments