Skip to content

Commit

Permalink
refactor out enequeue all jobs function
Browse files Browse the repository at this point in the history
  • Loading branch information
gaffo committed Jun 10, 2024
1 parent 27fb7fe commit 1982e22
Showing 1 changed file with 54 additions and 45 deletions.
99 changes: 54 additions & 45 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,55 +180,20 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error
go p.execFunc(ctx, r, s, wg)()
}
}

// wgReturn := sync.WaitGroup{}
wg.Add(1)

// Now we gotta kick off all of the states to their correct queue
{
// Lock on R
//mutR.Lock()
//defer mutR.Unlock()
for _, job := range r.Jobs {
// If it's in a terminal state, skip
if p.stateMap[job.State].Terminal {
continue
}

p.sendJob(job)
}
}
p.enqueueAllJobs(r)

// Make a central processor and start it
go func() {
for rtn := range p.returnChan {
j := rtn.Job

// Send the new kicks if any
if rtn.KickRequests != nil {
for _, k := range rtn.KickRequests {
// Add the new job to the state
newJob := Job[JC]{
Id: fmt.Sprintf("%s->%d", j.Id, len(r.Jobs)),
C: k.C,
State: k.State,
}
// Add it to r
r.Jobs[newJob.Id] = newJob

nextState, ok := p.stateMap[newJob.State]
if !ok {
log.Fatal(p.invalidStateError(newJob.State))
}
// If it's terminal, we're done with this job
if !nextState.Terminal {
// We need to get the chan for the next one
nextChan := p.stateChan[nextState.TriggerState]
// Send the job to the next chan
nextChan <- newJob
continue
}
}
}
p.kickJobs(rtn, j, r)

// Update the job
r.Jobs[j.Id] = j
Expand Down Expand Up @@ -284,13 +249,9 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error
if !shutdown {
continue
}
//log.Println("All jobs are terminal state, shutting down")
// close all of the channels
for _, c := range p.stateChan {
close(c)
}
// close ourselves down
close(p.returnChan)

p.shutdown()

break
}
wg.Done()
Expand All @@ -302,6 +263,54 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error
return nil
}

func (p *Processor[AC, OC, JC]) enqueueAllJobs(r *Run[OC, JC]) {
for _, job := range r.Jobs {
// If it's in a terminal state, skip
if p.stateMap[job.State].Terminal {
continue
}

p.sendJob(job)
}
}

func (p *Processor[AC, OC, JC]) shutdown() {
// close all of the channels
for _, c := range p.stateChan {
close(c)
}
// close ourselves down
close(p.returnChan)
}

func (p *Processor[AC, OC, JC]) kickJobs(rtn Return[JC], j Job[JC], r *Run[OC, JC]) {
if rtn.KickRequests != nil {
for _, k := range rtn.KickRequests {
// Add the new job to the state
newJob := Job[JC]{
Id: fmt.Sprintf("%s->%d", j.Id, len(r.Jobs)),
C: k.C,
State: k.State,
}
// Add it to r
r.Jobs[newJob.Id] = newJob

nextState, ok := p.stateMap[newJob.State]
if !ok {
log.Fatal(p.invalidStateError(newJob.State))
}
// If it's terminal, we're done with this job
if !nextState.Terminal {
// We need to get the chan for the next one
nextChan := p.stateChan[nextState.TriggerState]
// Send the job to the next chan
nextChan <- newJob
continue
}
}
}
}

func (p *Processor[AC, OC, JC]) sendJob(job Job[JC]) {
p.stateChan[job.State] <- job
}
Expand Down

0 comments on commit 1982e22

Please sign in to comment.