Skip to content

Commit da4b080

Browse files
committed
chore: add streamer unit tests
1 parent 45bce86 commit da4b080

File tree

4 files changed

+55
-31
lines changed

4 files changed

+55
-31
lines changed

internal/functions/serve/streamer.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,54 @@ package serve
33
import (
44
"context"
55
"os"
6-
"strings"
6+
"time"
77

8+
"github.com/cenkalti/backoff/v4"
89
"github.com/containerd/errdefs"
910
"github.com/docker/docker/api/types/container"
11+
"github.com/go-errors/errors"
1012
"github.com/supabase/cli/internal/utils"
1113
)
1214

1315
type logStreamer struct {
14-
ctx context.Context
15-
cancel context.CancelFunc
16-
ErrCh chan error
16+
ctx context.Context
17+
clock backoff.Clock
18+
Close context.CancelFunc
19+
ErrCh chan error
1720
}
1821

19-
func NewLogStreamer(ctx context.Context) logStreamer {
20-
cancelCtx, cancel := context.WithCancel(ctx)
21-
return logStreamer{
22-
ctx: cancelCtx,
23-
cancel: cancel,
24-
ErrCh: make(chan error, 1),
22+
func NewLogStreamer(ctx context.Context, opts ...func(*logStreamer)) logStreamer {
23+
s := logStreamer{
24+
clock: backoff.SystemClock,
25+
ErrCh: make(chan error, 1),
2526
}
27+
s.ctx, s.Close = context.WithCancel(ctx)
28+
for _, apply := range opts {
29+
apply(&s)
30+
}
31+
return s
2632
}
2733

34+
const (
35+
initialInterval = time.Millisecond * 50
36+
maxElapsedTime = time.Second * 20
37+
)
38+
2839
func (s *logStreamer) Start(containerID string) {
29-
for {
40+
policy := backoff.WithContext(backoff.NewExponentialBackOff(
41+
backoff.WithInitialInterval(initialInterval),
42+
backoff.WithMaxElapsedTime(maxElapsedTime),
43+
backoff.WithClockProvider(s.clock),
44+
), s.ctx)
45+
fetch := func() error {
3046
if err := utils.DockerStreamLogs(s.ctx, containerID, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
3147
lo.Timestamps = true
32-
}); err != nil &&
33-
!errdefs.IsNotFound(err) &&
34-
!strings.HasSuffix(err.Error(), "exit 137") &&
35-
!strings.HasSuffix(err.Error(), "can not get logs from container which is dead or marked for removal") {
36-
s.ErrCh <- err
37-
break
48+
}); errdefs.IsNotFound(err) || errdefs.IsConflict(err) || errors.Is(err, utils.ErrContainerKilled) {
49+
return err
50+
} else if err != nil {
51+
return &backoff.PermanentError{Err: err}
3852
}
53+
return errors.Errorf("container exited gracefully: %s", containerID)
3954
}
40-
}
41-
42-
func (s *logStreamer) Close() {
43-
if s.cancel != nil {
44-
s.cancel()
45-
}
55+
s.ErrCh <- backoff.Retry(fetch, policy)
4656
}

internal/functions/serve/watcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *WatcherIntegrationSetup) CreateFileWatcher() (*debounceFileWatcher, err
8484
return watcher, nil
8585
}
8686

87-
func TestFileWatcherIntegration(t *testing.T) {
87+
func TestFileWatcher(t *testing.T) {
8888
t.Run("detects TypeScript function changes and triggers restart", func(t *testing.T) {
8989
setup := NewWatcherIntegrationSetup(t)
9090
defer setup.Cleanup()

internal/testing/apitest/docker.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package apitest
33
import (
44
"bytes"
55
"fmt"
6+
"io"
67
"net/http"
8+
"strings"
79

810
"github.com/docker/docker/api"
911
"github.com/docker/docker/api/types/container"
@@ -84,9 +86,17 @@ func MockDockerStop(docker *client.Client) {
8486

8587
// Ref: internal/utils/docker.go::DockerRunOnce
8688
func setupDockerLogs(docker *client.Client, containerID, stdout string, exitCode int) error {
89+
err := MockDockerLogsStream(docker, containerID, exitCode, strings.NewReader(stdout))
90+
gock.New(docker.DaemonHost()).
91+
Delete("/v" + docker.ClientVersion() + "/containers/" + containerID).
92+
Reply(http.StatusOK)
93+
return err
94+
}
95+
96+
func MockDockerLogsStream(docker *client.Client, containerID string, exitCode int, r io.Reader) error {
8797
var body bytes.Buffer
8898
writer := stdcopy.NewStdWriter(&body, stdcopy.Stdout)
89-
_, err := writer.Write([]byte(stdout))
99+
_, err := io.Copy(writer, r)
90100
gock.New(docker.DaemonHost()).
91101
Get("/v"+docker.ClientVersion()+"/containers/"+containerID+"/logs").
92102
Reply(http.StatusOK).
@@ -99,9 +109,6 @@ func setupDockerLogs(docker *client.Client, containerID, stdout string, exitCode
99109
State: &container.State{
100110
ExitCode: exitCode,
101111
}}})
102-
gock.New(docker.DaemonHost()).
103-
Delete("/v" + docker.ClientVersion() + "/containers/" + containerID).
104-
Reply(http.StatusOK)
105112
return err
106113
}
107114

internal/utils/docker.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ func DockerRunOnceWithConfig(ctx context.Context, config container.Config, hostC
376376
return DockerStreamLogs(ctx, container, stdout, stderr)
377377
}
378378

379+
var ErrContainerKilled = errors.New("exit 137")
380+
379381
func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io.Writer, opts ...func(*container.LogsOptions)) error {
380382
logsOptions := container.LogsOptions{
381383
ShowStdout: true,
@@ -399,10 +401,15 @@ func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io
399401
if err != nil {
400402
return errors.Errorf("failed to inspect docker container: %w", err)
401403
}
402-
if resp.State.ExitCode > 0 {
403-
return errors.Errorf("error running container: exit %d", resp.State.ExitCode)
404+
switch resp.State.ExitCode {
405+
case 0:
406+
return nil
407+
case 137:
408+
err = ErrContainerKilled
409+
default:
410+
err = errors.Errorf("exit %d", resp.State.ExitCode)
404411
}
405-
return nil
412+
return errors.Errorf("error running container: %w", err)
406413
}
407414

408415
func DockerStreamLogsOnce(ctx context.Context, containerId string, stdout, stderr io.Writer) error {

0 commit comments

Comments
 (0)