diff --git a/internal/functions/serve/serve.go b/internal/functions/serve/serve.go index 9fba48a76..9819a181d 100644 --- a/internal/functions/serve/serve.go +++ b/internal/functions/serve/serve.go @@ -10,7 +10,9 @@ import ( "strconv" "strings" + "github.com/docker/cli/cli/compose/loader" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/network" "github.com/docker/go-connections/nat" "github.com/go-errors/errors" @@ -46,6 +48,7 @@ func (mode InspectMode) toFlag() string { type RuntimeOption struct { InspectMode *InspectMode InspectMain bool + fileWatcher *debounceFileWatcher } func (i *RuntimeOption) toArgs() []string { @@ -68,6 +71,36 @@ const ( var mainFuncEmbed string func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error { + watcher, err := NewDebounceFileWatcher() + if err != nil { + return err + } + go watcher.Start() + defer watcher.Close() + // TODO: refactor this to edge runtime service + runtimeOption.fileWatcher = watcher + if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil { + return err + } + streamer := NewLogStreamer(ctx) + go streamer.Start(utils.EdgeRuntimeId) + defer streamer.Close() + for { + select { + case <-ctx.Done(): + fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir)) + return ctx.Err() + case <-watcher.RestartCh: + if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil { + return err + } + case err := <-streamer.ErrCh: + return err + } + } +} + +func restartEdgeRuntime(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error { // 1. Sanity checks. if err := flags.LoadConfig(fsys); err != nil { return err @@ -84,14 +117,7 @@ func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPa dbUrl := fmt.Sprintf("postgresql://postgres:postgres@%s:5432/postgres", utils.DbAliases[0]) // 3. Serve and log to console fmt.Fprintln(os.Stderr, "Setting up Edge Functions runtime...") - if err := ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys); err != nil { - return err - } - if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr); err != nil { - return err - } - fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir)) - return nil + return ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys) } func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, dbUrl string, runtimeOption RuntimeOption, fsys afero.Fs) error { @@ -131,6 +157,19 @@ func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, if err != nil { return err } + if watcher := runtimeOption.fileWatcher; watcher != nil { + var watchPaths []string + for _, b := range binds { + if spec, err := loader.ParseVolume(b); err != nil { + return errors.Errorf("failed to parse docker volume: %w", err) + } else if spec.Type == string(mount.TypeBind) { + watchPaths = append(watchPaths, spec.Source) + } + } + if err := watcher.SetWatchPaths(watchPaths, fsys); err != nil { + return err + } + } env = append(env, "SUPABASE_INTERNAL_FUNCTIONS_CONFIG="+functionsConfigString) // 3. Parse entrypoint script cmd := append([]string{ @@ -215,6 +254,7 @@ func populatePerFunctionConfigs(cwd, importMapPath string, noVerifyJWT *bool, fs for slug, fc := range functionsConfig { if !fc.Enabled { fmt.Fprintln(os.Stderr, "Skipped serving Function:", slug) + delete(functionsConfig, slug) continue } modules, err := deploy.GetBindMounts(cwd, utils.FunctionsDir, "", fc.Entrypoint, fc.ImportMap, fsys) diff --git a/internal/functions/serve/serve_test.go b/internal/functions/serve/serve_test.go index 7dbd75377..7b0bb17fa 100644 --- a/internal/functions/serve/serve_test.go +++ b/internal/functions/serve/serve_test.go @@ -2,9 +2,10 @@ package serve import ( "context" + "embed" "net/http" - "os" "path/filepath" + "strings" "testing" "github.com/docker/docker/api/types/container" @@ -17,13 +18,20 @@ import ( "github.com/supabase/cli/pkg/cast" ) +var ( + //go:embed testdata/config.toml + testConfig []byte + //go:embed testdata/* + testdata embed.FS +) + func TestServeCommand(t *testing.T) { t.Run("serves all functions", func(t *testing.T) { // Setup in-memory fs fsys := afero.NewMemMapFs() - require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys)) + require.NoError(t, afero.WriteFile(fsys, utils.ConfigPath, testConfig, 0644)) require.NoError(t, afero.WriteFile(fsys, utils.FallbackEnvFilePath, []byte{}, 0644)) - require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte{}, 0644)) + require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte("{}"), 0644)) // Setup mock docker require.NoError(t, apitest.MockDocker(utils.Docker)) defer gock.OffAll() @@ -36,11 +44,11 @@ func TestServeCommand(t *testing.T) { Delete("/v" + utils.Docker.ClientVersion() + "/containers/" + containerId). Reply(http.StatusOK) apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), containerId) - require.NoError(t, apitest.MockDockerLogs(utils.Docker, containerId, "success")) - // Run test + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerId, 1, strings.NewReader("failed"))) + // Run test with timeout context err := Run(context.Background(), "", nil, "", RuntimeOption{}, fsys) // Check error - assert.NoError(t, err) + assert.ErrorContains(t, err, "error running container: exit 1") assert.Empty(t, apitest.ListUnmatchedRequests()) }) @@ -88,7 +96,6 @@ func TestServeCommand(t *testing.T) { }) t.Run("throws error on missing import map", func(t *testing.T) { - utils.CurrentDirAbs = "/" // Setup in-memory fs fsys := afero.NewMemMapFs() require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys)) @@ -105,6 +112,62 @@ func TestServeCommand(t *testing.T) { // Run test err := Run(context.Background(), ".env", cast.Ptr(true), "import_map.json", RuntimeOption{}, fsys) // Check error - assert.ErrorIs(t, err, os.ErrNotExist) + assert.ErrorContains(t, err, "failed to resolve relative path:") + }) +} + +func TestServeFunctions(t *testing.T) { + require.NoError(t, utils.Config.Load("testdata/config.toml", testdata)) + utils.UpdateDockerIds() + + t.Run("runs inspect mode", func(t *testing.T) { + // Setup in-memory fs + fsys := afero.FromIOFS{FS: testdata} + // Setup mock docker + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), utils.EdgeRuntimeId) + // Run test + err := ServeFunctions(context.Background(), "", nil, "", "", RuntimeOption{ + InspectMode: cast.Ptr(InspectModeRun), + InspectMain: true, + }, fsys) + // Check error + assert.NoError(t, err) + assert.Empty(t, apitest.ListUnmatchedRequests()) + }) + + t.Run("parses env file", func(t *testing.T) { + envPath := "/project/.env" + // Setup in-memory fs + fsys := afero.NewMemMapFs() + require.NoError(t, utils.WriteFile(envPath, []byte(` + DATABASE_URL=postgresql://localhost:5432/test + API_KEY=secret123 + DEBUG=true + `), fsys)) + // Run test + env, err := parseEnvFile(envPath, fsys) + // Check error + assert.NoError(t, err) + assert.ElementsMatch(t, []string{ + "DATABASE_URL=postgresql://localhost:5432/test", + "API_KEY=secret123", + "DEBUG=true", + }, env) + }) + + t.Run("parses function config", func(t *testing.T) { + // Setup in-memory fs + fsys := afero.FromIOFS{FS: testdata} + // Run test + binds, configString, err := populatePerFunctionConfigs("/", "", nil, fsys) + // Check error + assert.NoError(t, err) + assert.ElementsMatch(t, []string{ + "supabase_edge_runtime_test:/root/.cache/deno:rw", + "/supabase/functions/:/supabase/functions/:ro", + }, binds) + assert.Equal(t, `{"hello":{"verifyJWT":true,"entrypointPath":"testdata/functions/hello/index.ts","staticFiles":["testdata/image.png"]}}`, configString) }) } diff --git a/internal/functions/serve/streamer.go b/internal/functions/serve/streamer.go new file mode 100644 index 000000000..09d4171d5 --- /dev/null +++ b/internal/functions/serve/streamer.go @@ -0,0 +1,47 @@ +package serve + +import ( + "context" + "os" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/containerd/errdefs" + "github.com/docker/docker/api/types/container" + "github.com/go-errors/errors" + "github.com/supabase/cli/internal/utils" +) + +type logStreamer struct { + ctx context.Context + Close context.CancelFunc + ErrCh chan error +} + +func NewLogStreamer(ctx context.Context) logStreamer { + cancelCtx, cancel := context.WithCancel(ctx) + return logStreamer{ + ctx: cancelCtx, + Close: cancel, + ErrCh: make(chan error, 1), + } +} + +// Used by unit tests +var retryInterval = time.Millisecond * 400 + +func (s *logStreamer) Start(containerID string) { + // Retry indefinitely until stream is closed + policy := backoff.WithContext(backoff.NewConstantBackOff(retryInterval), s.ctx) + fetch := func() error { + if err := utils.DockerStreamLogs(s.ctx, containerID, os.Stdout, os.Stderr, func(lo *container.LogsOptions) { + lo.Timestamps = true + }); errdefs.IsNotFound(err) || errdefs.IsConflict(err) || errors.Is(err, utils.ErrContainerKilled) { + return err + } else if err != nil { + return &backoff.PermanentError{Err: err} + } + return errors.Errorf("container exited gracefully: %s", containerID) + } + s.ErrCh <- backoff.Retry(fetch, policy) +} diff --git a/internal/functions/serve/streamer_test.go b/internal/functions/serve/streamer_test.go new file mode 100644 index 000000000..23116860e --- /dev/null +++ b/internal/functions/serve/streamer_test.go @@ -0,0 +1,91 @@ +package serve + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/supabase/cli/internal/testing/apitest" + "github.com/supabase/cli/internal/utils" +) + +func TestLogStreamer(t *testing.T) { + containerID := "test-container" + retryInterval = 0 + + t.Run("streams logs from container", func(t *testing.T) { + // Setup mock docker + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader(""))) + // Run test + streamer := NewLogStreamer(context.Background()) + streamer.Start(containerID) + // Check error + select { + case err := <-streamer.ErrCh: + assert.ErrorContains(t, err, "error running container: exit 1") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing error signal from closing") + } + assert.Empty(t, apitest.ListUnmatchedRequests()) + }) + + t.Run("retries on container exit", func(t *testing.T) { + // Setup mock docker + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 0, strings.NewReader(""))) + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 137, strings.NewReader(""))) + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader(""))) + // Run test + streamer := NewLogStreamer(context.Background()) + streamer.Start(containerID) + // Check error + select { + case err := <-streamer.ErrCh: + assert.ErrorContains(t, err, "error running container: exit 1") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing error signal from closing") + } + assert.Empty(t, apitest.ListUnmatchedRequests()) + }) + + t.Run("retries on missing container", func(t *testing.T) { + // Setup mock docker + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + gock.New(utils.Docker.DaemonHost()). + Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs"). + Reply(http.StatusNotFound). + BodyString("No such container") + gock.New(utils.Docker.DaemonHost()). + Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs"). + Reply(http.StatusConflict). + BodyString("can not get logs from container which is dead or marked for removal") + gock.New(utils.Docker.DaemonHost()). + Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs"). + Reply(http.StatusOK) + gock.New(utils.Docker.DaemonHost()). + Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/json"). + Reply(http.StatusNotFound). + BodyString("No such object") + require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader(""))) + // Run test + streamer := NewLogStreamer(context.Background()) + streamer.Start(containerID) + // Check error + select { + case err := <-streamer.ErrCh: + assert.ErrorContains(t, err, "error running container: exit 1") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing error signal from closing") + } + assert.Empty(t, apitest.ListUnmatchedRequests()) + }) +} diff --git a/internal/functions/serve/templates/main.ts b/internal/functions/serve/templates/main.ts index 534409a98..568c547f8 100644 --- a/internal/functions/serve/templates/main.ts +++ b/internal/functions/serve/templates/main.ts @@ -48,6 +48,7 @@ const DENO_SB_ERROR_MAP = new Map([ SB_SPECIFIC_ERROR_CODE.WorkerLimit, ], ]); +const GENERIC_FUNCTION_SERVE_MESSAGE = `Serving functions on http://127.0.0.1:${HOST_PORT}/functions/v1/` interface FunctionConfig { entrypointPath: string; @@ -228,9 +229,36 @@ Deno.serve({ }, onListen: () => { - console.log( - `Serving functions on http://127.0.0.1:${HOST_PORT}/functions/v1/\nUsing ${Deno.version.deno}`, - ); + try { + const functionsConfigString = Deno.env.get( + "SUPABASE_INTERNAL_FUNCTIONS_CONFIG" + ); + if (functionsConfigString) { + const MAX_FUNCTIONS_URL_EXAMPLES = 5 + const functionsConfig = JSON.parse(functionsConfigString) as Record< + string, + unknown + >; + const functionNames = Object.keys(functionsConfig); + const exampleFunctions = functionNames.slice(0, MAX_FUNCTIONS_URL_EXAMPLES); + const functionsUrls = exampleFunctions.map( + (fname) => ` - http://127.0.0.1:${HOST_PORT}/functions/v1/${fname}` + ); + const functionsExamplesMessages = functionNames.length > 0 + // Show some functions urls examples + ? `\n${functionsUrls.join(`\n`)}${functionNames.length > MAX_FUNCTIONS_URL_EXAMPLES + // If we have more than 10 functions to serve, then show examples for first 10 + // and a count for the remaining ones + ? `\n... and ${functionNames.length - MAX_FUNCTIONS_URL_EXAMPLES} more functions` + : ''}` + : '' + console.log(`${GENERIC_FUNCTION_SERVE_MESSAGE}${functionsExamplesMessages}\nUsing ${Deno.version.deno}`); + } + } catch (e) { + console.log( + `${GENERIC_FUNCTION_SERVE_MESSAGE}\nUsing ${Deno.version.deno}` + ); + } }, onError: e => { diff --git a/internal/functions/serve/testdata/config.toml b/internal/functions/serve/testdata/config.toml new file mode 100644 index 000000000..28ea9a510 --- /dev/null +++ b/internal/functions/serve/testdata/config.toml @@ -0,0 +1,8 @@ +project_id = "test" + +[functions.hello] +static_files = ["image.png"] + +[functions.world] +enabled = false +verify_jwt = false diff --git a/internal/functions/serve/watcher.go b/internal/functions/serve/watcher.go new file mode 100644 index 000000000..6e75a104b --- /dev/null +++ b/internal/functions/serve/watcher.go @@ -0,0 +1,165 @@ +package serve + +import ( + "fmt" + "io/fs" + "os" + "path/filepath" + "slices" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/go-errors/errors" + "github.com/spf13/afero" + "github.com/spf13/viper" + "github.com/supabase/cli/internal/utils" +) + +const ( + // Debounce duration for file changes + debounceDuration = 500 * time.Millisecond + restartEvents = fsnotify.Write | fsnotify.Create | fsnotify.Remove | fsnotify.Rename + maxFileLimit = 1000 +) + +var ( + errTooManyFiles = errors.New("too many files") + + // Directories to ignore. + ignoredDirNames = []string{ + ".git", + "node_modules", + ".vscode", + ".idea", + ".DS_Store", + "vendor", + } + + // Patterns for ignoring file events. + ignoredFilePatterns = []struct { + Prefix string // File basename prefix + Suffix string // File basename suffix + ExactMatch bool // File basename exact match + }{ + {Suffix: "~"}, // Common backup files (e.g., emacs, gedit) + {Prefix: ".", Suffix: ".swp"}, // Vim swap files + {Prefix: ".", Suffix: ".swx"}, // Vim swap files (extended) + {Prefix: "___"}, // Deno temp files often start with this + {Suffix: ".tmp"}, // Generic temp files + {Prefix: ".#"}, // Emacs lock files + } +) + +// isIgnoredFileEvent checks if a file event should be ignored based on predefined patterns. +func isIgnoredFileEvent(event fsnotify.Event) bool { + if !event.Has(restartEvents) { + return true + } + baseName := filepath.Base(event.Name) + for _, p := range ignoredFilePatterns { + if strings.HasPrefix(baseName, p.Prefix) && strings.HasSuffix(baseName, p.Suffix) { + // An exact match means all characters match both prefix and suffix + if p.ExactMatch && len(baseName) > len(p.Prefix)+len(p.Suffix) { + continue + } + return true + } + } + return false +} + +type debounceFileWatcher struct { + watcher *fsnotify.Watcher + restartTimer *time.Timer + RestartCh <-chan time.Time + ErrCh <-chan error +} + +func NewDebounceFileWatcher() (*debounceFileWatcher, error) { + restartTimer := time.NewTimer(debounceDuration) + if !restartTimer.Stop() { + return nil, errors.New("failed to initialise timer") + } + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, errors.Errorf("failed to create file watcher: %w", err) + } + return &debounceFileWatcher{ + watcher: watcher, + ErrCh: watcher.Errors, + restartTimer: restartTimer, + RestartCh: restartTimer.C, + }, nil +} + +func (w *debounceFileWatcher) Start() { + for { + event, ok := <-w.watcher.Events + if !isIgnoredFileEvent(event) { + fmt.Fprintf(os.Stderr, "File change detected: %s (%s)\n", event.Name, event.Op.String()) + // Fire immediately when timer is inactive, without blocking this thread + if active := w.restartTimer.Reset(0); active { + w.restartTimer.Reset(debounceDuration) + } + } + // Ensure the last event is fired before channel close + if !ok { + return + } + fmt.Fprintf(utils.GetDebugLogger(), "Ignoring file event: %s (%s)\n", event.Name, event.Op.String()) + } +} + +func (w *debounceFileWatcher) SetWatchPaths(watchPaths []string, fsys afero.Fs) error { + watchLimit := viper.GetUint("FUNCTIONS_WATCH_LIMIT") + if watchLimit == 0 { + watchLimit = maxFileLimit + } + shouldWatchDirs := make(map[string]struct{}) + for _, hostPath := range watchPaths { + // Ignore non-existent paths and symlink directories + if err := afero.Walk(fsys, hostPath, func(path string, info fs.FileInfo, err error) error { + if errors.Is(err, os.ErrNotExist) || slices.Contains(ignoredDirNames, filepath.Base(path)) { + return nil + } else if err != nil { + return errors.Errorf("failed to walk path: %w", err) + } + if info.IsDir() { + shouldWatchDirs[path] = struct{}{} + } else if path == hostPath { + shouldWatchDirs[filepath.Dir(path)] = struct{}{} + } + if uint(len(shouldWatchDirs)) >= watchLimit { + return errors.Errorf("file watcher stopped at %s: %w", path, errTooManyFiles) + } + return nil + }); errors.Is(err, errTooManyFiles) { + fmt.Fprintf(os.Stderr, "%s\nYou can increase this limit by setting SUPABASE_FUNCTIONS_WATCH_LIMIT=%d", err.Error(), watchLimit<<2) + } else if err != nil { + return err + } + } + // Add directories to watch, ignoring duplicates + for hostPath := range shouldWatchDirs { + if err := w.watcher.Add(hostPath); err != nil { + return errors.Errorf("failed to watch directory: %w", err) + } + fmt.Fprintln(utils.GetDebugLogger(), "Added directory from watcher:", hostPath) + } + // Remove directories that are no longer needed + for _, hostPath := range w.watcher.WatchList() { + if _, ok := shouldWatchDirs[hostPath]; !ok { + if err := w.watcher.Remove(hostPath); err != nil { + return errors.Errorf("failed to remove watch directory: %w", err) + } + fmt.Fprintln(utils.GetDebugLogger(), "Removed directory from watcher:", hostPath) + } + } + return nil +} + +func (r *debounceFileWatcher) Close() error { + // Don't stop the timer to allow debounced events to fire + return r.watcher.Close() +} diff --git a/internal/functions/serve/watcher_test.go b/internal/functions/serve/watcher_test.go new file mode 100644 index 000000000..cde1dc078 --- /dev/null +++ b/internal/functions/serve/watcher_test.go @@ -0,0 +1,264 @@ +package serve + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Integration test setup for watcher functionality +type WatcherIntegrationSetup struct { + T *testing.T + Context context.Context + Cancel context.CancelFunc + TempDir string +} + +func NewWatcherIntegrationSetup(t *testing.T) *WatcherIntegrationSetup { + ctx, cancel := context.WithCancel(context.Background()) + tempDir := t.TempDir() + + setup := &WatcherIntegrationSetup{ + T: t, + Context: ctx, + Cancel: cancel, + TempDir: tempDir, + } + + return setup +} + +func (s *WatcherIntegrationSetup) Cleanup() { + s.Cancel() +} + +// SetupFunctionsDirectory creates a functions directory with test functions +func (s *WatcherIntegrationSetup) SetupFunctionsDirectory() string { + functionsDir := filepath.Join(s.TempDir, "supabase", "functions") + require.NoError(s.T, os.MkdirAll(functionsDir, 0755)) + + // Set up test functions + s.createFunction("hello", `export default () => new Response("Hello World")`) + s.createFunction("protected", `export default () => new Response("Protected")`) + + return functionsDir +} + +func (s *WatcherIntegrationSetup) SetupSupabaseDirectory() string { + supabaseDir := filepath.Join(s.TempDir, "supabase") + require.NoError(s.T, os.MkdirAll(supabaseDir, 0755)) + + return supabaseDir +} + +func (s *WatcherIntegrationSetup) createFunction(name, content string) { + funcDir := filepath.Join(s.TempDir, "supabase", "functions", name) + require.NoError(s.T, os.MkdirAll(funcDir, 0755)) + require.NoError(s.T, os.WriteFile(filepath.Join(funcDir, "index.ts"), []byte(content), 0600)) +} + +// CreateFileWatcher creates and configures a debounce file watcher for testing +func (s *WatcherIntegrationSetup) CreateFileWatcher() (*debounceFileWatcher, error) { + watcher, err := NewDebounceFileWatcher() + if err != nil { + return nil, err + } + + // Set up watch paths to include our test directory + fsys := afero.NewOsFs() + watchPaths := []string{s.TempDir} + + if err := watcher.SetWatchPaths(watchPaths, fsys); err != nil { + watcher.Close() + return nil, err + } + + return watcher, nil +} + +func TestFileWatcher(t *testing.T) { + t.Run("detects TypeScript function changes and triggers restart", func(t *testing.T) { + setup := NewWatcherIntegrationSetup(t) + defer setup.Cleanup() + + functionsDir := setup.SetupFunctionsDirectory() + watcher, err := setup.CreateFileWatcher() + require.NoError(t, err) + + // Modify a function file in background + go func() { + defer watcher.Close() + funcFile := filepath.Join(functionsDir, "hello", "index.ts") + newContent := `export default () => new Response("Hello Modified World")` + require.NoError(t, os.WriteFile(funcFile, []byte(newContent), 0600)) + // https://github.com/fsnotify/fsnotify/blob/main/fsnotify_test.go#L181 + time.Sleep(50 * time.Millisecond) + }() + + // Run watcher on main thread to avoid sleeping + watcher.Start() + + // Wait for restart signal + select { + case ts, ok := <-watcher.RestartCh: + assert.NotZero(t, ts, "file change should trigger restart") + assert.True(t, ok, "timer channel should be closed") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing restart signal after modifying TypeScript file") + } + }) + + t.Run("ignores editor temporary files", func(t *testing.T) { + watcher, err := NewDebounceFileWatcher() + require.NoError(t, err) + + // Create various temporary/editor files that should be ignored + go func() { + defer watcher.Close() + tempFiles := []string{ + filepath.Join("/tmp", "test.txt~"), // Backup file + filepath.Join("/tmp", ".test.swp"), // Vim swap + filepath.Join("/tmp", ".#test.ts"), // Emacs lock + filepath.Join("/tmp", "test.tmp"), // Temp file + filepath.Join("/tmp", "___deno_temp___"), // Deno temp + } + for _, tempFile := range tempFiles { + // Fire events directly since we only care about ignore files + watcher.watcher.Events <- fsnotify.Event{ + Name: tempFile, + Op: fsnotify.Create, + } + } + }() + + // Run watcher on main thread to avoid sleeping + watcher.Start() + + // Wait multiple times for out of order events + for range 3 { + select { + case <-watcher.RestartCh: + assert.Fail(t, "should not receive any restart signals from ignored files") + case err := <-watcher.ErrCh: + assert.NoError(t, err) + } + } + }) + + t.Run("detects config file changes and triggers restart", func(t *testing.T) { + setup := NewWatcherIntegrationSetup(t) + defer setup.Cleanup() + + supabaseDir := setup.SetupSupabaseDirectory() + watcher, err := setup.CreateFileWatcher() + require.NoError(t, err) + + // Create and modify a config.toml file + go func() { + defer watcher.Close() + configFile := filepath.Join(supabaseDir, "config.toml") + require.NoError(t, os.WriteFile(configFile, []byte(` + [functions.hello] + enabled = true + verify_jwt = false + `), 0600)) + // https://github.com/fsnotify/fsnotify/blob/main/fsnotify_test.go#L181 + time.Sleep(50 * time.Millisecond) + }() + + // Run watcher on main thread to avoid sleeping + watcher.Start() + + // Wait for restart signal + select { + case ts, ok := <-watcher.RestartCh: + assert.NotZero(t, ts, "config change should trigger restart") + assert.True(t, ok, "timer channel should be closed") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing restart signal after modifying config file") + } + }) + + t.Run("debounces rapid file changes", func(t *testing.T) { + watcher, err := NewDebounceFileWatcher() + require.NoError(t, err) + + // Make rapid changes to a file + go func() { + defer watcher.Close() + for range 5 { + watcher.watcher.Events <- fsnotify.Event{ + Name: filepath.Join("/tmp", "index.ts"), + Op: fsnotify.Write, + } + } + }() + + // Run watcher on main thread to avoid sleeping + watcher.Start() + + // Wait for debounce duration + select { + case ts, ok := <-watcher.RestartCh: + assert.NotZero(t, ts) + assert.True(t, ok) + case <-time.After(debounceDuration): + assert.Fail(t, "missing restart signal after rapid file changes") + } + select { + case <-watcher.RestartCh: + assert.Fail(t, "should only get one restart signal due to debouncing") + case ts, ok := <-time.After(debounceDuration): + assert.NotZero(t, ts) + assert.True(t, ok) + } + }) + + t.Run("watches multiple directories", func(t *testing.T) { + setup := NewWatcherIntegrationSetup(t) + defer setup.Cleanup() + + // Create multiple directories with functions + functionsDir := setup.SetupFunctionsDirectory() + libDir := filepath.Join(setup.TempDir, "lib") + require.NoError(t, os.MkdirAll(libDir, 0755)) + + // Create a utility file in lib directory + utilFile := filepath.Join(libDir, "utils.ts") + require.NoError(t, os.WriteFile(utilFile, []byte(`export function util() { return "utility"; }`), 0600)) + + watcher, err := NewDebounceFileWatcher() + require.NoError(t, err) + + go func() { + defer watcher.Close() + // Set up watch paths to include both directories + fsys := afero.NewOsFs() + watchPaths := []string{functionsDir, libDir} + require.NoError(t, watcher.SetWatchPaths(watchPaths, fsys)) + // Modify file in lib directory + require.NoError(t, os.WriteFile(utilFile, []byte(`export function util() { return "modified utility"; }`), 0600)) + // https://github.com/fsnotify/fsnotify/blob/main/fsnotify_test.go#L181 + time.Sleep(50 * time.Millisecond) + }() + + // Run watcher on main thread to avoid sleeping + watcher.Start() + + // Wait for restart signal + select { + case ts, ok := <-watcher.RestartCh: + assert.NotZero(t, ts, "change in watched lib directory should trigger restart") + assert.True(t, ok, "timer channel should be closed") + case <-time.After(2 * time.Second): + assert.Fail(t, "missing restart signal after modifying file in watched lib directory") + } + }) +} diff --git a/internal/testing/apitest/docker.go b/internal/testing/apitest/docker.go index 17a14355d..9b9020c04 100644 --- a/internal/testing/apitest/docker.go +++ b/internal/testing/apitest/docker.go @@ -3,7 +3,9 @@ package apitest import ( "bytes" "fmt" + "io" "net/http" + "strings" "github.com/docker/docker/api" "github.com/docker/docker/api/types/container" @@ -84,9 +86,17 @@ func MockDockerStop(docker *client.Client) { // Ref: internal/utils/docker.go::DockerRunOnce func setupDockerLogs(docker *client.Client, containerID, stdout string, exitCode int) error { + err := MockDockerLogsStream(docker, containerID, exitCode, strings.NewReader(stdout)) + gock.New(docker.DaemonHost()). + Delete("/v" + docker.ClientVersion() + "/containers/" + containerID). + Reply(http.StatusOK) + return err +} + +func MockDockerLogsStream(docker *client.Client, containerID string, exitCode int, r io.Reader) error { var body bytes.Buffer writer := stdcopy.NewStdWriter(&body, stdcopy.Stdout) - _, err := writer.Write([]byte(stdout)) + _, err := io.Copy(writer, r) gock.New(docker.DaemonHost()). Get("/v"+docker.ClientVersion()+"/containers/"+containerID+"/logs"). Reply(http.StatusOK). @@ -99,9 +109,6 @@ func setupDockerLogs(docker *client.Client, containerID, stdout string, exitCode State: &container.State{ ExitCode: exitCode, }}}) - gock.New(docker.DaemonHost()). - Delete("/v" + docker.ClientVersion() + "/containers/" + containerID). - Reply(http.StatusOK) return err } diff --git a/internal/utils/container_output.go b/internal/utils/container_output.go index 8f417afd1..ccbbab033 100644 --- a/internal/utils/container_output.go +++ b/internal/utils/container_output.go @@ -12,8 +12,6 @@ import ( "strings" "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/docker/pkg/stdcopy" - "github.com/go-errors/errors" ) func ProcessPullOutput(out io.ReadCloser, p Program) error { @@ -205,36 +203,3 @@ func ProcessDiffOutput(diffBytes []byte) ([]byte, error) { } return []byte(diffHeader + "\n\n" + strings.Join(filteredDiffDdls, "\n\n") + "\n"), nil } - -func ProcessPsqlOutput(out io.Reader, p Program) error { - r, w := io.Pipe() - doneCh := make(chan struct{}, 1) - - go func() { - scanner := bufio.NewScanner(r) - - for scanner.Scan() { - select { - case <-doneCh: - return - default: - } - - line := scanner.Text() - p.Send(PsqlMsg(&line)) - } - }() - - var errBuf bytes.Buffer - if _, err := stdcopy.StdCopy(w, &errBuf, out); err != nil { - return err - } - if errBuf.Len() > 0 { - return errors.New("Error running SQL: " + errBuf.String()) - } - - doneCh <- struct{}{} - p.Send(PsqlMsg(nil)) - - return nil -} diff --git a/internal/utils/container_output_test.go b/internal/utils/container_output_test.go index cf3e529bb..325084618 100644 --- a/internal/utils/container_output_test.go +++ b/internal/utils/container_output_test.go @@ -1,7 +1,6 @@ package utils import ( - "bytes" "encoding/json" "io" "sync" @@ -9,7 +8,6 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/docker/pkg/stdcopy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -65,41 +63,6 @@ func TestProcessDiffOutput(t *testing.T) { }) } -func TestProcessPsqlOutput(t *testing.T) { - t.Run("processes psql output", func(t *testing.T) { - var buf bytes.Buffer - writer := stdcopy.NewStdWriter(&buf, stdcopy.Stdout) - _, err := writer.Write([]byte("test output\n")) - require.NoError(t, err) - - var lastLine *string - p := NewMockProgram(func(msg tea.Msg) { - if m, ok := msg.(PsqlMsg); ok { - lastLine = m - } - }) - - err = ProcessPsqlOutput(&buf, p) - - assert.NoError(t, err) - assert.Nil(t, lastLine) - }) - - t.Run("handles stderr output", func(t *testing.T) { - var buf bytes.Buffer - writer := stdcopy.NewStdWriter(&buf, stdcopy.Stderr) - _, err := writer.Write([]byte("error message\n")) - require.NoError(t, err) - - p := NewMockProgram(nil) - - err = ProcessPsqlOutput(&buf, p) - - assert.Error(t, err) - assert.Contains(t, err.Error(), "error message") - }) -} - func TestProcessPullOutput(t *testing.T) { t.Run("processes docker pull messages", func(t *testing.T) { messages := []jsonmessage.JSONMessage{ diff --git a/internal/utils/docker.go b/internal/utils/docker.go index 374182a3e..7e908649e 100644 --- a/internal/utils/docker.go +++ b/internal/utils/docker.go @@ -376,13 +376,19 @@ func DockerRunOnceWithConfig(ctx context.Context, config container.Config, hostC return DockerStreamLogs(ctx, container, stdout, stderr) } -func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io.Writer) error { - // Stream logs - logs, err := Docker.ContainerLogs(ctx, containerId, container.LogsOptions{ +var ErrContainerKilled = errors.New("exit 137") + +func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io.Writer, opts ...func(*container.LogsOptions)) error { + logsOptions := container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: true, - }) + } + for _, apply := range opts { + apply(&logsOptions) + } + // Stream logs + logs, err := Docker.ContainerLogs(ctx, containerId, logsOptions) if err != nil { return errors.Errorf("failed to read docker logs: %w", err) } @@ -395,10 +401,15 @@ func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io if err != nil { return errors.Errorf("failed to inspect docker container: %w", err) } - if resp.State.ExitCode > 0 { - return errors.Errorf("error running container: exit %d", resp.State.ExitCode) + switch resp.State.ExitCode { + case 0: + return nil + case 137: + err = ErrContainerKilled + default: + err = errors.Errorf("exit %d", resp.State.ExitCode) } - return nil + return errors.Errorf("error running container: %w", err) } func DockerStreamLogsOnce(ctx context.Context, containerId string, stdout, stderr io.Writer) error {