From 515a357c8b1e6b3dce7b53d860e9064a0466667f Mon Sep 17 00:00:00 2001 From: Michael Dwan Date: Fri, 12 Sep 2025 11:18:43 -0600 Subject: [PATCH 1/4] use latest coglet for procedures --- pkg/config/config.go | 2 + pkg/dockerfile/coglet.go | 67 ++++++++++++++++++++++++++++ pkg/dockerfile/standard_generator.go | 13 +++++- 3 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 pkg/dockerfile/coglet.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 2074c1d3fd..8af54f67c4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -59,6 +59,8 @@ type Build struct { CogRuntime *bool `json:"cog_runtime,omitempty" yaml:"cog_runtime,omitempty"` PythonOverrides string `json:"python_overrides,omitempty" yaml:"python_overrides,omitempty"` + ProcedureMode bool `json:"-" yaml:"-"` + pythonRequirementsContent []string } diff --git a/pkg/dockerfile/coglet.go b/pkg/dockerfile/coglet.go new file mode 100644 index 0000000000..78b87f5cc9 --- /dev/null +++ b/pkg/dockerfile/coglet.go @@ -0,0 +1,67 @@ +package dockerfile + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +// GitHubRelease represents a GitHub release response +type GitHubRelease struct { + TagName string `json:"tag_name"` + Assets []struct { + Name string `json:"name"` + BrowserDownloadURL string `json:"browser_download_url"` + } `json:"assets"` +} + +// GetLatestCogletWheelURL fetches the latest coglet wheel URL from GitHub releases +func GetLatestCogletWheelURL(ctx context.Context) (string, error) { + // Create HTTP client with timeout + client := &http.Client{ + Timeout: 30 * time.Second, + } + + // GitHub API URL for latest release + apiURL := "https://api.github.com/repos/replicate/cog-runtime/releases/latest" + + // Create request with context + req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + + // Add headers + req.Header.Set("Accept", 
"application/vnd.github.v3+json") + req.Header.Set("User-Agent", "cog-cli") + + // Make the request + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("failed to fetch release data: %w", err) + } + defer resp.Body.Close() + + // Check status code + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("GitHub API returned status %d", resp.StatusCode) + } + + // Parse JSON response + var release GitHubRelease + if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { + return "", fmt.Errorf("failed to parse release data: %w", err) + } + + // Find coglet wheel in assets + for _, asset := range release.Assets { + if strings.HasSuffix(asset.Name, ".whl") && strings.Contains(asset.Name, "coglet") { + return asset.BrowserDownloadURL, nil + } + } + + return "", fmt.Errorf("no coglet wheel found in latest release %s", release.TagName) +} diff --git a/pkg/dockerfile/standard_generator.go b/pkg/dockerfile/standard_generator.go index d30c2234a3..06fccf7bb8 100644 --- a/pkg/dockerfile/standard_generator.go +++ b/pkg/dockerfile/standard_generator.go @@ -458,7 +458,7 @@ func (g *StandardGenerator) installCog() (string, error) { return "", nil } - if g.Config.Build.CogRuntime != nil && *g.Config.Build.CogRuntime { + if g.Config.Build.CogRuntime != nil && *g.Config.Build.CogRuntime || g.Config.Build.ProcedureMode { return g.installCogRuntime() } @@ -485,10 +485,19 @@ func (g *StandardGenerator) installCogRuntime() (string, error) { if !CheckMajorMinorOnly(g.Config.Build.PythonVersion) { return "", fmt.Errorf("Python version must be .") } + + cogletURL := PinnedCogletURL + if g.Config.Build.ProcedureMode { + // if we're building a procedure, use the latest coglet release + if latestURL, err := GetLatestCogletWheelURL(context.TODO()); err == nil { + cogletURL = latestURL + } + } + cmds := []string{ "ENV R8_COG_VERSION=coglet", "ENV R8_PYTHON_VERSION=" + g.Config.Build.PythonVersion, - "RUN pip install " + PinnedCogletURL, + "RUN pip install " 
+ cogletURL, } return strings.Join(cmds, "\n"), nil } From eee7de3ecc48a8dcad217628ca86504a3af10044 Mon Sep 17 00:00:00 2001 From: Michael Dwan Date: Fri, 12 Sep 2025 13:59:00 -0600 Subject: [PATCH 2/4] set ProcedureMode key on config if cli flag is present --- pkg/cli/predict.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/cli/predict.go b/pkg/cli/predict.go index 0efe985b3c..de7adc1d1f 100644 --- a/pkg/cli/predict.go +++ b/pkg/cli/predict.go @@ -187,6 +187,12 @@ func cmdPredict(cmd *cobra.Command, args []string) error { buildFast = cfg.Build.Fast } + // TODO[md]: this is a temporary hack to propagate a procedure flag through the build system without + // touching every function signature with another param. The cogpacks refactor addresses this. + if pipelinesImage { + cfg.Build.ProcedureMode = true + } + client := registry.NewRegistryClient() if buildFast || pipelinesImage { imageName = config.DockerImageName(projectDir) From 44fbeff84adda9c5a9c30bf16da64a02784d83bb Mon Sep 17 00:00:00 2001 From: Michael Dwan Date: Fri, 12 Sep 2025 14:00:57 -0600 Subject: [PATCH 3/4] redirect container logs to console logger --- pkg/cli/predict.go | 6 +- pkg/cli/train.go | 4 +- pkg/predict/log_handler.go | 115 ++++++++++++ pkg/predict/log_handler_test.go | 302 ++++++++++++++++++++++++++++++++ 4 files changed, 424 insertions(+), 3 deletions(-) create mode 100644 pkg/predict/log_handler.go create mode 100644 pkg/predict/log_handler_test.go diff --git a/pkg/cli/predict.go b/pkg/cli/predict.go index de7adc1d1f..84a3110d67 100644 --- a/pkg/cli/predict.go +++ b/pkg/cli/predict.go @@ -264,6 +264,8 @@ func cmdPredict(cmd *cobra.Command, args []string) error { console.Info("") console.Infof("Starting Docker image %s and running setup()...", imageName) + logHandler := predict.NewLogHandler() + predictor, err := predict.NewPredictor(ctx, command.RunOptions{ GPUs: gpus, Image: imageName, @@ -287,7 +289,7 @@ func cmdPredict(cmd *cobra.Command, args []string) error { }() timeout 
:= time.Duration(setupTimeout) * time.Second - if err := predictor.Start(ctx, os.Stderr, timeout); err != nil { + if err := predictor.Start(ctx, logHandler, timeout); err != nil { // Only retry if we're using a GPU but but the user didn't explicitly select a GPU with --gpus // If the user specified the wrong GPU, they are explicitly selecting a GPU and they'll want to hear about it if gpus == "all" && errors.Is(err, docker.ErrMissingDeviceDriver) { @@ -303,7 +305,7 @@ func cmdPredict(cmd *cobra.Command, args []string) error { return err } - if err := predictor.Start(ctx, os.Stderr, timeout); err != nil { + if err := predictor.Start(ctx, logHandler, timeout); err != nil { return err } } else { diff --git a/pkg/cli/train.go b/pkg/cli/train.go index 301f74bd80..523ee557c9 100644 --- a/pkg/cli/train.go +++ b/pkg/cli/train.go @@ -117,6 +117,8 @@ func cmdTrain(cmd *cobra.Command, args []string) error { console.Info("") console.Infof("Starting Docker image %s...", imageName) + logHandler := predict.NewLogHandler() + predictor, err := predict.NewPredictor(ctx, command.RunOptions{ GPUs: gpus, Image: imageName, @@ -140,7 +142,7 @@ func cmdTrain(cmd *cobra.Command, args []string) error { } }() - if err := predictor.Start(ctx, os.Stderr, time.Duration(setupTimeout)*time.Second); err != nil { + if err := predictor.Start(ctx, logHandler, time.Duration(setupTimeout)*time.Second); err != nil { return err } diff --git a/pkg/predict/log_handler.go b/pkg/predict/log_handler.go new file mode 100644 index 0000000000..cd1298a06e --- /dev/null +++ b/pkg/predict/log_handler.go @@ -0,0 +1,115 @@ +package predict + +import ( + "bufio" + "encoding/json" + "os" + "strings" + "sync" + + "github.com/replicate/cog/pkg/util/console" +) + +// LogEntry represents a structured log entry from the container +type LogEntry struct { + Severity string `json:"severity"` + Timestamp string `json:"timestamp"` + Logger string `json:"logger"` + Caller string `json:"caller"` + Message string `json:"message"` + 
// Any additional fields in the JSON payload are ignored +} + +// LogHandler implements io.Writer and processes container stderr output +// It parses JSON logs and routes them to appropriate console levels, +// while handling unstructured logs gracefully. +type LogHandler struct { + mu sync.Mutex +} + +// NewLogHandler creates a new LogHandler +func NewLogHandler() *LogHandler { + return &LogHandler{} +} + +// Write implements io.Writer interface +func (lh *LogHandler) Write(p []byte) (n int, err error) { + lh.mu.Lock() + defer lh.mu.Unlock() + + // TEMPORARY: Tee raw output to stderr for debugging + os.Stderr.WriteString("RAW: " + string(p)) + + // Process each line + scanner := bufio.NewScanner(strings.NewReader(string(p))) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + lh.processLine(line) + } + + return len(p), nil +} + +// processLine processes a single log line +func (lh *LogHandler) processLine(line string) { + // Try to parse as JSON first + if lh.TryParseJSONLog(line) { + return + } + + // Handle unstructured logs + lh.handleUnstructuredLog(line) +} + +// TryParseJSONLog attempts to parse the line as a JSON log entry +// Returns true if successfully parsed and handled +// This is exported for testing purposes +func (lh *LogHandler) TryParseJSONLog(line string) bool { + var entry LogEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + return false + } + + // Route based on severity level + switch strings.ToLower(entry.Severity) { + case "debug": + console.Debug(entry.Message) + case "info": + console.Debug(entry.Message) // Info logs from container go to debug level + case "warn", "warning": + console.Warn(entry.Message) + case "error": + console.Error(entry.Message) + case "fatal": + console.Error(entry.Message) // Fatal logs from container go to error level + default: + // Unknown severity, treat as info + console.Debug(entry.Message) + } + + return true +} + +// handleUnstructuredLog 
handles non-JSON log lines +func (lh *LogHandler) handleUnstructuredLog(line string) { + // Check for common error patterns + lowerLine := strings.ToLower(line) + + // Route based on content patterns + switch { + case strings.Contains(lowerLine, "error") || strings.Contains(lowerLine, "failed") || strings.Contains(lowerLine, "exception"): + console.Error(line) + case strings.Contains(lowerLine, "warning") || strings.Contains(lowerLine, "warn"): + console.Warn(line) + case strings.Contains(lowerLine, "debug"): + console.Debug(line) + default: + // Default to debug level for unstructured logs + // This prevents cluttering the user's output with container internals + console.Debug(line) + } +} diff --git a/pkg/predict/log_handler_test.go b/pkg/predict/log_handler_test.go new file mode 100644 index 0000000000..566c6bed20 --- /dev/null +++ b/pkg/predict/log_handler_test.go @@ -0,0 +1,302 @@ +package predict + +import ( + "testing" +) + +// Test data captured from actual container output +const sampleContainerOutput = `Failed to parse log level "warning": unrecognized level: "warning" +Failed to parse log level "warning": unrecognized level: "warning" +Failed to parse log level "warning": unrecognized level: "warning" +{"severity":"info","timestamp":"2025-09-12T16:32:29.498Z","logger":"cog","caller":"cog/main.go:59","message":"configuration","use-procedure-mode":false,"await-explicit-shutdown":false,"one-shot":false,"upload-url":""} +{"severity":"info","timestamp":"2025-09-12T16:32:29.500Z","logger":"cog","caller":"cog/main.go:67","message":"starting Cog HTTP server","addr":"0.0.0.0:5000","version":"0.2.0b3","pid":6} +{"severity":"info","timestamp":"2025-09-12T16:32:29.505Z","logger":"cog-http-server","caller":"server/runner.go:213","message":"python runner started","pid":12} +{"severity":"info","timestamp":"2025-09-12T16:32:29.510Z","logger":"cog-http-server","caller":"server/runner.go:547","message":"configuring 
runner","module":"predict","predictor":"run","max_concurrency":1} +{"severity":"info","timestamp":"2025-09-12T16:32:29.755Z","logger":"cog-http-server","caller":"server/runner.go:642","message":"updating OpenAPI schema"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.763Z","logger":"cog-http-server","caller":"server/runner.go:664","message":"updating setup result"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.763Z","logger":"cog-http-server","caller":"server/runner.go:676","message":"setup succeeded"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.763Z","logger":"cog-http-server","caller":"server/runner.go:620","message":"runner is ready"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.786Z","logger":"cog-http-server","caller":"server/runner.go:425","message":"received prediction request","id":"v86bg93cb9t6t0cs7v2rdcz0j0"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.872Z","logger":"cog-http-server","caller":"server/runner.go:625","message":"runner is busy"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.874Z","logger":"cog-http-server","caller":"server/runner.go:708","message":"received prediction response","id":"v86bg93cb9t6t0cs7v2rdcz0j0"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.876Z","logger":"cog-http-server","caller":"server/runner.go:781","message":"prediction completed","id":"v86bg93cb9t6t0cs7v2rdcz0j0","status":"succeeded"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.885Z","logger":"cog","caller":"cog/main.go:129","message":"stopping Cog HTTP server","signal":"terminated"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.886Z","logger":"cog-http-server","caller":"server/runner.go:317","message":"stop requested"} +{"severity":"info","timestamp":"2025-09-12T16:32:29.999Z","logger":"cog-http-server","caller":"server/runner.go:591","message":"python runner exited successfully","pid":12} 
+{"severity":"info","timestamp":"2025-09-12T16:32:30.000Z","logger":"cog","caller":"cog/main.go:144","message":"shutdown completed normally"}` + +func TestLogHandler_JSONLogs(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "debug severity", + input: `{"severity":"debug","message":"debug message"}`, + expected: "debug message", + }, + { + name: "info severity", + input: `{"severity":"info","message":"info message"}`, + expected: "info message", + }, + { + name: "warn severity", + input: `{"severity":"warn","message":"warning message"}`, + expected: "warning message", + }, + { + name: "warning severity", + input: `{"severity":"warning","message":"warning message"}`, + expected: "warning message", + }, + { + name: "error severity", + input: `{"severity":"error","message":"error message"}`, + expected: "error message", + }, + { + name: "fatal severity", + input: `{"severity":"fatal","message":"fatal message"}`, + expected: "fatal message", + }, + { + name: "unknown severity", + input: `{"severity":"unknown","message":"unknown message"}`, + expected: "unknown message", + }, + { + name: "missing severity field", + input: `{"message":"test message"}`, + expected: "test message", + }, + { + name: "missing message field", + input: `{"severity":"info"}`, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewLogHandler() + n, err := handler.Write([]byte(tt.input + "\n")) + + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != len(tt.input)+1 { + t.Errorf("Expected to write %d bytes, wrote %d", len(tt.input)+1, n) + } + + // For now, just verify the handler doesn't crash + // The actual console routing behavior is tested in integration tests + }) + } +} + +func TestLogHandler_UnstructuredLogs(t *testing.T) { + tests := []string{ + "Something went wrong with error handling", + "Failed to connect to database", + "Exception occurred during processing", + 
"This is a warning message", + "This is a warn message", + "Debug information here", + "Just some regular output", + `Failed to parse log level "warning": unrecognized level: "warning"`, + } + + for _, input := range tests { + t.Run(input, func(t *testing.T) { + handler := NewLogHandler() + n, err := handler.Write([]byte(input + "\n")) + + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != len(input)+1 { + t.Errorf("Expected to write %d bytes, wrote %d", len(input)+1, n) + } + }) + } +} + +func TestLogHandler_LineProcessing(t *testing.T) { + tests := []struct { + name string + input string + }{ + { + name: "empty line", + input: "\n", + }, + { + name: "whitespace only", + input: " \n", + }, + { + name: "single line", + input: "test message\n", + }, + { + name: "multiple lines", + input: "line 1\nline 2\nline 3\n", + }, + { + name: "no trailing newline", + input: "test message", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewLogHandler() + n, err := handler.Write([]byte(tt.input)) + + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != len(tt.input) { + t.Errorf("Expected to write %d bytes, wrote %d", len(tt.input), n) + } + }) + } +} + +func TestLogHandler_RealSampleOutput(t *testing.T) { + handler := NewLogHandler() + n, err := handler.Write([]byte(sampleContainerOutput)) + + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != len(sampleContainerOutput) { + t.Errorf("Expected to write %d bytes, wrote %d", len(sampleContainerOutput), n) + } +} + +func TestLogHandler_MalformedJSON(t *testing.T) { + tests := []string{ + `{"severity":"info","message":"incomplete`, + "This is just plain text", + "{}", + `{"message":"test message"}`, + `{"severity":"info"}`, + } + + for _, input := range tests { + t.Run(input, func(t *testing.T) { + handler := NewLogHandler() + n, err := handler.Write([]byte(input + "\n")) + + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != 
len(input)+1 { + t.Errorf("Expected to write %d bytes, wrote %d", len(input)+1, n) + } + }) + } +} + +func TestLogHandler_Concurrency(t *testing.T) { + handler := NewLogHandler() + + // Test concurrent writes + done := make(chan bool, 2) + + go func() { + handler.Write([]byte("goroutine 1\n")) + done <- true + }() + + go func() { + handler.Write([]byte("goroutine 2\n")) + done <- true + }() + + // Wait for both goroutines + <-done + <-done + + // Should not crash or have race conditions +} + +func TestLogHandler_JSONParsing(t *testing.T) { + tests := []struct { + name string + input string + shouldParse bool + severity string + message string + }{ + { + name: "valid JSON with all fields", + input: `{"severity":"info","timestamp":"2025-09-12T16:32:29.498Z","logger":"cog","caller":"cog/main.go:59","message":"configuration"}`, + shouldParse: true, + severity: "info", + message: "configuration", + }, + { + name: "valid JSON with extra fields", + input: `{"severity":"warn","message":"warning message","extra":"field"}`, + shouldParse: true, + severity: "warn", + message: "warning message", + }, + { + name: "invalid JSON", + input: `{"severity":"info","message":"incomplete`, + shouldParse: false, + }, + { + name: "not JSON at all", + input: "This is just plain text", + shouldParse: false, + }, + { + name: "empty object", + input: "{}", + shouldParse: true, + severity: "", + message: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewLogHandler() + + // Test the TryParseJSONLog method directly + parsed := handler.TryParseJSONLog(tt.input) + + if parsed != tt.shouldParse { + t.Errorf("Expected parse result %v, got %v", tt.shouldParse, parsed) + } + }) + } +} + +func TestLogHandler_EmptyInput(t *testing.T) { + handler := NewLogHandler() + + // Test empty input + n, err := handler.Write([]byte("")) + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != 0 { + t.Errorf("Expected to write 0 bytes, wrote %d", n) + } + + 
// Test input with only newlines + n, err = handler.Write([]byte("\n\n\n")) + if err != nil { + t.Errorf("Write failed: %v", err) + } + if n != 3 { + t.Errorf("Expected to write 3 bytes, wrote %d", n) + } +} From aa0e1bc2395848056450b4e5229569cb47b0266c Mon Sep 17 00:00:00 2001 From: Michael Dwan Date: Fri, 12 Sep 2025 16:08:20 -0600 Subject: [PATCH 4/4] Remove `cog_runtime: true` from the procedures template --- pkg/cli/init-templates/pipeline/cog.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cli/init-templates/pipeline/cog.yaml b/pkg/cli/init-templates/pipeline/cog.yaml index 2f06bf1a05..07daf92e1a 100644 --- a/pkg/cli/init-templates/pipeline/cog.yaml +++ b/pkg/cli/init-templates/pipeline/cog.yaml @@ -3,6 +3,5 @@ build: gpu: false - cog_runtime: true python_requirements: requirements.txt predict: "main.py:run"