Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
35beba9
add JobManager() to Robot interface
aldenh-viam Nov 14, 2025
b13884f
jobFunc: return err instead of only logging
aldenh-viam Nov 18, 2025
98f67fe
basic history tracker
aldenh-viam Nov 18, 2025
4fa3333
GetMachineStatus: include job histories
aldenh-viam Nov 18, 2025
eaad9a5
add JobStatuses to MachineStatus
aldenh-viam Nov 19, 2025
e25e16a
pass ctx to job (auto-cancel waiting jobs on shutdown, instead of err…
aldenh-viam Nov 18, 2025
ca0501d
add job history test
aldenh-viam Nov 19, 2025
503e408
retreive job histories in MachineStatus instead of GetMachineStatus; …
aldenh-viam Nov 20, 2025
93f3154
test flaky job; test panic capture
aldenh-viam Nov 21, 2025
0f355cd
lower job succeeded & triggered logs to debug; renamed triggered to a…
aldenh-viam Nov 18, 2025
834ee04
support continuous schedule
aldenh-viam Nov 18, 2025
668f3dd
support "log_configuration" levels for jobs
aldenh-viam Nov 18, 2025
ecba4ff
add continuous mode test
aldenh-viam Nov 19, 2025
df48add
add log level switching test
aldenh-viam Nov 19, 2025
cdae0a8
add LogConfiguration proto conversions test
aldenh-viam Nov 20, 2025
99fb6b5
revert "Job triggered" -> "Job added" message
aldenh-viam Nov 21, 2025
38d29f4
add note about DoCommand panic
aldenh-viam Nov 24, 2025
baba43f
Merge branch 'main' into jobcontinuous
aldenh-viam Nov 24, 2025
073552e
go mod tidy
aldenh-viam Nov 24, 2025
84ad603
faster test
aldenh-viam Nov 24, 2025
07e47a0
add ticket for TODO
aldenh-viam Nov 25, 2025
44edcc1
Merge branch 'main' into jobcontinuous
aldenh-viam Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,11 +1275,12 @@ type JobConfig struct {

// JobConfigData is the job config data that gets marshaled/unmarshaled.
type JobConfigData struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
Resource string `json:"resource"`
Method string `json:"method"`
Command map[string]any `json:"command,omitempty"`
Name string `json:"name"`
Schedule string `json:"schedule"`
Resource string `json:"resource"`
Method string `json:"method"`
Command map[string]any `json:"command,omitempty"`
LogConfiguration *resource.LogConfig `json:"log_configuration,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since LogConfig's are now shared between jobs and resources, maybe resource is no longer the correct package.

}

// MarshalJSON marshals out this config.
Expand Down
12 changes: 12 additions & 0 deletions config/proto_conversions.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there are tests associated with this file; worth adding a test there for roundtripping.

Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,9 @@ func JobsConfigToProto(jc *JobConfig) (*pb.JobConfig, error) {
}
protoConfig.Command = command
}
if jc.LogConfiguration != nil {
protoConfig.LogConfiguration = &pb.LogConfiguration{Level: strings.ToLower(jc.LogConfiguration.Level.String())}
}
Comment on lines +1036 to +1037
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

App will need an RDK bump (containing API bump) after this PR is merged for the web editor to support LogConfiguration.

return protoConfig, nil
}

Expand All @@ -1049,5 +1052,14 @@ func JobsConfigFromProto(proto *pb.JobConfig, _ logging.Logger) (*JobConfig, err
if proto.Command != nil {
jobConfig.Command = proto.Command.AsMap()
}
if proto.LogConfiguration != nil {
if proto.GetLogConfiguration() != nil {
level, err := logging.LevelFromString(proto.GetLogConfiguration().Level)
if err != nil {
level = logging.INFO
}
jobConfig.LogConfiguration = &resource.LogConfig{Level: level}
}
}
return jobConfig, nil
}
5 changes: 5 additions & 0 deletions config/proto_conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,12 +1106,17 @@ func TestJobsConfigProtoConversions(t *testing.T) {
Command: map[string]any{
"arg1": true,
},
LogConfiguration: &resource.LogConfig{
Level: logging.DEBUG,
},
},
}

proto, err = JobsConfigToProto(&testJobConfigCommand)
test.That(t, err, test.ShouldBeNil)
test.That(t, proto.LogConfiguration.Level, test.ShouldEqual, "debug")
out, err = JobsConfigFromProto(proto, logger)
test.That(t, err, test.ShouldBeNil)
test.That(t, out.LogConfiguration.Level, test.ShouldEqual, logging.DEBUG)
test.That(t, *out, test.ShouldResemble, testJobConfigCommand)
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/go-audio/audio v1.0.0
github.com/go-audio/transforms v0.0.0-20180121090939-51830ccc35a5
github.com/go-audio/wav v1.1.0
github.com/go-co-op/gocron/v2 v2.16.2
github.com/go-co-op/gocron/v2 v2.18.0
github.com/go-git/go-git/v5 v5.16.2
github.com/go-gl/mathgl v1.0.0
github.com/go-nlopt/nlopt v0.0.0-20230219125344-443d3362dcb5
Expand Down Expand Up @@ -84,7 +84,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
go.viam.com/api v0.1.490
go.viam.com/api v0.1.493
go.viam.com/test v1.2.4
go.viam.com/utils v0.3.1
goji.io v2.0.2+incompatible
Expand Down Expand Up @@ -274,7 +274,7 @@ require (
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
github.com/srikrsna/protoc-gen-gotag v0.6.2 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ github.com/go-audio/wav v1.1.0 h1:jQgLtbqBzY7G+BM8fXF7AHUk1uHUviWS4X39d5rsL2g=
github.com/go-audio/wav v1.1.0/go.mod h1:mpe9qfwbScEbkd8uybLuIpTgHyrISw/OTuvjUW2iGtE=
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-co-op/gocron/v2 v2.16.2 h1:r08P663ikXiulLT9XaabkLypL/W9MoCIbqgQoAutyX4=
github.com/go-co-op/gocron/v2 v2.16.2/go.mod h1:4YTLGCCAH75A5RlQ6q+h+VacO7CgjkgP0EJ+BEOXRSI=
github.com/go-co-op/gocron/v2 v2.18.0 h1:DS3Uhru66q1jy/5f9V0itmi3cLXcn2b7N+duGfgT7gU=
github.com/go-co-op/gocron/v2 v2.18.0/go.mod h1:Zii6he+Zfgy5W9B+JKk/KwejFOW0kZTFvHtwIpR4aBI=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic=
github.com/go-git/go-billy/v5 v5.6.2 h1:6Q86EsPXMa7c3YZ3aLAQsMA0VlWmy43r6FHqa/UNbRM=
Expand Down Expand Up @@ -972,8 +972,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
Expand Down Expand Up @@ -1106,8 +1106,8 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.490 h1:4ARXkNJOcsHFukj/LH+kDn94nClSMLTjTlXBb+S8X+4=
go.viam.com/api v0.1.490/go.mod h1:p/am76zx8SZ74V/F4rEAYQIpHaaLUwJgY2q3Uw3FIWk=
go.viam.com/api v0.1.493 h1:VlOaSQkV2TYGecPHCygSs66vdojBDX3otLCGpLHRM9o=
go.viam.com/api v0.1.493/go.mod h1:p/am76zx8SZ74V/F4rEAYQIpHaaLUwJgY2q3Uw3FIWk=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.3.1 h1:Px0I6PzOAecXHfa4xGv1Z9dtO1214fVmtMMABLANZDc=
Expand Down
199 changes: 199 additions & 0 deletions robot/impl/jobmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/pion/mediadevices/pkg/prop"
"go.uber.org/zap/zapcore"
"go.viam.com/test"
"go.viam.com/utils"
"go.viam.com/utils/testutils"
Expand Down Expand Up @@ -71,6 +72,91 @@ func TestJobManagerDurationAndCronFromJson(t *testing.T) {
})
}

func TestLogLevelChange(t *testing.T) {
// This is created at debug level
logger, logs := logging.NewObservedTestLogger(t)

fakeSensorComponent := []resource.Config{
{
Model: resource.DefaultModelFamily.WithModel("fake"),
Name: "sensor",
API: sensor.API,
},
}

cfg := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "1s",
Resource: "sensor",
Method: "GetReadings",
},
},
},
}
cfgWarn := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "1s",
Resource: "sensor",
Method: "GetReadings",
LogConfiguration: &resource.LogConfig{
Level: logging.WARN,
},
},
},
},
}
cfgDebug := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "1s",
Resource: "sensor",
Method: "GetReadings",
LogConfiguration: &resource.LogConfig{
Level: logging.DEBUG,
},
},
},
},
}
ctx, ctxCancelFunc := context.WithCancel(context.Background())
defer ctxCancelFunc()
lr := setupLocalRobot(t, ctx, cfg, logger)

time.Sleep(7 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any way to remove sleeps here?

test.That(t, logs.FilterMessage("Job triggered").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeGreaterThan, 2)
test.That(t, logs.FilterMessage("Job succeeded").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeGreaterThan, 2)

lr.Reconfigure(ctx, cfgWarn)
logs.TakeAll()
time.Sleep(7 * time.Second)
// update will let the previous job iteration complete first, so may be 1 or 0.
test.That(t, logs.FilterMessage("Job triggered").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeLessThanOrEqualTo, 1)
test.That(t, logs.FilterMessage("Job succeeded").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeLessThanOrEqualTo, 1)

lr.Reconfigure(ctx, cfgDebug)
logs.TakeAll()
time.Sleep(7 * time.Second)
test.That(t, logs.FilterMessage("Job triggered").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeGreaterThan, 2)
test.That(t, logs.FilterMessage("Job succeeded").FilterLevelExact(zapcore.DebugLevel).Len(),
test.ShouldBeGreaterThan, 2)
}

func TestJobManagerHistory(t *testing.T) {
logger := logging.NewTestLogger(t)

Expand Down Expand Up @@ -228,6 +314,119 @@ func TestJobManagerHistory(t *testing.T) {
})
}

// Test continuous mode, include switching to and from.
func TestJobContinuousSchedule(t *testing.T) {
logger := logging.NewTestLogger(t)

fakeSensorComponent := []resource.Config{
{
Model: resource.DefaultModelFamily.WithModel("fake"),
Name: "sensor",
API: sensor.API,
},
}

cfg := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "1s",
Resource: "sensor",
Method: "GetReadings",
},
},
},
}
cfgCron := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "*/1 * * * * *",
Resource: "sensor",
Method: "GetReadings",
},
},
},
}
cfgContinuous := &config.Config{
Components: fakeSensorComponent,
Jobs: []config.JobConfig{
{
config.JobConfigData{
Name: "fake sensor",
Schedule: "continuous",
Resource: "sensor",
Method: "GetReadings",
},
},
},
}
ctx, ctxCancelFunc := context.WithCancel(context.Background())
defer ctxCancelFunc()
lr := setupLocalRobot(t, ctx, cfg, logger)
o, _, addr := robottestutils.CreateBaseOptionsAndListener(t)
err := lr.StartWeb(ctx, o)
test.That(t, err, test.ShouldBeNil)
robotClient, err := rclient.New(ctx, addr, logger)
test.That(t, err, test.ShouldBeNil)
defer robotClient.Close(ctx)

// Start running in 1s duration mode. Expect last 2 latest success timestamps differ by > 900ms
testutils.WaitForAssertionWithSleep(t, time.Second, 10, func(tb testing.TB) {
tb.Helper()
ms, err := robotClient.MachineStatus(ctx)
test.That(tb, err, test.ShouldBeNil)
test.That(tb, len(ms.JobStatuses), test.ShouldEqual, 1)
jh, ok := ms.JobStatuses["fake sensor"]
test.That(tb, ok, test.ShouldBeTrue)
successes := jh.RecentSuccessfulRuns
test.That(tb, len(successes), test.ShouldBeGreaterThanOrEqualTo, 2)
if len(successes) >= 2 {
test.That(tb, successes[len(successes)-1].Sub(successes[len(successes)-2]), test.ShouldBeGreaterThan, 900*time.Millisecond)
}
})

// Switch from duration to continuous. Should run more than 10x. Expect latest success timestamp - earliest < 900ms
lr.Reconfigure(ctx, cfgContinuous)
testutils.WaitForAssertionWithSleep(t, time.Second, 10, func(tb testing.TB) {
tb.Helper()
ms, err := robotClient.MachineStatus(ctx)
test.That(tb, err, test.ShouldBeNil)
test.That(tb, len(ms.JobStatuses), test.ShouldEqual, 1)
jh, ok := ms.JobStatuses["fake sensor"]
test.That(tb, ok, test.ShouldBeTrue)
successes := jh.RecentSuccessfulRuns
test.That(tb, ok, test.ShouldBeTrue)
// increase this if bumping history size
test.That(tb, len(successes), test.ShouldBeGreaterThanOrEqualTo, 10)
if len(successes) >= 2 {
test.That(tb, successes[len(successes)-1].Sub(successes[0]), test.ShouldBeLessThan, 900*time.Millisecond)
}
})

// Switch from continuous to 1s cron. Expect last 2 latest success timestamps differ by > 900ms
lr.Reconfigure(ctx, cfgCron)
testutils.WaitForAssertionWithSleep(t, time.Second, 10, func(tb testing.TB) {
tb.Helper()
ms, err := robotClient.MachineStatus(ctx)
test.That(tb, err, test.ShouldBeNil)
test.That(tb, len(ms.JobStatuses), test.ShouldEqual, 1)
jh, ok := ms.JobStatuses["fake sensor"]
test.That(tb, ok, test.ShouldBeTrue)
successes := jh.RecentSuccessfulRuns
test.That(tb, ok, test.ShouldBeTrue)
// History still contains runs from prev. If stored size is 10, we still expect 10 here.
test.That(tb, len(successes), test.ShouldBeGreaterThanOrEqualTo, 10)
if len(successes) >= 2 {
test.That(tb, successes[len(successes)-1].Sub(successes[len(successes)-2]), test.ShouldBeGreaterThan, 900*time.Millisecond)
}
})
}

func TestJobManagerConfigChanges(t *testing.T) {
logger := logging.NewTestLogger(t)
model := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(8))
Expand Down
Loading
Loading