Skip to content

Commit

Permalink
add better job state tracking to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dmah42 committed Jun 1, 2022
1 parent af00448 commit 25d8ef5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
12 changes: 11 additions & 1 deletion api/sprinkle/sprinkle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,20 @@ message RunResponse { int64 job_id = 1; }
// JobRequest selects a single job by its numeric id.
message JobRequest {
  int64 id = 1;
}

// JobResponse reports the status of one job: when it started and ended,
// its lifecycle state, whether it succeeded, and its resource usage.
message JobResponse {
// State is the lifecycle stage of the job.
// NOTE(review): value 2 is skipped (PENDING = 1, RUNNING = 3); confirm
// whether that gap is intentional.
enum State {
// Default value; the worker's Job handler starts from this state.
STATE_UNKNOWN = 0;
// NOTE(review): not assigned anywhere in the visible worker code;
// presumably meant for jobs that have not started yet — confirm.
STATE_PENDING = 1;
// Set by the worker when a process state exists but Exited() is false.
STATE_RUNNING = 3;
// Set by the worker when the process state reports Exited().
STATE_COMPLETE = 4;
}

// Job start time as Unix seconds (the worker fills it from time.Time.Unix()).
int64 start_time = 1;
// NOTE(review): field number 2 is also listed as reserved below, which
// protoc would reject. This line looks like the removed side of a rendered
// diff; confirm and drop it from the real .proto file.
bool exited = 2;
// Job end time as Unix seconds; the worker only sets it once the process
// has exited.
int64 end_time = 5;
// Current lifecycle state of the job.
State state = 6;
// Mirrors the process state's Success() result.
bool success = 3;
// Resource usage; the worker builds it from the process state's SysUsage().
RUsage rusage = 4;

// Keeps the retired field number from being reused by a future field.
reserved 2; // bool exited = 2
}

// JobsRequest carries no fields.
message JobsRequest {
}
Expand Down
19 changes: 14 additions & 5 deletions cmd/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func init() {

type job struct {
start time.Time
end time.Time
// TODO: replace with reference to binary/job.. see golang/groupcache
cmd *exec.Cmd
stdout, stderr string
Expand Down Expand Up @@ -147,6 +148,10 @@ func (s *workerServer) Run(_ context.Context, req *pb.RunRequest) (*pb.RunRespon
j := jobs.jobs[id]
jobs.RUnlock()

if err := j.cmd.Wait(); err != nil {
fmt.Println(err)
}

out, err := ioutil.ReadAll(stdout)
if err != nil {
glog.Error(err)
Expand All @@ -163,12 +168,9 @@ func (s *workerServer) Run(_ context.Context, req *pb.RunRequest) (*pb.RunRespon
j.stderr = string(out)
}

if err := j.cmd.Wait(); err != nil {
fmt.Println(err)
}

glog.Infof("Marking job %d as complete", id)
j.complete = true
j.end = time.Now()

jobs.Lock()
jobs.jobs[id] = j
Expand All @@ -185,11 +187,18 @@ func (s *workerServer) Job(_ context.Context, req *pb.JobRequest) (*pb.JobRespon

resp := &pb.JobResponse{
StartTime: job.start.Unix(),
State: pb.JobResponse_STATE_UNKNOWN,
}
if job.cmd.ProcessState != nil {
resp.Exited = job.cmd.ProcessState.Exited()
resp.Success = job.cmd.ProcessState.Success()

if job.cmd.ProcessState.Exited() {
resp.EndTime = job.end.Unix()
resp.State = pb.JobResponse_STATE_COMPLETE
} else {
resp.State = pb.JobResponse_STATE_RUNNING
}

su := job.cmd.ProcessState.SysUsage().(*syscall.Rusage)
if su != nil {
resp.Rusage = &pb.RUsage{
Expand Down

0 comments on commit 25d8ef5

Please sign in to comment.