@@ -206,12 +206,14 @@ func (jm *JobManager) createDescriptorSourceAndgRPCMethod(
206206}
207207
208208// createJobFunction returns a function that the job scheduler puts on its queue.
209- func (jm * JobManager ) createJobFunction (jc config.JobConfig ) func (ctx context.Context ) error {
209+ func (jm * JobManager ) createJobFunction (jc config.JobConfig , continuous bool ) func (ctx context.Context ) error {
210210 jobLogger := jm .logger .Sublogger (jc .Name )
211211 // To support logging for quick jobs (~ on the seconds schedule), we disable log
212212 // deduplication for job loggers.
213213 jobLogger .NeverDeduplicate ()
214- return func (ctx context.Context ) error {
214+
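215+ // jobFunc ignores the context passed in by the scheduler and uses jm.ctx instead, so a run is interrupted only when the JobManager is shutting down. When a schedule changes, the in-flight run is left to complete instead of being interrupted.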
216+ jobFunc := func (_ context.Context ) error {
215217 res , err := jm .getResource (jc .Resource )
216218 if err != nil {
217219 jobLogger .CWarnw (jm .ctx , "Could not get resource" , "error" , err .Error ())
@@ -272,6 +274,7 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
272274 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , err .Error ())
273275 return err
274276 } else if h .Status != nil && h .Status .Err () != nil {
277+ // If the job panics, the panic appears to be captured and reported here via h.Status.
275278 jobLogger .CWarnw (jm .ctx , "Job failed" , "name" , jc .Name , "error" , h .Status .Err ())
276279 return h .Status .Err ()
277280 }
@@ -286,6 +289,34 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func(ctx context.Co
286289 //jm.logger.Infof("done ctx=%v, jmctx=%v", ctx.Err(), jm.ctx.Err())
287290 return nil
288291 }
292+
293+ return func (ctx context.Context ) error {
294+ var err error
295+ for {
296+ select {
297+ case <- ctx .Done ():
298+ // Job cancelled (e.g. from schedule modification)
299+ return err
300+ case <- jm .ctx .Done ():
301+ // JM shutting down
302+ return err
303+ default :
304+ }
305+ err = jobFunc (ctx )
306+ now := timestamppb .Now ()
307+ if jh , ok := jm .JobHistories .Load (jc .Name ); ok {
308+ if err != nil {
309+ // this includes captured panics (from InvokeRPC).
310+ jh .AddFailure (now )
311+ } else {
312+ jh .AddSuccess (now )
313+ }
314+ }
315+ if ! continuous {
316+ return err
317+ }
318+ }
319+ }
289320}
290321
291322// removeJob removes the job from the scheduler and clears the internal map entry.
@@ -309,32 +340,21 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
309340 return
310341 }
311342
312- var jobType gocron.JobDefinition
313- var jobLimitMode gocron.LimitMode
314- t , err := time .ParseDuration (jc .Schedule )
315- if err != nil {
316- withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
317- jobType = gocron .CronJob (jc .Schedule , withSeconds )
318- jobLimitMode = gocron .LimitModeReschedule
343+ var continuous bool
344+ var jobDefinition gocron.JobDefinition
345+ var jobOptions []gocron.JobOption
346+ if strings .ToLower (jc .Schedule ) == "continuous" {
347+ continuous = true
348+ // Paired with WithIntervalFromCompletion: if the continuous job exits unexpectedly, the scheduler tries to restart it 5 seconds after it finished.
349+ // Since we capture panics, a restart is rarely needed, but the interval also helps reduce scheduler overhead.
350+ jobDefinition = gocron .DurationJob (time .Second * 5 )
351+ jobOptions = append (jobOptions ,
352+ // don't queue up if running
353+ gocron .WithSingletonMode (gocron .LimitModeReschedule ),
354+ gocron .WithIntervalFromCompletion ())
319355 } else {
320- jobType = gocron .DurationJob (t )
321- jobLimitMode = gocron .LimitModeWait
322- }
323-
324- if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
325- jm .JobHistories .Store (jc .Name , & JobHistory {
326- successTimes : ring .New (historyLength ),
327- failureTimes : ring .New (historyLength ),
328- })
329- jm .NumJobHistories .Add (1 )
330- }
331-
332- jobLogger := jm .logger .Sublogger (jc .Name )
333-
334- jobFunc := jm .createJobFunction (jc )
335- j , err := jm .scheduler .NewJob (
336- jobType ,
337- gocron .NewTask (jobFunc ),
356+ // Regular gocron-supported modes:
357+ //
338358 // WithSingletonMode option allows us to perform jobs on the same schedule
339359 // sequentially. This will guarantee that there is only one instance of a particular
340360 // job running at the same time. If a job reaches its schedule while the previous
@@ -363,30 +383,38 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
363383
364384 // It is also important to note that DURATION jobs start relative to when they were
365385 // queued on the job scheduler, while CRON jobs are tied to the physical clock.
366- gocron .WithSingletonMode (jobLimitMode ),
386+ t , err := time .ParseDuration (jc .Schedule )
387+ if err != nil {
388+ // TODO: exit if cron job is also invalid. Currently it's stored as an invalid string and validated at NewJob call.
389+ withSeconds := len (strings .Split (jc .Schedule , " " )) >= 6
390+ jobDefinition = gocron .CronJob (jc .Schedule , withSeconds )
391+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeReschedule ))
392+
393+ } else {
394+ jobDefinition = gocron .DurationJob (t )
395+ jobOptions = append (jobOptions , gocron .WithSingletonMode (gocron .LimitModeWait ))
396+ }
397+ }
398+
399+ jobOptions = append (jobOptions ,
367400 gocron .WithName (jc .Name ),
368- gocron .WithContext (jm .ctx ),
369- gocron .WithEventListeners (
370- // May be slightly more accurate to use j.LastRun(), but we don't have direct reference to it here, and we don't want the job to
371- // complete before we can store the returned Job.
372- gocron .AfterJobRuns (func (jobID uuid.UUID , jobName string ) {
373- now := timestamppb .Now ()
374- if jh , ok := jm .JobHistories .Load (jobName ); ok {
375- jh .AddSuccess (now )
376- }
377- }),
378- gocron .AfterJobRunsWithError (func (jobID uuid.UUID , jobName string , err error ) {
379- now := timestamppb .Now ()
380- if jh , ok := jm .JobHistories .Load (jobName ); ok {
381- jh .AddFailure (now )
382- }
383- }),
384- gocron .AfterJobRunsWithPanic (func (jobID uuid.UUID , jobName string , recoverData any ) {
385- now := timestamppb .Now ()
386- if jh , ok := jm .JobHistories .Load (jobName ); ok {
387- jh .AddFailure (now )
388- }
389- })),
401+ gocron .WithContext (jm .ctx ))
402+
403+ jobLogger := jm .logger .Sublogger (jc .Name )
404+
405+ if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
406+ jm .JobHistories .Store (jc .Name , & JobHistory {
407+ successTimes : ring .New (historyLength ),
408+ failureTimes : ring .New (historyLength ),
409+ })
410+ jm .NumJobHistories .Add (1 )
411+ }
412+
413+ jobFunc := jm .createJobFunction (jc , continuous )
414+ j , err := jm .scheduler .NewJob (
415+ jobDefinition ,
416+ gocron .NewTask (jobFunc ),
417+ jobOptions ... ,
390418 )
391419 if err != nil {
392420 jobLogger .CErrorw (jm .ctx , "Failed to create a new job" , "name" , jc .Name , "error" , err .Error ())
0 commit comments