@@ -90,10 +90,13 @@ func (w Worker) Execute(j *models.LifecycleResourceJob) error {
90
90
}
91
91
92
92
// consumeChannels is a goroutine that listens to the golang channels and processes the events
93
- func (w Worker ) consumeChannels (LifecycleResourceJobsStatusChannel chan string , LifecyclePlacementJobsStatusChannel chan string ) {
93
+ func (w Worker ) consumeChannels (ctx context. Context , LifecycleResourceJobsStatusChannel chan string , LifecyclePlacementJobsStatusChannel chan string ) {
94
94
WorkerLoop:
95
95
for {
96
96
select {
97
+ case <- ctx .Done ():
98
+ log .Logger .Warn ("Context cancelled, exiting consumeChannels worker" )
99
+ return
97
100
case msg := <- LifecycleResourceJobsStatusChannel :
98
101
id , err := strconv .Atoi (msg )
99
102
if err != nil {
@@ -200,7 +203,7 @@ WorkerLoop:
200
203
}
201
204
}
202
205
203
- func (w Worker ) WatchLifecycleDBChannels () error {
206
+ func (w Worker ) WatchLifecycleDBChannels (ctx context. Context ) error {
204
207
205
208
// Create channels for resource lifecycle events
206
209
LifecycleResourceJobsStatusChannel := make (chan string )
@@ -213,55 +216,60 @@ func (w Worker) WatchLifecycleDBChannels() error {
213
216
return err
214
217
}
215
218
219
+ ctx , cancel := context .WithCancel (ctx )
220
+ // In case this goroutine stop, stop all workers and restart it
221
+ defer func () {
222
+ // Log that we are restarting
223
+ log .Logger .Warn ("Restarting worker WatchLifecycleDBChannels and its workers" )
224
+ cancel ()
225
+ // sleep for 5 seconds before restarting
226
+ time .Sleep (5 * time .Second )
227
+
228
+ go w .WatchLifecycleDBChannels (context .Background ())
229
+ }()
230
+
231
+ conn , err := w .Dbpool .Acquire (context .Background ())
232
+ if err != nil {
233
+ log .Logger .Error ("Error acquiring connection" , "error" , err )
234
+ return err
235
+ }
236
+ defer conn .Release ()
237
+
238
+ channels := []string {
239
+ "lifecycle_placement_jobs_status_channel" ,
240
+ "lifecycle_resource_jobs_status_channel" ,
241
+ }
242
+ for _ , pgChan := range channels {
243
+ _ , err = conn .Exec (context .Background (), fmt .Sprintf ("LISTEN %s" , pgChan ))
244
+ if err != nil {
245
+ log .Logger .Error ("Error listening to the channel" , "channel" , pgChan , "error" , err )
246
+ return err
247
+ }
248
+ log .Logger .Info ("Listening to channel" , "channel" , pgChan )
249
+ }
250
+
216
251
// Create go routines to listen to the Golang channels
217
252
for i := 0 ; i < workers ; i ++ {
218
- go w .consumeChannels (LifecycleResourceJobsStatusChannel , LifecyclePlacementJobsStatusChannel )
253
+ go w .consumeChannels (ctx , LifecycleResourceJobsStatusChannel , LifecyclePlacementJobsStatusChannel )
219
254
}
220
255
221
- // Listen to the DB channels and publish to the Golang channels
222
- MainLoop:
223
256
for {
224
- time .Sleep (1 * time .Second )
225
- conn , err := w .Dbpool .Acquire (context .Background ())
226
- defer conn .Release ()
227
-
257
+ notification , err := conn .Conn ().WaitForNotification (context .Background ())
228
258
if err != nil {
229
- log .Logger .Error ("Error acquiring connection" , "error" , err )
230
- continue
231
- }
232
-
233
- channels := []string {
234
- "lifecycle_placement_jobs_status_channel" ,
235
- "lifecycle_resource_jobs_status_channel" ,
236
- }
237
- for _ , pgChan := range channels {
238
- _ , err = conn .Exec (context .Background (), fmt .Sprintf ("LISTEN %s" , pgChan ))
239
- if err != nil {
240
- log .Logger .Error ("Error listening to the channel" , "channel" , pgChan , "error" , err )
241
- continue MainLoop
242
- }
243
- log .Logger .Info ("Listening to channel" , "channel" , pgChan )
259
+ log .Logger .Error ("Error while listening to the channel" , "error" , err )
260
+ return err
244
261
}
245
262
246
- for {
247
- notification , err := conn .Conn ().WaitForNotification (context .Background ())
248
- if err != nil {
249
- log .Logger .Error ("Error while listening to the channel" , "error" , err )
250
- break MainLoop // Restart pool acquisition
251
- }
252
-
253
- log .Logger .Debug ("Notification received" , "PID" , notification .PID , "Channel" , notification .Channel , "Payload" , notification .Payload )
263
+ log .Logger .Debug ("Notification received" , "PID" , notification .PID , "Channel" , notification .Channel , "Payload" , notification .Payload )
254
264
255
- switch notification .Channel {
256
- case "lifecycle_placement_jobs_status_channel" :
257
- LifecyclePlacementJobsStatusChannel <- notification .Payload
265
+ switch notification .Channel {
266
+ case "lifecycle_placement_jobs_status_channel" :
267
+ LifecyclePlacementJobsStatusChannel <- notification .Payload
258
268
259
- case "lifecycle_resource_jobs_status_channel" :
260
- LifecycleResourceJobsStatusChannel <- notification .Payload
261
- }
269
+ case "lifecycle_resource_jobs_status_channel" :
270
+ LifecycleResourceJobsStatusChannel <- notification .Payload
262
271
}
263
272
}
264
- return nil
265
273
}
266
274
267
275
// NewWorker creates a new worker
0 commit comments