@@ -205,12 +205,14 @@ func (jm *JobManager) createDescriptorSourceAndgRPCMethod(
205205}
206206
207207// createJobFunction returns a function that the job scheduler puts on its queue.
208- func (jm * JobManager ) createJobFunction (jc config.JobConfig ) func (ctx context.Context ) error {
208+ func (jm * JobManager ) createJobFunction (jc config.JobConfig , continuous bool ) func (ctx context.Context ) error {
209209 jobLogger := jm .logger .Sublogger (jc .Name )
210210 // To support logging for quick jobs (~ on the seconds schedule), we disable log
211211 // deduplication for job loggers.
212212 jobLogger .NeverDeduplicate ()
213- return func (ctx context.Context ) error {
213+
214+ // The context passed in is deliberately ignored in favor of jm.ctx, so a running job is interrupted only when the JobManager shuts down. When the schedule changes, the in-flight run is allowed to complete instead of being interrupted.
215+ jobFunc := func (_ context.Context ) error {
214216 res , err := jm .getResource (jc .Resource )
215217 if err != nil {
216218 jobLogger .CWarnw (jm .ctx , "Could not get resource" , "error" , err .Error ())
@@ -271,6 +273,7 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
271273 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , err .Error ())
272274 return err
273275 } else if h .Status != nil && h .Status .Err () != nil {
276+ // NOTE: a panic inside the job appears to surface here as a non-nil status error rather than propagating (TODO: confirm).
274277 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , h .Status .Err ())
275278 return h .Status .Err ()
276279 }
@@ -284,6 +287,34 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
284287 jobLogger .CDebugw (jm .ctx , "Job succeeded" , "name" , jc .Name , "response" , response )
285288 return nil
286289 }
290+
291+ return func (ctx context.Context ) error {
292+ var err error
293+ for {
294+ select {
295+ case <- ctx .Done ():
296+ // Job cancelled (e.g. from schedule modification)
297+ return err
298+ case <- jm .ctx .Done ():
299+ // JM shutting down
300+ return err
301+ default :
302+ }
303+ err = jobFunc (ctx )
304+ now := time .Now ()
305+ if jh , ok := jm .JobHistories .Load (jc .Name ); ok {
306+ if err != nil {
307+ // this includes captured panics (from InvokeRPC).
308+ jh .AddFailure (now )
309+ } else {
310+ jh .AddSuccess (now )
311+ }
312+ }
313+ if ! continuous {
314+ return err
315+ }
316+ }
317+ }
287318}
288319
289320// removeJob removes the job from the scheduler and clears the internal map entry.
@@ -307,32 +338,21 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
307338 return
308339 }
309340
310- var jobType gocron.JobDefinition
311- var jobLimitMode gocron.LimitMode
312- t , err := time .ParseDuration (jc .Schedule )
313- if err != nil {
314- withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
315- jobType = gocron .CronJob (jc .Schedule , withSeconds )
316- jobLimitMode = gocron .LimitModeReschedule
341+ var continuous bool
342+ var jobDefinition gocron.JobDefinition
343+ var jobOptions []gocron.JobOption
344+ if strings .ToLower (jc .Schedule ) == "continuous" {
345+ continuous = true
346+ // This interval is used together with WithIntervalFromCompletion: if the job unexpectedly exits, it is restarted this long after it returned.
347+ // Since we capture panics, this restart path is largely unused, but it helps reduce scheduler overhead.
348+ jobDefinition = gocron .DurationJob (time .Second * 5 )
349+ jobOptions = append (jobOptions ,
350+ // don't queue up if running
351+ gocron .WithSingletonMode (gocron .LimitModeReschedule ),
352+ gocron .WithIntervalFromCompletion ())
317353 } else {
318- jobType = gocron .DurationJob (t )
319- jobLimitMode = gocron .LimitModeWait
320- }
321-
322- if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
323- jm .JobHistories .Store (jc .Name , & JobHistory {
324- successTimes : ring .New (historyLength ),
325- failureTimes : ring .New (historyLength ),
326- })
327- jm .NumJobHistories .Add (1 )
328- }
329-
330- jobLogger := jm .logger .Sublogger (jc .Name )
331-
332- jobFunc := jm .createJobFunction (jc )
333- j , err := jm .scheduler .NewJob (
334- jobType ,
335- gocron .NewTask (jobFunc ),
354+ // Regular gocron-supported modes:
355+ //
336356 // WithSingletonMode option allows us to perform jobs on the same schedule
337357 // sequentially. This will guarantee that there is only one instance of a particular
338358 // job running at the same time. If a job reaches its schedule while the previous
@@ -361,30 +381,37 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
361381
362382 // It is also important to note that DURATION jobs start relative to when they were
363383 // queued on the job scheduler, while CRON jobs are tied to the physical clock.
364- gocron .WithSingletonMode (jobLimitMode ),
384+ t , err := time .ParseDuration (jc .Schedule )
385+ if err != nil {
386+ // TODO: return early if the schedule is not a valid cron expression either. Currently the invalid string is passed through as-is and is only validated by the NewJob call.
387+ withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
388+ jobDefinition = gocron .CronJob (jc .Schedule , withSeconds )
389+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeReschedule ))
390+ } else {
391+ jobDefinition = gocron .DurationJob (t )
392+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeWait ))
393+ }
394+ }
395+
396+ jobOptions = append (jobOptions ,
365397 gocron .WithName (jc .Name ),
366- gocron .WithContext (jm .ctx ),
367- gocron .WithEventListeners (
368- // May be slightly more accurate to use j.LastRun(), but we don't have direct reference to it here, and we don't want the job to
369- // complete before we can store the returned Job.
370- gocron .AfterJobRuns (func (jobID uuid.UUID , jobName string ) {
371- now := time .Now ()
372- if jh , ok := jm .JobHistories .Load (jobName ); ok {
373- jh .AddSuccess (now )
374- }
375- }),
376- gocron .AfterJobRunsWithError (func (jobID uuid.UUID , jobName string , err error ) {
377- now := time .Now ()
378- if jh , ok := jm .JobHistories .Load (jobName ); ok {
379- jh .AddFailure (now )
380- }
381- }),
382- gocron .AfterJobRunsWithPanic (func (jobID uuid.UUID , jobName string , recoverData any ) {
383- now := time .Now ()
384- if jh , ok := jm .JobHistories .Load (jobName ); ok {
385- jh .AddFailure (now )
386- }
387- })),
398+ gocron .WithContext (jm .ctx ))
399+
400+ jobLogger := jm .logger .Sublogger (jc .Name )
401+
402+ if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
403+ jm .JobHistories .Store (jc .Name , & JobHistory {
404+ successTimes : ring .New (historyLength ),
405+ failureTimes : ring .New (historyLength ),
406+ })
407+ jm .NumJobHistories .Add (1 )
408+ }
409+
410+ jobFunc := jm .createJobFunction (jc , continuous )
411+ j , err := jm .scheduler .NewJob (
412+ jobDefinition ,
413+ gocron .NewTask (jobFunc ),
414+ jobOptions ... ,
388415 )
389416 if err != nil {
390417 jobLogger .CErrorw (jm .ctx , "Failed to create a new job" , "name" , jc .Name , "error" , err .Error ())
0 commit comments