Skip to content

Commit e235a49

Browse files
committed
chore: add debounce file watcher
1 parent 5755b8a commit e235a49

File tree

3 files changed

+252
-49
lines changed

3 files changed

+252
-49
lines changed

internal/functions/serve/serve.go

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package serve
22

33
import (
4-
"bufio"
54
"context"
65
_ "embed"
76
"encoding/json"
@@ -11,7 +10,6 @@ import (
1110
"strconv"
1211
"strings"
1312

14-
"github.com/containerd/errdefs"
1513
"github.com/docker/docker/api/types/container"
1614
"github.com/docker/docker/api/types/network"
1715
"github.com/docker/go-connections/nat"
@@ -48,6 +46,7 @@ func (mode InspectMode) toFlag() string {
4846
type RuntimeOption struct {
4947
InspectMode *InspectMode
5048
InspectMain bool
49+
fileWatcher *debounceFileWatcher
5150
}
5251

5352
func (i *RuntimeOption) toArgs() []string {
@@ -70,13 +69,20 @@ const (
7069
var mainFuncEmbed string
7170

7271
func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
73-
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
72+
watcher, err := NewDebounceFileWatcher()
73+
if err != nil {
7474
return err
7575
}
76-
watcher := NewFileWatcher()
7776
go watcher.Start(ctx)
78-
streamer := NewLogStreamer()
79-
go streamer.Start(ctx)
77+
defer watcher.Close()
78+
// TODO: refactor this to edge runtime service
79+
runtimeOption.fileWatcher = watcher
80+
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
81+
return err
82+
}
83+
streamer := NewLogStreamer(ctx)
84+
go streamer.Start(utils.EdgeRuntimeId)
85+
defer streamer.Close()
8086
for {
8187
select {
8288
case <-ctx.Done():
@@ -112,49 +118,6 @@ func restartEdgeRuntime(ctx context.Context, envFilePath string, noVerifyJWT *bo
112118
return ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys)
113119
}
114120

115-
type logStreamer struct {
116-
ErrCh chan error
117-
}
118-
119-
func NewLogStreamer() logStreamer {
120-
return logStreamer{
121-
ErrCh: make(chan error, 1),
122-
}
123-
}
124-
125-
func (s logStreamer) Start(ctx context.Context) {
126-
for {
127-
if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
128-
lo.Timestamps = true
129-
}); err != nil &&
130-
!errdefs.IsNotFound(err) &&
131-
!strings.HasSuffix(err.Error(), "exit 137") &&
132-
!strings.HasSuffix(err.Error(), "can not get logs from container which is dead or marked for removal") {
133-
s.ErrCh <- err
134-
break
135-
}
136-
}
137-
}
138-
139-
type fileWatcher struct {
140-
RestartCh chan struct{}
141-
}
142-
143-
func NewFileWatcher() fileWatcher {
144-
return fileWatcher{
145-
RestartCh: make(chan struct{}, 1),
146-
}
147-
}
148-
149-
func (w *fileWatcher) Start(ctx context.Context) {
150-
// TODO: implement fs.notify
151-
fmt.Fprintln(os.Stderr, "Press enter to reload...")
152-
scanner := bufio.NewScanner(os.Stdin)
153-
for scanner.Scan() {
154-
w.RestartCh <- struct{}{}
155-
}
156-
}
157-
158121
func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, dbUrl string, runtimeOption RuntimeOption, fsys afero.Fs) error {
159122
// 1. Parse custom env file
160123
env, err := parseEnvFile(envFilePath, fsys)
@@ -192,6 +155,19 @@ func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool,
192155
if err != nil {
193156
return err
194157
}
158+
if watcher := runtimeOption.fileWatcher; watcher != nil {
159+
var watchPaths []string
160+
for _, b := range binds {
161+
// Get the directory containing the path
162+
hostPath := strings.Split(b, ":")[0]
163+
if filepath.IsAbs(hostPath) {
164+
watchPaths = append(watchPaths, hostPath)
165+
}
166+
}
167+
if err := watcher.SetWatchPaths(watchPaths, fsys); err != nil {
168+
return err
169+
}
170+
}
195171
env = append(env, "SUPABASE_INTERNAL_FUNCTIONS_CONFIG="+functionsConfigString)
196172
// 3. Parse entrypoint script
197173
cmd := append([]string{

internal/functions/serve/streamer.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package serve
2+
3+
import (
4+
"context"
5+
"os"
6+
"strings"
7+
8+
"github.com/containerd/errdefs"
9+
"github.com/docker/docker/api/types/container"
10+
"github.com/supabase/cli/internal/utils"
11+
)
12+
13+
type logStreamer struct {
14+
ctx context.Context
15+
cancel context.CancelFunc
16+
ErrCh chan error
17+
}
18+
19+
func NewLogStreamer(ctx context.Context) logStreamer {
20+
cancelCtx, cancel := context.WithCancel(ctx)
21+
return logStreamer{
22+
ctx: cancelCtx,
23+
cancel: cancel,
24+
ErrCh: make(chan error, 1),
25+
}
26+
}
27+
28+
func (s *logStreamer) Start(containerID string) {
29+
for {
30+
if err := utils.DockerStreamLogs(s.ctx, containerID, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
31+
lo.Timestamps = true
32+
}); err != nil &&
33+
!errdefs.IsNotFound(err) &&
34+
!strings.HasSuffix(err.Error(), "exit 137") &&
35+
!strings.HasSuffix(err.Error(), "can not get logs from container which is dead or marked for removal") {
36+
s.ErrCh <- err
37+
break
38+
}
39+
}
40+
}
41+
42+
func (s *logStreamer) Close() {
43+
if s.cancel != nil {
44+
s.cancel()
45+
}
46+
}

internal/functions/serve/watcher.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package serve
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/fs"
7+
"os"
8+
"path/filepath"
9+
"strings"
10+
"time"
11+
12+
"github.com/fsnotify/fsnotify"
13+
"github.com/go-errors/errors"
14+
"github.com/spf13/afero"
15+
"github.com/supabase/cli/internal/utils"
16+
)
17+
18+
const (
19+
// Debounce duration for file changes
20+
debounceDuration = 500 * time.Millisecond
21+
restartEvents = fsnotify.Write | fsnotify.Create | fsnotify.Remove | fsnotify.Rename
22+
)
23+
24+
var (
25+
// Directories to ignore.
26+
ignoredDirNames = []string{
27+
".git",
28+
"node_modules",
29+
".vscode",
30+
".idea",
31+
".DS_Store",
32+
"vendor",
33+
}
34+
35+
// Patterns for ignoring file events.
36+
ignoredFilePatterns = []struct {
37+
Prefix string // File basename prefix
38+
Suffix string // File basename suffix
39+
ExactMatch string // File basename exact match
40+
Op fsnotify.Op // Specific operation to ignore for this pattern (0 for any op)
41+
}{
42+
{Suffix: "~"}, // Common backup files (e.g., emacs, gedit)
43+
{Prefix: ".", Suffix: ".swp"}, // Vim swap files
44+
{Prefix: ".", Suffix: ".swx"}, // Vim swap files (extended)
45+
{Prefix: "___", Suffix: "___"}, // Deno deploy/bundle temporary files often look like ___<slug>___<hash>___
46+
{Prefix: "___"}, // Some other editor temp files might start with this
47+
{Suffix: ".tmp"}, // Generic temp files
48+
{Prefix: ".#"}, // Emacs lock files
49+
{Suffix: "___", Op: fsnotify.Chmod}, // Deno specific temp file pattern during write (often involves a chmod)
50+
}
51+
)
52+
53+
// isIgnoredFileEvent checks if a file event should be ignored based on predefined patterns.
54+
func isIgnoredFileEvent(eventName string, eventOp fsnotify.Op) bool {
55+
baseName := filepath.Base(eventName)
56+
for _, p := range ignoredFilePatterns {
57+
match := false
58+
if p.ExactMatch != "" && baseName == p.ExactMatch {
59+
match = true
60+
} else {
61+
// Check prefix if specified
62+
prefixMatch := p.Prefix == "" || strings.HasPrefix(baseName, p.Prefix)
63+
// Check suffix if specified
64+
suffixMatch := p.Suffix == "" || strings.HasSuffix(baseName, p.Suffix)
65+
66+
// Both prefix and suffix must match
67+
if p.Prefix != "" && p.Suffix != "" {
68+
match = prefixMatch && suffixMatch
69+
// Only prefix specified
70+
} else if p.Prefix != "" {
71+
match = prefixMatch
72+
// Only suffix specified
73+
} else if p.Suffix != "" {
74+
match = suffixMatch
75+
}
76+
}
77+
78+
if match {
79+
// If Op is 0, it means the pattern applies to any operation.
80+
// Otherwise, check if the event's operation is relevant to the pattern's Op.
81+
if p.Op == 0 || (eventOp&p.Op) != 0 {
82+
return true
83+
}
84+
}
85+
}
86+
return false
87+
}
88+
89+
type debounceFileWatcher struct {
90+
watcher *fsnotify.Watcher
91+
restartTimer *time.Timer
92+
RestartCh <-chan time.Time
93+
ErrCh <-chan error
94+
}
95+
96+
func NewDebounceFileWatcher() (*debounceFileWatcher, error) {
97+
restartTimer := time.NewTimer(debounceDuration)
98+
if !restartTimer.Stop() {
99+
return nil, errors.New("failed to initialise timer")
100+
}
101+
watcher, err := fsnotify.NewWatcher()
102+
if err != nil {
103+
return nil, errors.Errorf("failed to create file watcher: %w", err)
104+
}
105+
return &debounceFileWatcher{
106+
watcher: watcher,
107+
ErrCh: watcher.Errors,
108+
restartTimer: restartTimer,
109+
RestartCh: restartTimer.C,
110+
}, nil
111+
}
112+
113+
func (w *debounceFileWatcher) Start(ctx context.Context) {
114+
for {
115+
event, ok := <-w.watcher.Events
116+
if !ok {
117+
return
118+
}
119+
120+
if !event.Has(restartEvents) || isIgnoredFileEvent(event.Name, event.Op) {
121+
fmt.Fprintf(utils.GetDebugLogger(), "Ignoring file event: %s (%s)\n", event.Name, event.Op.String())
122+
continue
123+
}
124+
125+
fileName := filepath.Base(event.Name)
126+
if fileName == "config.toml" {
127+
fmt.Fprintf(os.Stderr, "Config file change detected: %s (%s) - will reload configuration\n", event.Name, event.Op.String())
128+
} else {
129+
fmt.Fprintf(os.Stderr, "File change detected: %s (%s)\n", event.Name, event.Op.String())
130+
}
131+
132+
if !w.restartTimer.Reset(debounceDuration) {
133+
fmt.Fprintln(utils.GetDebugLogger(), "Failed to restart debounce timer.")
134+
}
135+
}
136+
}
137+
138+
func (w *debounceFileWatcher) SetWatchPaths(watchPaths []string, fsys afero.Fs) error {
139+
shouldWatchDirs := make(map[string]struct{})
140+
for _, hostPath := range watchPaths {
141+
// Ignore non-existent paths
142+
if err := afero.Walk(fsys, hostPath, func(path string, info fs.FileInfo, err error) error {
143+
if err != nil {
144+
return errors.New(err)
145+
}
146+
if path == hostPath || info.IsDir() {
147+
shouldWatchDirs[path] = struct{}{}
148+
}
149+
return nil
150+
}); err != nil {
151+
return err
152+
}
153+
}
154+
// Add directories to watch, ignoring duplicates
155+
for hostPath := range shouldWatchDirs {
156+
if err := w.watcher.Add(hostPath); err != nil {
157+
return errors.Errorf("failed to watch directory: %w", err)
158+
}
159+
fmt.Fprintln(utils.GetDebugLogger(), "Added directory from watcher:", hostPath)
160+
}
161+
// Remove directories that are no longer needed
162+
for _, hostPath := range w.watcher.WatchList() {
163+
if _, ok := shouldWatchDirs[hostPath]; !ok {
164+
if err := w.watcher.Remove(hostPath); err != nil {
165+
return errors.Errorf("failed to remove watch directory: %w", err)
166+
}
167+
fmt.Fprintln(utils.GetDebugLogger(), "Removed directory from watcher:", hostPath)
168+
}
169+
}
170+
return nil
171+
}
172+
173+
func (r *debounceFileWatcher) Close() error {
174+
if r.watcher != nil {
175+
return r.watcher.Close()
176+
}
177+
if r.restartTimer != nil {
178+
r.restartTimer.Stop()
179+
}
180+
return nil
181+
}

0 commit comments

Comments
 (0)