Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go/example/task/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,5 @@ func main() {
tasks.MustRegister(square)
tasks.MustRegister(addSquares)

err := tasks.Start()
if err != nil {
panic(err)
}
tasks.Start()
}
10 changes: 8 additions & 2 deletions go/pkg/tasks/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ type Config struct {
SocketPath string `split_words:"true" required:"true"`
}

func Start() error {
func Start() {
if err := StartE(); err != nil {
panic(err)
}
}

func StartE() error {
ctx := context.Background()

var cfg Config
Expand All @@ -24,6 +30,6 @@ func Start() error {
case "register":
return Register(ctx, cfg.SocketPath)
default:
return fmt.Errorf("unrecognized mode: %s", cfg.Mode)
return fmt.Errorf("unrecognized RENDER_SDK_MODE %q", cfg.Mode)
}
}
57 changes: 30 additions & 27 deletions go/pkg/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,46 +53,49 @@ type TaskContext = task.TaskContext
type Options = task.Options
type Retry = task.Retry

func Run(ctx context.Context, unixSocketPath string) (_err error) {
// Run is the entrypoint for executing a task. It initiates the callback client,
// and ensures that errors and panics are reported correctly
func Run(ctx context.Context, unixSocketPath string) error {
callbackerClient, err := uds.NewCallbackClient(unixSocketPath)
if err != nil {
return fmt.Errorf("failed to create callbacker: %w", err)
return fmt.Errorf("failed to create callback client at %s: %w", unixSocketPath, err)
}
if taskErr := executeTaskWithRecovery(ctx, callbackerClient); taskErr != nil {
resp, err := callbackerClient.PostCallbackWithResponse(ctx, callbackapi.PostCallbackJSONRequestBody{
Error: taskErr,
})
if err != nil || (resp != nil && resp.StatusCode() != 200) {
return fmt.Errorf("failed to report error to callback client; original error is %s; error from callback is %w", taskErr.Details, err)
}
}
return nil
}

// executeTaskWithRecovery calls executeTask, and wraps the error returned
// (if any) in a *callbackapi.TaskError. If executeTask panics, then the panic
// message and the stack trace get passed upwards.
func executeTaskWithRecovery(ctx context.Context, callbackerClient *callbackapi.ClientWithResponses) (_taskErr *callbackapi.TaskError) {
defer func() {
var callbackTaskErr *callbackapi.TaskError

if r := recover(); r != nil {
stackTrace := string(debug.Stack())
callbackTaskErr = &callbackapi.TaskError{
_taskErr = &callbackapi.TaskError{
Details: fmt.Sprintf("task panicked: %v", r),
StackTrace: &stackTrace,
}
} else if _err != nil {
callbackTaskErr = &callbackapi.TaskError{
Details: fmt.Sprintf("task failed: %v", _err),
}
_err = nil
} else {
// No error, nothing to do
return
}
}()

resp, callbackErr := callbackerClient.PostCallbackWithResponse(ctx, callbackapi.PostCallbackJSONRequestBody{
Error: callbackTaskErr,
})
// todo better handle errs?
if callbackErr != nil {
_err = callbackErr
return
if err := executeTask(ctx, callbackerClient); err != nil {
return &callbackapi.TaskError{
Details: fmt.Sprintf("task failed: %v", err),
}
if resp.StatusCode() != 200 {
_err = fmt.Errorf("callback failed with status code %d", resp.StatusCode())
return
}
}()
}
return nil
}

executor := executor.NewExecutor(taskSingleton, callbackerClient)
// executeTask executes a single task, returning idiomatic errors.
func executeTask(ctx context.Context, callbackerClient *callbackapi.ClientWithResponses) error {
execer := executor.NewExecutor(taskSingleton, callbackerClient)

inputResp, err := callbackerClient.GetInputWithResponse(ctx)
if err != nil {
Expand All @@ -110,7 +113,7 @@ func Run(ctx context.Context, unixSocketPath string) (_err error) {
}

// We use this to avoid idempotency checks by the server adapter
err = executor.Execute(ctx, taskName, input...)
err = execer.Execute(ctx, taskName, input...)
if err != nil {
return err
}
Expand Down