Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New internals #534

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions internal/providers/core/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package core

import (
"context"
"fmt"
)

type ResourceContext interface {
context.Context

NewController(ctx context.Context, typ string) (ResourceController, error)
GetResourceType(ctx context.Context, iri string) (typ string, err error)

BeginCreate(ctx context.Context, typ string, spec string) (opID string, err error)
EndCreate(ctx context.Context, opID string, iri string, err error) error

BeginUpdate(ctx context.Context, iri string, patch string) (opID string, err error)
EndUpdate(ctx context.Context, opID string, err error) error

BeginDelete(ctx context.Context, iri string) (opID string, err error)
EndDelete(ctx context.Context, opID string, err error) error
}

type Resource struct {
ctrl ResourceController
iri string
}

type ResourceController interface {
Create(ctx context.Context, spec string) (iri string, err error)
LoadIRI(iri string) error
Exists(ctx context.Context) (bool, error)
Update(ctx context.Context, patch string) error
Delete(ctx context.Context) error
}

func CreateResource(ctx ResourceContext, typ string, spec string) (res *Resource, err error) {
ctrl, err := ctx.NewController(ctx, typ)
if err != nil {
return nil, fmt.Errorf("creating %q controller: %w", typ, err)
}
res = &Resource{
ctrl: ctrl,
}

opID, beginErr := ctx.BeginCreate(ctx, typ, spec)
if beginErr != nil {
return nil, fmt.Errorf("beginning: %w", beginErr)
}
defer func() {
if endErr := ctx.EndCreate(ctx, opID, res.iri, err); endErr != nil {
if err == nil {
err = fmt.Errorf("ending: %w", endErr)
}
}
}()

res.iri, err = res.ctrl.Create(ctx, spec)
return
}

func GetResource(ctx ResourceContext, iri string) (*Resource, error) {
typ, err := ctx.GetResourceType(ctx, iri)
if err != nil {
return nil, fmt.Errorf("getting resource type: %w", err)
}

ctrl, err := ctx.NewController(ctx, typ)
if err != nil {
return nil, fmt.Errorf("creating %q controller: %w", typ, err)
}

res := &Resource{
iri: iri,
ctrl: ctrl,
}
return res, nil
}

func (res *Resource) Update(ctx ResourceContext, spec string) (err error) {
opID, beginErr := ctx.BeginUpdate(ctx, res.iri, spec)
if beginErr != nil {
return fmt.Errorf("beginning: %w", beginErr)
}
defer func() {
if endErr := ctx.EndUpdate(ctx, opID, err); endErr != nil {
if err == nil {
err = fmt.Errorf("ending: %w", endErr)
}
}
}()

err = res.ctrl.Update(ctx, spec)
return
}

func (res *Resource) Delete(ctx ResourceContext) (err error) {
opID, beginErr := ctx.BeginDelete(ctx, res.iri)
if beginErr != nil {
return fmt.Errorf("beginning: %w", beginErr)
}
defer func() {
if endErr := ctx.EndDelete(ctx, opID, err); endErr != nil {
if err == nil {
err = fmt.Errorf("ending: %w", endErr)
}
}
}()

err = res.ctrl.Delete(ctx)
return
}
133 changes: 133 additions & 0 deletions internal/providers/unix/process/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package process

import (
"context"
"errors"
"fmt"
"os"
"sort"
"strconv"
"strings"
"syscall"

"github.com/deref/exo/internal/util/jsonutil"
"github.com/deref/exo/internal/util/osutil"
)

type Resource struct {
Pid int
}

type Start struct {
// Absolute path to working directory.
Directory string `json:"directory"`
// Absolute path to program to execute.
Program string `json:"program"`
// Command line arguments.
Arguments []string `json:"arguments"`
// Complete environment given to the process.
Environment map[string]string `json:"environment"`
// File paths to attach. First three are generally stdin, stdout, and stderr.
Files []OpenFile `json:"files"`
}

type OpenFile struct {
Path string `json:"path"`
Flag int `json:"flag"`
Perm int `json:"perm"`
}

func (res *Resource) Create(ctx context.Context, spec string) (iri string, err error) {
var start Start
if err := jsonutil.UnmarshalString(spec, &start); err != nil {
return "", fmt.Errorf("unmarshalling spec: %w", err)
}

argv := append([]string{start.Program}, start.Arguments...)

envKeys := make([]string, 0, len(start.Environment))
for k := range start.Environment {
envKeys = append(envKeys, k)
}
sort.Strings(envKeys)
env := make([]string, len(start.Environment))
for i, k := range envKeys {
env[i] = fmt.Sprintf("%s=%s", k, start.Environment[k])
}

files := make([]*os.File, len(start.Files))
for fd, startFile := range start.Files {
f, err := os.OpenFile(startFile.Path, startFile.Flag, os.FileMode(startFile.Perm))
if err != nil {
return "", fmt.Errorf("opening fd %d: %w", fd, err)
}
files[fd] = f
}

proc, err := os.StartProcess(argv[0], argv, &os.ProcAttr{
Dir: start.Directory,
Env: env,
Files: files,
Sys: &syscall.SysProcAttr{
Setsid: true,
},
})
if err != nil {
return "", err
}

pid := proc.Pid
iri = MakeIRI(pid)
return iri, nil
}

func (res *Resource) LoadIRI(ctx context.Context, iri string) error {
pid, err := ParseIRI(iri)
if err != nil {
return fmt.Errorf("invalid process iri: %w", err)
}
res.Pid = pid
return nil
}

func MakeIRI(pid int) string {
return fmt.Sprintf("exo:unix:process/%d", pid) // XXX uri scheme ok here?
}

func ParseIRI(iri string) (pid int, err error) {
parts := strings.SplitN(iri, "/", 2)
if len(parts) != 2 {
return 0, fmt.Errorf("expected /")
}
pid64, err := strconv.ParseInt(parts[1], 10, 32)
return int(pid64), err
}

func (res *Resource) Exists(ctx context.Context) (bool, error) {
return osutil.IsValidPid(res.Pid), nil
}

func (res *Resource) Update(ctx context.Context, patch string) error {
return errors.New("unsupported patch")
}

func (res *Resource) Delete(ctx context.Context) error {
if err := osutil.SignalGroup(res.Pid, os.Interrupt); err != nil {
return fmt.Errorf("interrupting: %w", err)
}

ok := make(chan struct{})
go func() {
select {
case <-ctx.Done():
_ = osutil.KillGroup(res.Pid)
case <-ok:
}
}()

if _, err := osutil.WaitProcess(res.Pid); err != nil {
return fmt.Errorf("waiting: %w", err)
}
close(ok)
return nil
}