@@ -206,12 +206,14 @@ func (jm *JobManager) createDescriptorSourceAndgRPCMethod(
206206}
207207
208208// createJobFunction returns a function that the job scheduler puts on its queue.
209- func (jm * JobManager ) createJobFunction (jc config.JobConfig ) func (ctx context.Context ) error {
209+ func (jm * JobManager ) createJobFunction (jc config.JobConfig , continuous bool ) func (ctx context.Context ) error {
210210 jobLogger := jm .logger .Sublogger (jc .Name )
211211 // To support logging for quick jobs (~ on the seconds schedule), we disable log
212212 // deduplication for job loggers.
213213 jobLogger .NeverDeduplicate ()
214- return func (ctx context.Context ) error {
214+
215+ // Use jm.ctx rather than the per-run context, so a run is interrupted only when the JobManager is shutting down. When a schedule changes, runs already in progress are allowed to complete instead of being interrupted.
216+ jobFunc := func (_ context.Context ) error {
215217 res , err := jm .getResource (jc .Resource )
216218 if err != nil {
217219 jobLogger .CWarnw (jm .ctx , "Could not get resource" , "error" , err .Error ())
@@ -272,6 +274,7 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
272274 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , err .Error ())
273275 return err
274276 } else if h .Status != nil && h .Status .Err () != nil {
277+ // If the job panics, the panic appears to be captured and surfaced here as a status error.
275278 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , h .Status .Err ())
276279 return h .Status .Err ()
277280 }
@@ -285,6 +288,34 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
285288 jobLogger .CDebugw (jm .ctx , "Job succeeded" , "name" , jc .Name , "response" , response )
286289 return nil
287290 }
291+
292+ return func (ctx context.Context ) error {
293+ var err error
294+ for {
295+ select {
296+ case <- ctx .Done ():
297+ // Job cancelled (e.g. from schedule modification)
298+ return err
299+ case <- jm .ctx .Done ():
300+ // JM shutting down
301+ return err
302+ default :
303+ }
304+ err = jobFunc (ctx )
305+ now := timestamppb .Now ()
306+ if jh , ok := jm .JobHistories .Load (jc .Name ); ok {
307+ if err != nil {
308+ // this includes captured panics (from InvokeRPC).
309+ jh .AddFailure (now )
310+ } else {
311+ jh .AddSuccess (now )
312+ }
313+ }
314+ if ! continuous {
315+ return err
316+ }
317+ }
318+ }
288319}
289320
290321// removeJob removes the job from the scheduler and clears the internal map entry.
@@ -308,32 +339,21 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
308339 return
309340 }
310341
311- var jobType gocron.JobDefinition
312- var jobLimitMode gocron.LimitMode
313- t , err := time .ParseDuration (jc .Schedule )
314- if err != nil {
315- withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
316- jobType = gocron .CronJob (jc .Schedule , withSeconds )
317- jobLimitMode = gocron .LimitModeReschedule
342+ var continuous bool
343+ var jobDefinition gocron.JobDefinition
344+ var jobOptions []gocron.JobOption
345+ if strings .ToLower (jc .Schedule ) == "continuous" {
346+ continuous = true
347+ // Paired with WithIntervalFromCompletion: if the job exits unexpectedly, the scheduler tries to restart it after this interval.
348+ // Since panics are captured, this restart path is largely unused, but a non-trivial interval helps reduce scheduler overhead.
349+ jobDefinition = gocron .DurationJob (time .Second * 5 )
350+ jobOptions = append (jobOptions ,
351+ // don't queue up if running
352+ gocron .WithSingletonMode (gocron .LimitModeReschedule ),
353+ gocron .WithIntervalFromCompletion ())
318354 } else {
319- jobType = gocron .DurationJob (t )
320- jobLimitMode = gocron .LimitModeWait
321- }
322-
323- if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
324- jm .JobHistories .Store (jc .Name , & JobHistory {
325- successTimes : ring .New (historyLength ),
326- failureTimes : ring .New (historyLength ),
327- })
328- jm .NumJobHistories .Add (1 )
329- }
330-
331- jobLogger := jm .logger .Sublogger (jc .Name )
332-
333- jobFunc := jm .createJobFunction (jc )
334- j , err := jm .scheduler .NewJob (
335- jobType ,
336- gocron .NewTask (jobFunc ),
355+ // Regular gocron-supported modes:
356+ //
337357 // WithSingletonMode option allows us to perform jobs on the same schedule
338358 // sequentially. This will guarantee that there is only one instance of a particular
339359 // job running at the same time. If a job reaches its schedule while the previous
@@ -362,30 +382,37 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
362382
363383 // It is also important to note that DURATION jobs start relative to when they were
364384 // queued on the job scheduler, while CRON jobs are tied to the physical clock.
365- gocron .WithSingletonMode (jobLimitMode ),
385+ t , err := time .ParseDuration (jc .Schedule )
386+ if err != nil {
387+ // TODO: exit if cron job is also invalid. Currently it's stored as an invalid string and validated at NewJob call.
388+ withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
389+ jobDefinition = gocron .CronJob (jc .Schedule , withSeconds )
390+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeReschedule ))
391+ } else {
392+ jobDefinition = gocron .DurationJob (t )
393+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeWait ))
394+ }
395+ }
396+
397+ jobOptions = append (jobOptions ,
366398 gocron .WithName (jc .Name ),
367- gocron .WithContext (jm .ctx ),
368- gocron .WithEventListeners (
369- // May be slightly more accurate to use j.LastRun(), but we don't have direct reference to it here, and we don't want the job to
370- // complete before we can store the returned Job.
371- gocron .AfterJobRuns (func (jobID uuid.UUID , jobName string ) {
372- now := timestamppb .Now ()
373- if jh , ok := jm .JobHistories .Load (jobName ); ok {
374- jh .AddSuccess (now )
375- }
376- }),
377- gocron .AfterJobRunsWithError (func (jobID uuid.UUID , jobName string , err error ) {
378- now := timestamppb .Now ()
379- if jh , ok := jm .JobHistories .Load (jobName ); ok {
380- jh .AddFailure (now )
381- }
382- }),
383- gocron .AfterJobRunsWithPanic (func (jobID uuid.UUID , jobName string , recoverData any ) {
384- now := timestamppb .Now ()
385- if jh , ok := jm .JobHistories .Load (jobName ); ok {
386- jh .AddFailure (now )
387- }
388- })),
399+ gocron .WithContext (jm .ctx ))
400+
401+ jobLogger := jm .logger .Sublogger (jc .Name )
402+
403+ if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
404+ jm .JobHistories .Store (jc .Name , & JobHistory {
405+ successTimes : ring .New (historyLength ),
406+ failureTimes : ring .New (historyLength ),
407+ })
408+ jm .NumJobHistories .Add (1 )
409+ }
410+
411+ jobFunc := jm .createJobFunction (jc , continuous )
412+ j , err := jm .scheduler .NewJob (
413+ jobDefinition ,
414+ gocron .NewTask (jobFunc ),
415+ jobOptions ... ,
389416 )
390417 if err != nil {
391418 jobLogger .CErrorw (jm .ctx , "Failed to create a new job" , "name" , jc .Name , "error" , err .Error ())
0 commit comments