Skip to content

Commit

Permalink
Improve performance of remote exec in runnerv2 (#724)
Browse files Browse the repository at this point in the history
This PR refactors the `runnerv2service` package to speed up the
execution of programs that produce large output.

### Test

`cat` a large file and pipe the output to `/dev/null`. Because the command
is executed through the server, the run involves marshaling to and
unmarshaling from Protobuf.

#### This PR

```
time ./runme beta run --remote generate | cat - >/dev/null
./runme beta run --remote generate  0.70s user 0.64s system 33% cpu 3.973 total
cat - > /dev/null  0.01s user 0.06s system 1% cpu 3.972 total
```

#### Main

```
time ./runme beta run --remote generate | cat - >/dev/null
./runme beta run --remote generate  5.58s user 6.89s system 146% cpu 8.537 total
cat - > /dev/null  0.21s user 1.04s system 14% cpu 8.536 total
```

We have a speed increase of over 2x.
  • Loading branch information
adambabik authored Jan 21, 2025
1 parent 11c7d9c commit 150494d
Show file tree
Hide file tree
Showing 15 changed files with 711 additions and 707 deletions.
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,9 @@ echo "Tag A,B"
```sh {"tag":"a,b,c","id":"01HF7BT3HD84GWTQB8ZY0GBA06","name":"c"}
echo "Tag A,B,C"
```

## Large files

```sh {"name": "generate"}
gunzip --stdout ./internal/command/testdata/users_1m.json.gz
```
12 changes: 7 additions & 5 deletions internal/cmd/beta/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ func runCodeBlock(

cfg.Mode = runnerv2.CommandMode_COMMAND_MODE_CLI

execInfo := &rcontext.ExecutionInfo{
KnownName: block.Name(),
KnownID: block.ID(),
}
ctx = rcontext.ContextWithExecutionInfo(ctx, execInfo)
ctx = rcontext.WithExecutionInfo(
ctx,
&rcontext.ExecutionInfo{
KnownName: block.Name(),
KnownID: block.ID(),
},
)

cmd, err := factory.Build(cfg, options)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/owl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,12 +698,12 @@ func (s *Store) LoadEnvs(source string, envs ...string) error {
return nil
}

func (s *Store) Update(context context.Context, newOrUpdated, deleted []string) error {
func (s *Store) Update(ctx context.Context, newOrUpdated, deleted []string) error {
s.mu.Lock()
defer s.mu.Unlock()

execRef := "[execution]"
if execInfo, ok := context.Value(rcontext.ExecutionInfoKey).(*rcontext.ExecutionInfo); ok {
if execInfo, ok := rcontext.ExecutionInfoFromContext(ctx); ok {
execRef = fmt.Sprintf("#%s", execInfo.KnownID)
if execInfo.KnownName != "" {
execRef = fmt.Sprintf("#%s", execInfo.KnownName)
Expand Down
13 changes: 9 additions & 4 deletions internal/runner/context/exec_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package runner

import "context"

type runnerContextKey struct{}
type contextKey struct{ string }

var ExecutionInfoKey = &runnerContextKey{}
var executionInfoKey = &contextKey{"ExecutionInfo"}

type ExecutionInfo struct {
ExecContext string
Expand All @@ -13,6 +13,11 @@ type ExecutionInfo struct {
RunID string
}

func ContextWithExecutionInfo(ctx context.Context, execInfo *ExecutionInfo) context.Context {
return context.WithValue(ctx, ExecutionInfoKey, execInfo)
func WithExecutionInfo(ctx context.Context, execInfo *ExecutionInfo) context.Context {
return context.WithValue(ctx, executionInfoKey, execInfo)
}

// ExecutionInfoFromContext looks up the *ExecutionInfo stored in ctx under
// the package-private executionInfoKey. The boolean result reports whether
// a value of that type was present.
func ExecutionInfoFromContext(ctx context.Context) (*ExecutionInfo, bool) {
	if info, found := ctx.Value(executionInfoKey).(*ExecutionInfo); found {
		return info, true
	}
	return nil, false
}
5 changes: 2 additions & 3 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
KnownName: req.GetKnownName(),
KnownID: req.GetKnownId(),
}
ctx := rcontext.ContextWithExecutionInfo(srv.Context(), execInfo)
ctx := rcontext.WithExecutionInfo(srv.Context(), execInfo)

if req.KnownId != "" {
logger = logger.With(zap.String("knownID", req.KnownId))
Expand Down Expand Up @@ -351,9 +351,8 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
}

cmdCtx := ctx

if req.Background {
cmdCtx = rcontext.ContextWithExecutionInfo(context.Background(), execInfo)
cmdCtx = rcontext.WithExecutionInfo(context.Background(), execInfo)
}

if err := cmd.StartWithOpts(cmdCtx, &startOpts{}); err != nil {
Expand Down
84 changes: 84 additions & 0 deletions internal/runnerv2service/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package runnerv2service

import (
"bytes"
"io"
"sync"
"sync/atomic"

"github.com/pkg/errors"
)

// msgBufferSize caps the size of the data chunks that the handler
// sends to clients. The value was chosen with typically small
// messages in mind; variable-sized buffers may be worth
// implementing in the future.
//
// NOTE(review): the original comment calls this value "smaller
// intentionally", yet 16 MiB is above gRPC's default 4 MiB receive
// limit — confirm the transport is configured for chunks this large.
const msgBufferSize = 16 << 20 // 16 MiB

// buffer is a thread-safe in-memory byte buffer. Unlike bytes.Buffer,
// Read blocks while the buffer is empty and reports io.EOF only after
// Close has been called and every buffered byte has been consumed.
type buffer struct {
	// mu guards b and orders changes of closed relative to writes.
	mu *sync.Mutex
	// b holds the bytes that were written but not yet read.
	// +checklocks:mu
	b *bytes.Buffer
	// closed is set exactly once by Close, while mu is held.
	closed *atomic.Bool
	// close is closed by Close to wake up a blocked Read.
	close chan struct{}
	// more carries at most one pending "data was written" notification.
	more chan struct{}
}

var _ io.WriteCloser = (*buffer)(nil)

// newBuffer returns an open buffer whose backing storage has an initial
// capacity of size bytes.
func newBuffer(size int) *buffer {
	return &buffer{
		mu:     &sync.Mutex{},
		b:      bytes.NewBuffer(make([]byte, 0, size)),
		closed: &atomic.Bool{},
		close:  make(chan struct{}),
		// One slot of capacity keeps a notification alive when Write runs
		// before the reader has parked in its select; with an unbuffered
		// channel that notification would be dropped and Read could stall
		// even though data is available.
		more: make(chan struct{}, 1),
	}
}

// Write appends p to the buffer and notifies a waiting reader.
// It returns an error if the buffer has already been closed.
func (b *buffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	// The closed check happens under the lock so that a write either
	// completes before Close or is rejected; it can never slip in after
	// the reader has already observed "closed and empty".
	if b.closed.Load() {
		b.mu.Unlock()
		return 0, errors.New("closed")
	}
	n, err := b.b.Write(p)
	b.mu.Unlock()

	// Non-blocking: a notification that is already pending is enough.
	select {
	case b.more <- struct{}{}:
	default:
	}

	return n, err
}

// Close marks the buffer as closed and wakes up a blocked reader.
// It is safe to call Close more than once; it always returns nil.
func (b *buffer) Close() error {
	b.mu.Lock()
	first := b.closed.CompareAndSwap(false, true)
	b.mu.Unlock()

	if first {
		close(b.close)
	}
	return nil
}

// Read copies buffered data into p. When the buffer is empty and still
// open, Read blocks until data is written or the buffer is closed.
// io.EOF is returned only once the buffer is closed and fully drained.
func (b *buffer) Read(p []byte) (int, error) {
	for {
		b.mu.Lock()
		n, err := b.b.Read(p)
		closed := b.closed.Load()
		b.mu.Unlock()

		// Data (or a non-EOF outcome) is returned as is. An EOF from the
		// underlying buffer is final only if the buffer was closed at the
		// moment it was found empty.
		if !errors.Is(err, io.EOF) || closed {
			return n, err
		}

		// Wait for a writer or for Close, then look at the buffer again
		// so that data written right before Close is never dropped.
		select {
		case <-b.more:
		case <-b.close:
		}
	}
}
30 changes: 30 additions & 0 deletions internal/runnerv2service/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package runnerv2service

import (
"github.com/stateful/runme/v3/internal/session"
runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2"
"github.com/stateful/runme/v3/pkg/project"
)

// convertSessionToProtoSession builds the runnerv2 representation of sess,
// carrying over its ID and the result of sess.GetAllEnv().
// Session metadata is not propagated yet.
func convertSessionToProtoSession(sess *session.Session) *runnerv2.Session {
	result := new(runnerv2.Session)
	result.Id = sess.ID
	result.Env = sess.GetAllEnv()
	// Metadata: sess.Metadata,
	return result
}

// convertProtoProjectToProject creates a directory-based project rooted at
// runnerProj.Root, using the default project options plus the env files
// read order when runnerProj.EnvLoadOrder is set.
//
// A nil runnerProj yields (nil, nil), so callers must check the returned
// project for nil even when the error is nil.
//
// TODO(adamb): this function should not return nil project and nil error at the same time.
func convertProtoProjectToProject(runnerProj *runnerv2.Project) (*project.Project, error) {
	if runnerProj == nil {
		return nil, nil
	}

	// Cap the slice at its length so that the append below always allocates
	// a fresh backing array instead of writing into storage shared with the
	// package-level defaults (which concurrent calls would otherwise race on).
	defaults := project.DefaultProjectOptions
	opts := defaults[:len(defaults):len(defaults)]

	if runnerProj.EnvLoadOrder != nil {
		opts = append(opts, project.WithEnvFilesReadOrder(runnerProj.EnvLoadOrder))
	}

	return project.NewDirProject(runnerProj.Root, opts...)
}
Loading

0 comments on commit 150494d

Please sign in to comment.