diff --git a/genkit-tools/common/src/manager/manager.ts b/genkit-tools/common/src/manager/manager.ts index 2a78247bbb..ac1ba913e5 100644 --- a/genkit-tools/common/src/manager/manager.ts +++ b/genkit-tools/common/src/manager/manager.ts @@ -20,13 +20,13 @@ import EventEmitter from 'events'; import * as fsSync from 'fs'; import fs from 'fs/promises'; import path from 'path'; +import { GenkitError } from '../types'; import { RunActionResponseSchema, type Action, type RunActionResponse, } from '../types/action'; import * as apis from '../types/apis'; -import type { GenkitError } from '../types/error'; import type { TraceData } from '../types/trace'; import { logger } from '../utils/logger'; import { @@ -228,7 +228,12 @@ export class RuntimeManager { responseType: 'stream', } ) - .catch(this.httpErrorHandler); + .catch((err) => + this.handleStreamError( + err, + `Error running action key='${input.key}'.` + ) + ); let genkitVersion: string; if (response.headers['x-genkit-version']) { genkitVersion = response.headers['x-genkit-version']; @@ -302,7 +307,10 @@ export class RuntimeManager { responseType: 'stream', // Use stream to get early headers }) .catch((err) => - this.httpErrorHandler(err, `Error running action key='${input.key}'.`) + this.handleStreamError( + err, + `Error running action key='${input.key}'.` + ) ); const traceId = response.headers['x-genkit-trace-id']; @@ -735,15 +743,23 @@ export class RuntimeManager { /** * Handles an HTTP error. */ - private httpErrorHandler(error: AxiosError, message?: string): any { + private httpErrorHandler(error: AxiosError, message?: string): never { const newError = new GenkitToolsError(message || 'Internal Error'); if (error.response) { - if ((error.response?.data as any).message) { - newError.message = (error.response?.data as any).message; - } // we got a non-200 response; copy the payload and rethrow newError.data = error.response.data as GenkitError; + newError.stack = (error.response?.data as any).message; + if ((error.response?.data as any).message) { + newError.data.data = { + ...newError.data.data, + genkitErrorMessage: message, + genkitErrorDetails: { + stack: (error.response?.data as any).message, + traceId: (error.response?.data as any).traceId, + }, + }; + } throw newError; } @@ -753,6 +769,57 @@ export class RuntimeManager { }); } + /** + * Handles a stream error by reading the stream and then calling httpErrorHandler. + */ + private async handleStreamError( + error: AxiosError, + message: string + ): Promise { + if ( + error.response && + error.config?.responseType === 'stream' && + (error.response.data as any).on + ) { + try { + const body = await this.streamToString(error.response.data); + try { + error.response.data = JSON.parse(body); + } catch (e) { + error.response.data = { + message: body || 'Unknown error', + }; + } + } catch (e) { + // If stream reading fails, we must replace the stream object with a safe error object + // to prevent circular structure errors during JSON serialization. + error.response.data = { + message: 'Failed to read error response stream', + details: String(e), + }; + } + } + this.httpErrorHandler(error, message); + } + + /** + * Helper to convert a stream to string. + */ + private streamToString(stream: any): Promise { + return new Promise((resolve, reject) => { + let buffer = ''; + stream.on('data', (chunk: Buffer) => { + buffer += chunk.toString(); + }); + stream.on('end', () => { + resolve(buffer); + }); + stream.on('error', (err: Error) => { + reject(err); + }); + }); + } + /** * Performs health checks on all runtimes. */ diff --git a/go/genkit/reflection.go b/go/genkit/reflection.go index f0dcbf5578..1bd675f75a 100644 --- a/go/genkit/reflection.go +++ b/go/genkit/reflection.go @@ -467,7 +467,15 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res refErr.Details.TraceID = &resp.Telemetry.TraceID } - json.NewEncoder(w).Encode(errorResponse{Error: refErr}) + reflectErr, err := json.Marshal(refErr) + if err != nil { + logger.FromContext(ctx).Error("writing output", "err", err) + return nil + } + _, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr) + if err != nil { + return err + } return nil } @@ -477,10 +485,17 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res errorResponse.Details.TraceID = &resp.Telemetry.TraceID } - if !headersSent { - w.WriteHeader(errorResponse.Code) + reflectErr, err := json.Marshal(errorResponse) + if err != nil { + logger.FromContext(ctx).Error("writing output", "err", err) + return nil } - return writeJSON(ctx, w, errorResponse) + + _, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr) + if err != nil { + return err + } + return nil } // Success case @@ -491,7 +506,13 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res Result: resp.Result, Telemetry: telemetry{TraceID: resp.Telemetry.TraceID}, } - json.NewEncoder(w).Encode(finalResponse) + data, err := json.Marshal(finalResponse) + if err != nil { + logger.FromContext(ctx).Error("writing output", "err", err) + return nil + } + + w.Write(data) } else { // For non-streaming, headers were already sent via telemetry callback // Response already includes telemetry.traceId in body diff --git a/go/genkit/reflection_test.go b/go/genkit/reflection_test.go index 7b11914348..9f7aafc3bf 100644 --- a/go/genkit/reflection_test.go +++ b/go/genkit/reflection_test.go @@ -180,12 +180,12 @@ func TestServeMux(t *testing.T) { { name: "invalid action key", body: `{"key": "/custom/test/invalid", "input": 3}`, - wantStatus: http.StatusNotFound, + wantStatus: http.StatusOK, }, { name: "invalid input type", body: `{"key": "/custom/test/inc", "input": "not a number"}`, - wantStatus: http.StatusBadRequest, + wantStatus: http.StatusOK, }, } diff --git a/go/samples/basic-gemini/main.go b/go/samples/basic-gemini/main.go index 6be8c112ac..e61ec9df42 100644 --- a/go/samples/basic-gemini/main.go +++ b/go/samples/basic-gemini/main.go @@ -50,7 +50,7 @@ func main() { }, }), ai.WithStreaming(cb), - ai.WithOutputSchemaName("joke"), + ai.WithOutputSchemaName("Joke"), ai.WithPrompt(`Tell short jokes about %s`, input)) if err != nil { return "", err diff --git a/js/core/src/reflection.ts b/js/core/src/reflection.ts index 6af3d67d7a..9b9389ad9d 100644 --- a/js/core/src/reflection.ts +++ b/js/core/src/reflection.ts @@ -379,11 +379,7 @@ export class ReflectionServer { }; // Headers may have been sent already (via onTraceStart), so check before setting status - if (!res.headersSent) { - res.status(500).json(errorResponse); - } else { - res.end(JSON.stringify(errorResponse)); - } + res.status(200).end(JSON.stringify({ error: errorResponse })); }); this.port = await this.findPort();