Skip to content

feat: implement file watcher for serving functions #3710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
56 changes: 48 additions & 8 deletions internal/functions/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"strconv"
"strings"

"github.com/docker/cli/cli/compose/loader"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/go-connections/nat"
"github.com/go-errors/errors"
Expand Down Expand Up @@ -46,6 +48,7 @@ func (mode InspectMode) toFlag() string {
type RuntimeOption struct {
InspectMode *InspectMode
InspectMain bool
fileWatcher *debounceFileWatcher
}

func (i *RuntimeOption) toArgs() []string {
Expand All @@ -68,6 +71,36 @@ const (
var mainFuncEmbed string

func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
watcher, err := NewDebounceFileWatcher()
if err != nil {
return err
}
go watcher.Start()
defer watcher.Close()
// TODO: refactor this to edge runtime service
runtimeOption.fileWatcher = watcher
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
return err
}
streamer := NewLogStreamer(ctx)
go streamer.Start(utils.EdgeRuntimeId)
defer streamer.Close()
for {
select {
case <-ctx.Done():
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
return ctx.Err()
case <-watcher.RestartCh:
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
return err
}
case err := <-streamer.ErrCh:
return err
}
}
}

func restartEdgeRuntime(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
// 1. Sanity checks.
if err := flags.LoadConfig(fsys); err != nil {
return err
Expand All @@ -84,14 +117,7 @@ func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPa
dbUrl := fmt.Sprintf("postgresql://postgres:postgres@%s:5432/postgres", utils.DbAliases[0])
// 3. Serve and log to console
fmt.Fprintln(os.Stderr, "Setting up Edge Functions runtime...")
if err := ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys); err != nil {
return err
}
if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr); err != nil {
return err
}
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
return nil
return ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys)
}

func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, dbUrl string, runtimeOption RuntimeOption, fsys afero.Fs) error {
Expand Down Expand Up @@ -131,6 +157,19 @@ func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool,
if err != nil {
return err
}
if watcher := runtimeOption.fileWatcher; watcher != nil {
var watchPaths []string
for _, b := range binds {
if spec, err := loader.ParseVolume(b); err != nil {
return errors.Errorf("failed to parse docker volume: %w", err)
} else if spec.Type == string(mount.TypeBind) {
watchPaths = append(watchPaths, spec.Source)
}
}
if err := watcher.SetWatchPaths(watchPaths, fsys); err != nil {
return err
}
}
env = append(env, "SUPABASE_INTERNAL_FUNCTIONS_CONFIG="+functionsConfigString)
// 3. Parse entrypoint script
cmd := append([]string{
Expand Down Expand Up @@ -215,6 +254,7 @@ func populatePerFunctionConfigs(cwd, importMapPath string, noVerifyJWT *bool, fs
for slug, fc := range functionsConfig {
if !fc.Enabled {
fmt.Fprintln(os.Stderr, "Skipped serving Function:", slug)
delete(functionsConfig, slug)
continue
}
modules, err := deploy.GetBindMounts(cwd, utils.FunctionsDir, "", fc.Entrypoint, fc.ImportMap, fsys)
Expand Down
79 changes: 71 additions & 8 deletions internal/functions/serve/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package serve

import (
"context"
"embed"
"net/http"
"os"
"path/filepath"
"strings"
"testing"

"github.com/docker/docker/api/types/container"
Expand All @@ -17,13 +18,20 @@ import (
"github.com/supabase/cli/pkg/cast"
)

var (
//go:embed testdata/config.toml
testConfig []byte
//go:embed testdata/*
testdata embed.FS
)

func TestServeCommand(t *testing.T) {
t.Run("serves all functions", func(t *testing.T) {
// Setup in-memory fs
fsys := afero.NewMemMapFs()
require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys))
require.NoError(t, afero.WriteFile(fsys, utils.ConfigPath, testConfig, 0644))
require.NoError(t, afero.WriteFile(fsys, utils.FallbackEnvFilePath, []byte{}, 0644))
require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte{}, 0644))
require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte("{}"), 0644))
// Setup mock docker
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
Expand All @@ -36,11 +44,11 @@ func TestServeCommand(t *testing.T) {
Delete("/v" + utils.Docker.ClientVersion() + "/containers/" + containerId).
Reply(http.StatusOK)
apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), containerId)
require.NoError(t, apitest.MockDockerLogs(utils.Docker, containerId, "success"))
// Run test
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerId, 1, strings.NewReader("failed")))
// Run test with timeout context
err := Run(context.Background(), "", nil, "", RuntimeOption{}, fsys)
// Check error
assert.NoError(t, err)
assert.ErrorContains(t, err, "error running container: exit 1")
assert.Empty(t, apitest.ListUnmatchedRequests())
})

Expand Down Expand Up @@ -88,7 +96,6 @@ func TestServeCommand(t *testing.T) {
})

t.Run("throws error on missing import map", func(t *testing.T) {
utils.CurrentDirAbs = "/"
// Setup in-memory fs
fsys := afero.NewMemMapFs()
require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys))
Expand All @@ -105,6 +112,62 @@ func TestServeCommand(t *testing.T) {
// Run test
err := Run(context.Background(), ".env", cast.Ptr(true), "import_map.json", RuntimeOption{}, fsys)
// Check error
assert.ErrorIs(t, err, os.ErrNotExist)
assert.ErrorContains(t, err, "failed to resolve relative path:")
})
}

func TestServeFunctions(t *testing.T) {
require.NoError(t, utils.Config.Load("testdata/config.toml", testdata))
utils.UpdateDockerIds()

t.Run("runs inspect mode", func(t *testing.T) {
// Setup in-memory fs
fsys := afero.FromIOFS{FS: testdata}
// Setup mock docker
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), utils.EdgeRuntimeId)
// Run test
err := ServeFunctions(context.Background(), "", nil, "", "", RuntimeOption{
InspectMode: cast.Ptr(InspectModeRun),
InspectMain: true,
}, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
})

t.Run("parses env file", func(t *testing.T) {
envPath := "/project/.env"
// Setup in-memory fs
fsys := afero.NewMemMapFs()
require.NoError(t, utils.WriteFile(envPath, []byte(`
DATABASE_URL=postgresql://localhost:5432/test
API_KEY=secret123
DEBUG=true
`), fsys))
// Run test
env, err := parseEnvFile(envPath, fsys)
// Check error
assert.NoError(t, err)
assert.ElementsMatch(t, []string{
"DATABASE_URL=postgresql://localhost:5432/test",
"API_KEY=secret123",
"DEBUG=true",
}, env)
})

t.Run("parses function config", func(t *testing.T) {
// Setup in-memory fs
fsys := afero.FromIOFS{FS: testdata}
// Run test
binds, configString, err := populatePerFunctionConfigs("/", "", nil, fsys)
// Check error
assert.NoError(t, err)
assert.ElementsMatch(t, []string{
"supabase_edge_runtime_test:/root/.cache/deno:rw",
"/supabase/functions/:/supabase/functions/:ro",
}, binds)
assert.Equal(t, `{"hello":{"verifyJWT":true,"entrypointPath":"testdata/functions/hello/index.ts","staticFiles":["testdata/image.png"]}}`, configString)
})
}
47 changes: 47 additions & 0 deletions internal/functions/serve/streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package serve

import (
"context"
"os"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/containerd/errdefs"
"github.com/docker/docker/api/types/container"
"github.com/go-errors/errors"
"github.com/supabase/cli/internal/utils"
)

type logStreamer struct {
ctx context.Context
Close context.CancelFunc
ErrCh chan error
}

func NewLogStreamer(ctx context.Context) logStreamer {
cancelCtx, cancel := context.WithCancel(ctx)
return logStreamer{
ctx: cancelCtx,
Close: cancel,
ErrCh: make(chan error, 1),
}
}

// Used by unit tests
var retryInterval = time.Millisecond * 400

func (s *logStreamer) Start(containerID string) {
// Retry indefinitely until stream is closed
policy := backoff.WithContext(backoff.NewConstantBackOff(retryInterval), s.ctx)
fetch := func() error {
if err := utils.DockerStreamLogs(s.ctx, containerID, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
lo.Timestamps = true
}); errdefs.IsNotFound(err) || errdefs.IsConflict(err) || errors.Is(err, utils.ErrContainerKilled) {
return err
} else if err != nil {
return &backoff.PermanentError{Err: err}
}
return errors.Errorf("container exited gracefully: %s", containerID)
}
s.ErrCh <- backoff.Retry(fetch, policy)
}
91 changes: 91 additions & 0 deletions internal/functions/serve/streamer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package serve

import (
"context"
"net/http"
"strings"
"testing"
"time"

"github.com/h2non/gock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/supabase/cli/internal/testing/apitest"
"github.com/supabase/cli/internal/utils"
)

func TestLogStreamer(t *testing.T) {
containerID := "test-container"
retryInterval = 0

t.Run("streams logs from container", func(t *testing.T) {
// Setup mock docker
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
// Run test
streamer := NewLogStreamer(context.Background())
streamer.Start(containerID)
// Check error
select {
case err := <-streamer.ErrCh:
assert.ErrorContains(t, err, "error running container: exit 1")
case <-time.After(2 * time.Second):
assert.Fail(t, "missing error signal from closing")
}
assert.Empty(t, apitest.ListUnmatchedRequests())
})

t.Run("retries on container exit", func(t *testing.T) {
// Setup mock docker
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 0, strings.NewReader("")))
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 137, strings.NewReader("")))
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
// Run test
streamer := NewLogStreamer(context.Background())
streamer.Start(containerID)
// Check error
select {
case err := <-streamer.ErrCh:
assert.ErrorContains(t, err, "error running container: exit 1")
case <-time.After(2 * time.Second):
assert.Fail(t, "missing error signal from closing")
}
assert.Empty(t, apitest.ListUnmatchedRequests())
})

t.Run("retries on missing container", func(t *testing.T) {
// Setup mock docker
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
gock.New(utils.Docker.DaemonHost()).
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
Reply(http.StatusNotFound).
BodyString("No such container")
gock.New(utils.Docker.DaemonHost()).
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
Reply(http.StatusConflict).
BodyString("can not get logs from container which is dead or marked for removal")
gock.New(utils.Docker.DaemonHost()).
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
Reply(http.StatusOK)
gock.New(utils.Docker.DaemonHost()).
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/json").
Reply(http.StatusNotFound).
BodyString("No such object")
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
// Run test
streamer := NewLogStreamer(context.Background())
streamer.Start(containerID)
// Check error
select {
case err := <-streamer.ErrCh:
assert.ErrorContains(t, err, "error running container: exit 1")
case <-time.After(2 * time.Second):
assert.Fail(t, "missing error signal from closing")
}
assert.Empty(t, apitest.ListUnmatchedRequests())
})
}
Loading