@@ -206,12 +206,13 @@ func (jm *JobManager) createDescriptorSourceAndgRPCMethod(
206206}
207207
208208// createJobFunction returns a function that the job scheduler puts on its queue.
209- func (jm * JobManager ) createJobFunction (jc config.JobConfig ) func (ctx context.Context ) error {
209+ func (jm * JobManager ) createJobFunction (jc config.JobConfig , continuous bool ) func (ctx context.Context ) error {
210210 jobLogger := jm .logger .Sublogger (jc .Name )
211211 // To support logging for quick jobs (~ on the seconds schedule), we disable log
212212 // deduplication for job loggers.
213213 jobLogger .NeverDeduplicate ()
214- return func (ctx context.Context ) error {
214+
215+ jobFunc := func (ctx context.Context ) error {
215216 res , err := jm .getResource (jc .Resource )
216217 if err != nil {
217218 jobLogger .CWarnw (jm .ctx , "Could not get resource" , "error" , err .Error ())
@@ -272,6 +273,7 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
272273 jobLogger .CWarnw (jm .ctx , "Job failed" , "error" , err .Error ())
273274 return err
274275 } else if h .Status != nil && h .Status .Err () != nil {
276+ // if job panics, it seems to be captured here.
275277 jobLogger .CWarnw (jm .ctx , "Job failed" , "error" , h .Status .Err ())
276278 return h .Status .Err ()
277279 }
@@ -284,6 +286,34 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
284286 jobLogger .CDebugw (jm .ctx , "Job succeeded" , "response" , response )
285287 return nil
286288 }
289+
290+ return func (ctx context.Context ) error {
291+ var err error
292+ for {
293+ select {
294+ case <- ctx .Done ():
295+ // Job cancelled (e.g. from schedule modification)
296+ return err
297+ case <- jm .ctx .Done ():
298+ // JM shutting down
299+ return err
300+ default :
301+ }
302+ err = jobFunc (ctx )
303+ now := timestamppb .Now ()
304+ if jh , ok := jm .JobHistories .Load (jc .Name ); ok {
305+ if err != nil {
306+ // this includes captured panics (from InvokeRPC).
307+ jh .AddFailure (now )
308+ } else {
309+ jh .AddSuccess (now )
310+ }
311+ }
312+ if ! continuous {
313+ return err
314+ }
315+ }
316+ }
287317}
288318
289319// removeJob removes the job from the scheduler and clears the internal map entry.
@@ -307,32 +337,21 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
307337 return
308338 }
309339
310- var jobType gocron.JobDefinition
311- var jobLimitMode gocron.LimitMode
312- t , err := time .ParseDuration (jc .Schedule )
313- if err != nil {
314- withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
315- jobType = gocron .CronJob (jc .Schedule , withSeconds )
316- jobLimitMode = gocron .LimitModeReschedule
340+ var continuous bool
341+ var jobDefinition gocron.JobDefinition
342+ var jobOptions []gocron.JobOption
343+ if strings .ToLower (jc .Schedule ) == "continuous" {
344+ continuous = true
345+ // used with WithIntervalFromCompletion: if job unexpectedly exits, try to restart later.
346+ // since we capture panics, this is largely unused, but helps reduce scheduler overhead.
347+ jobDefinition = gocron .DurationJob (time .Second * 5 )
348+ jobOptions = append (jobOptions ,
349+ // don't queue up if running
350+ gocron .WithSingletonMode (gocron .LimitModeReschedule ),
351+ gocron .WithIntervalFromCompletion ())
317352 } else {
318- jobType = gocron .DurationJob (t )
319- jobLimitMode = gocron .LimitModeWait
320- }
321-
322- if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
323- jm .JobHistories .Store (jc .Name , & JobHistory {
324- successTimes : ring .New (historyLength ),
325- failureTimes : ring .New (historyLength ),
326- })
327- jm .NumJobHistories .Add (1 )
328- }
329-
330- jobLogger := jm .logger .Sublogger (jc .Name )
331-
332- jobFunc := jm .createJobFunction (jc )
333- j , err := jm .scheduler .NewJob (
334- jobType ,
335- gocron .NewTask (jobFunc ),
353+ // Regular gocron-supported modes:
354+ //
336355 // WithSingletonMode option allows us to perform jobs on the same schedule
337356 // sequentially. This will guarantee that there is only one instance of a particular
338357 // job running at the same time. If a job reaches its schedule while the previous
@@ -361,30 +380,38 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
361380
362381 // It is also important to note that DURATION jobs start relative to when they were
363382 // queued on the job scheduler, while CRON jobs are tied to the physical clock.
364- gocron .WithSingletonMode (jobLimitMode ),
383+ t , err := time .ParseDuration (jc .Schedule )
384+ if err != nil {
385+ // TODO: exit if cron job is also invalid. Currently it's stored as an invalid string and validated at NewJob call.
386+ withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
387+ jobDefinition = gocron .CronJob (jc .Schedule , withSeconds )
388+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeReschedule ))
389+
390+ } else {
391+ jobDefinition = gocron .DurationJob (t )
392+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeWait ))
393+ }
394+
395+ if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
396+ jm .JobHistories .Store (jc .Name , & JobHistory {
397+ successTimes : ring .New (historyLength ),
398+ failureTimes : ring .New (historyLength ),
399+ })
400+ jm .NumJobHistories .Add (1 )
401+ }
402+ }
403+
404+ jobOptions = append (jobOptions ,
365405 gocron .WithName (jc .Name ),
366- gocron .WithContext (jm .ctx ),
367- gocron .WithEventListeners (
368- // May be slightly more accurate to use j.LastRun(), but we don't have direct reference to it here, and we don't want the job to
369- // complete before we can store the returned Job.
370- gocron .AfterJobRuns (func (jobID uuid.UUID , jobName string ) {
371- now := timestamppb .Now ()
372- if jh , ok := jm .JobHistories .Load (jobName ); ok {
373- jh .AddSuccess (now )
374- }
375- }),
376- gocron .AfterJobRunsWithError (func (jobID uuid.UUID , jobName string , err error ) {
377- now := timestamppb .Now ()
378- if jh , ok := jm .JobHistories .Load (jobName ); ok {
379- jh .AddFailure (now )
380- }
381- }),
382- gocron .AfterJobRunsWithPanic (func (jobID uuid.UUID , jobName string , recoverData any ) {
383- now := timestamppb .Now ()
384- if jh , ok := jm .JobHistories .Load (jobName ); ok {
385- jh .AddFailure (now )
386- }
387- })),
406+ gocron .WithContext (jm .ctx ))
407+
408+ jobLogger := jm .logger .Sublogger (jc .Name )
409+
410+ jobFunc := jm .createJobFunction (jc , continuous )
411+ j , err := jm .scheduler .NewJob (
412+ jobDefinition ,
413+ gocron .NewTask (jobFunc ),
414+ jobOptions ... ,
388415 )
389416 if err != nil {
390417 jobLogger .CErrorw (jm .ctx , "Failed to create a new job" , "name" , jc .Name , "error" , err .Error ())
0 commit comments