@@ -2,51 +2,58 @@ package processorephemeral
22
33import (
44 "context"
5+ "fmt"
56 "sync"
67 "time"
78
89 "github.com/google/uuid"
910 "go.uber.org/fx"
11+ "go.uber.org/yarpc"
1012 "go.uber.org/zap"
1113
12- "github.com/uber/cadence/client/ sharddistributor"
14+ sharddistributorv1 "github.com/uber/cadence/.gen/proto/ sharddistributor/v1 "
1315 "github.com/uber/cadence/common/clock"
14- "github.com/uber/cadence/common/types "
16+ "github.com/uber/cadence/service/sharddistributor/client/spectatorclient "
1517)
1618
19+ //go:generate mockgen -package $GOPACKAGE -destination canary_client_mock_test.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
20+
1721const (
1822 shardCreationInterval = 1 * time .Second
1923)
2024
2125// ShardCreator creates shards at regular intervals for ephemeral canary testing
2226type ShardCreator struct {
23- logger * zap.Logger
24- timeSource clock.TimeSource
25- shardDistributor sharddistributor.Client
27+ logger * zap.Logger
28+ timeSource clock.TimeSource
29+ spectators * spectatorclient.Spectators
30+ canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
31+ namespaces []string
2632
2733 stopChan chan struct {}
2834 goRoutineWg sync.WaitGroup
29- namespaces []string
3035}
3136
3237// ShardCreatorParams contains the dependencies needed to create a ShardCreator
3338type ShardCreatorParams struct {
3439 fx.In
3540
36- Logger * zap.Logger
37- TimeSource clock.TimeSource
38- ShardDistributor sharddistributor.Client
41+ Logger * zap.Logger
42+ TimeSource clock.TimeSource
43+ Spectators * spectatorclient.Spectators
44+ CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
3945}
4046
4147// NewShardCreator creates a new ShardCreator instance with the given parameters and namespace
4248func NewShardCreator (params ShardCreatorParams , namespaces []string ) * ShardCreator {
4349 return & ShardCreator {
44- logger : params .Logger ,
45- timeSource : params .TimeSource ,
46- shardDistributor : params .ShardDistributor ,
47- stopChan : make (chan struct {}),
48- goRoutineWg : sync.WaitGroup {},
49- namespaces : namespaces ,
50+ logger : params .Logger ,
51+ timeSource : params .TimeSource ,
52+ spectators : params .Spectators ,
53+ canaryClient : params .CanaryClient ,
54+ stopChan : make (chan struct {}),
55+ goRoutineWg : sync.WaitGroup {},
56+ namespaces : namespaces ,
5057 }
5158}
5259
@@ -92,16 +99,84 @@ func (s *ShardCreator) process(ctx context.Context) {
9299 for _ , namespace := range s .namespaces {
93100 shardKey := uuid .New ().String ()
94101 s .logger .Info ("Creating shard" , zap .String ("shardKey" , shardKey ), zap .String ("namespace" , namespace ))
95- response , err := s .shardDistributor .GetShardOwner (ctx , & types.GetShardOwnerRequest {
96- ShardKey : shardKey ,
97- Namespace : namespace ,
98- })
102+
103+ // Get spectator for this namespace
104+ spectator , err := s .spectators .ForNamespace (namespace )
105+ if err != nil {
106+ s .logger .Warn ("No spectator for namespace, skipping shard creation" ,
107+ zap .String ("namespace" , namespace ),
108+ zap .String ("shardKey" , shardKey ),
109+ zap .Error (err ))
110+ continue
111+ }
112+
113+ // Create shard and get owner via spectator (spectator will create if not exists)
114+ owner , err := spectator .GetShardOwner (ctx , shardKey )
99115 if err != nil {
100- s .logger .Error ("create shard failed" , zap .Error (err ))
116+ s .logger .Error ("Failed to get/create shard owner" ,
117+ zap .Error (err ),
118+ zap .String ("namespace" , namespace ),
119+ zap .String ("shardKey" , shardKey ))
101120 continue
102121 }
103- s .logger .Info ("shard created" , zap .String ("shardKey" , shardKey ), zap .String ("shardOwner" , response .Owner ), zap .String ("namespace" , response .Namespace ))
122+
123+ s .logger .Info ("Shard created, got owner from spectator" ,
124+ zap .String ("shardKey" , shardKey ),
125+ zap .String ("namespace" , namespace ),
126+ zap .String ("executor_id" , owner .ExecutorID ))
127+
128+ // Now ping the owner to verify
129+ if err := s .pingShardOwner (ctx , owner , namespace , shardKey ); err != nil {
130+ s .logger .Error ("Failed to ping shard owner" ,
131+ zap .Error (err ),
132+ zap .String ("namespace" , namespace ),
133+ zap .String ("shardKey" , shardKey ))
134+ }
104135 }
105136 }
106137 }
107138}
139+
140+ func (s * ShardCreator ) pingShardOwner (ctx context.Context , owner * spectatorclient.ShardOwner , namespace , shardKey string ) error {
141+ s .logger .Debug ("Pinging shard owner after creation" ,
142+ zap .String ("namespace" , namespace ),
143+ zap .String ("shardKey" , shardKey ),
144+ zap .String ("expected_executor_id" , owner .ExecutorID ))
145+
146+ // Create ping request
147+ request := & sharddistributorv1.PingRequest {
148+ ShardKey : shardKey ,
149+ Namespace : namespace ,
150+ }
151+
152+ // SIMPLE CANARY CODE: Just pass the shard key and namespace!
153+ // The SpectatorPeerChooser library code handles routing to the right executor
154+ response , err := s .canaryClient .Ping (ctx , request , yarpc .WithShardKey (shardKey ), yarpc .WithHeader (spectatorclient .NamespaceHeader , namespace ))
155+ if err != nil {
156+ return fmt .Errorf ("ping rpc failed: %w" , err )
157+ }
158+
159+ // Verify response matches the owner we got from spectator
160+ if response .GetExecutorId () != owner .ExecutorID {
161+ s .logger .Warn ("Executor ID mismatch" ,
162+ zap .String ("namespace" , namespace ),
163+ zap .String ("shardKey" , shardKey ),
164+ zap .String ("expected" , owner .ExecutorID ),
165+ zap .String ("actual" , response .GetExecutorId ()))
166+ }
167+
168+ if ! response .GetOwnsShard () {
169+ s .logger .Warn ("Executor does not own shard" ,
170+ zap .String ("namespace" , namespace ),
171+ zap .String ("shardKey" , shardKey ),
172+ zap .String ("executor_id" , response .GetExecutorId ()))
173+ return fmt .Errorf ("executor %s does not own shard %s" , response .GetExecutorId (), shardKey )
174+ }
175+
176+ s .logger .Info ("Successfully verified shard owner after creation" ,
177+ zap .String ("namespace" , namespace ),
178+ zap .String ("shardKey" , shardKey ),
179+ zap .String ("executor_id" , response .GetExecutorId ()))
180+
181+ return nil
182+ }
0 commit comments