Skip to content

Commit

Permalink
[Go] Implemented new runtime discovery behavior. (#1179)
Browse files Browse the repository at this point in the history
  • Loading branch information
apascal07 authored Nov 5, 2024
1 parent 73c51dc commit 6eb7b9e
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 16 deletions.
21 changes: 17 additions & 4 deletions genkit-tools/common/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,27 @@ export async function findProjectRoot(): Promise<string> {
let currentDir = process.cwd();
while (currentDir !== path.parse(currentDir).root) {
const packageJsonPath = path.join(currentDir, 'package.json');
const goModPath = path.join(currentDir, 'go.mod');
try {
await fs.access(packageJsonPath);
return currentDir;
const [packageJsonExists, goModExists] = await Promise.all([
fs
.access(packageJsonPath)
.then(() => true)
.catch(() => false),
fs
.access(goModPath)
.then(() => true)
.catch(() => false),
]);
if (packageJsonExists || goModExists) {
return currentDir;
}
} catch {
currentDir = path.dirname(currentDir);
// Continue searching if any errors occur
}
currentDir = path.dirname(currentDir);
}
throw new Error('Could not find project root (package.json not found)');
throw new Error('Could not find project root');
}

/**
Expand Down
2 changes: 1 addition & 1 deletion go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Init(ctx context.Context, opts *Options) error {
wg.Add(1)
go func() {
defer wg.Done()
s := startReflectionServer(errCh)
s := startReflectionServer(ctx, errCh)
mu.Lock()
servers = append(servers, s)
mu.Unlock()
Expand Down
112 changes: 102 additions & 10 deletions go/genkit/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -44,14 +45,110 @@ import (
"go.opentelemetry.io/otel/trace"
)

type runtimeFileData struct {
ID string `json:"id"`
PID int `json:"pid"`
ReflectionServerURL string `json:"reflectionServerUrl"`
Timestamp string `json:"timestamp"`
}

type devServer struct {
reg *registry.Registry
runtimeFilePath string
}

// startReflectionServer starts the Reflection API server listening at the
// value of the environment variable GENKIT_REFLECTION_PORT for the port,
// or ":3100" if it is empty.
func startReflectionServer(errCh chan<- error) *http.Server {
slog.Info("starting reflection server")
func startReflectionServer(ctx context.Context, errCh chan<- error) *http.Server {
slog.Debug("starting reflection server")
addr := serverAddress("", "GENKIT_REFLECTION_PORT", "127.0.0.1:3100")
mux := newDevServeMux(registry.Global)
return startServer(addr, mux, errCh)
s := &devServer{reg: registry.Global}
if err := s.writeRuntimeFile(addr); err != nil {
slog.Error("failed to write runtime file", "error", err)
}
mux := newDevServeMux(s)
server := startServer(addr, mux, errCh)
go func() {
<-ctx.Done()
if err := s.cleanupRuntimeFile(); err != nil {
slog.Error("failed to cleanup runtime file", "error", err)
}
}()
return server
}

// writeRuntimeFile writes a file describing the runtime to the project root.
func (s *devServer) writeRuntimeFile(url string) error {
projectRoot, err := findProjectRoot()
if err != nil {
return fmt.Errorf("failed to find project root: %w", err)
}
runtimesDir := filepath.Join(projectRoot, ".genkit", "runtimes")
if err := os.MkdirAll(runtimesDir, 0755); err != nil {
return fmt.Errorf("failed to create runtimes directory: %w", err)
}
runtimeID := os.Getenv("GENKIT_RUNTIME_ID")
if runtimeID == "" {
runtimeID = strconv.Itoa(os.Getpid())
}
timestamp := time.Now().UTC().Format(time.RFC3339)
s.runtimeFilePath = filepath.Join(runtimesDir, fmt.Sprintf("%d-%s.json", os.Getpid(), timestamp))
data := runtimeFileData{
ID: runtimeID,
PID: os.Getpid(),
ReflectionServerURL: fmt.Sprintf("http://%s", url),
Timestamp: timestamp,
}
fileContent, err := json.MarshalIndent(data, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal runtime data: %w", err)
}
if err := os.WriteFile(s.runtimeFilePath, fileContent, 0644); err != nil {
return fmt.Errorf("failed to write runtime file: %w", err)
}
slog.Debug("runtime file written", "path", s.runtimeFilePath)
return nil
}

// cleanupRuntimeFile removes the runtime file associated with the dev server.
func (s *devServer) cleanupRuntimeFile() error {
if s.runtimeFilePath == "" {
return nil
}
content, err := os.ReadFile(s.runtimeFilePath)
if err != nil {
return fmt.Errorf("failed to read runtime file: %w", err)
}
var data runtimeFileData
if err := json.Unmarshal(content, &data); err != nil {
return fmt.Errorf("failed to unmarshal runtime data: %w", err)
}
if data.PID == os.Getpid() {
if err := os.Remove(s.runtimeFilePath); err != nil {
return fmt.Errorf("failed to remove runtime file: %w", err)
}
slog.Debug("runtime file cleaned up", "path", s.runtimeFilePath)
}
return nil
}

// findProjectRoot finds the project root by looking for a go.mod file.
func findProjectRoot() (string, error) {
dir, err := os.Getwd()
if err != nil {
return "", err
}
for {
if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil {
return dir, nil
}
parent := filepath.Dir(dir)
if parent == dir {
return "", fmt.Errorf("could not find project root (go.mod not found)")
}
dir = parent
}
}

// startFlowServer starts a production server listening at the given address.
Expand Down Expand Up @@ -129,13 +226,8 @@ func shutdownServers(servers []*http.Server) error {
return nil
}

type devServer struct {
reg *registry.Registry
}

func newDevServeMux(r *registry.Registry) *http.ServeMux {
func newDevServeMux(s *devServer) *http.ServeMux {
mux := http.NewServeMux()
s := &devServer{r}
handle(mux, "GET /api/__health", func(w http.ResponseWriter, _ *http.Request) error {
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion go/genkit/servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestDevServer(t *testing.T) {
core.DefineActionInRegistry(r, "devServer", "dec", atype.Custom, map[string]any{
"bar": "baz",
}, nil, dec)
srv := httptest.NewServer(newDevServeMux(r))
srv := httptest.NewServer(newDevServeMux(&devServer{reg: r}))
defer srv.Close()

t.Run("runAction", func(t *testing.T) {
Expand Down

0 comments on commit 6eb7b9e

Please sign in to comment.