Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import EventEmitter from 'events';
import * as fsSync from 'fs';
import fs from 'fs/promises';
import path from 'path';
import { GenkitError } from '../types';
import {
RunActionResponseSchema,
type Action,
type RunActionResponse,
} from '../types/action';
import * as apis from '../types/apis';
import type { GenkitError } from '../types/error';
import type { TraceData } from '../types/trace';
import { logger } from '../utils/logger';
import {
Expand Down Expand Up @@ -228,7 +228,12 @@ export class RuntimeManager {
responseType: 'stream',
}
)
.catch(this.httpErrorHandler);
.catch((err) =>
this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);
let genkitVersion: string;
if (response.headers['x-genkit-version']) {
genkitVersion = response.headers['x-genkit-version'];
Expand Down Expand Up @@ -302,7 +307,10 @@ export class RuntimeManager {
responseType: 'stream', // Use stream to get early headers
})
.catch((err) =>
this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);

const traceId = response.headers['x-genkit-trace-id'];
Expand Down Expand Up @@ -735,15 +743,23 @@ export class RuntimeManager {
/**
* Handles an HTTP error.
*/
private httpErrorHandler(error: AxiosError, message?: string): any {
private httpErrorHandler(error: AxiosError, message?: string): never {
const newError = new GenkitToolsError(message || 'Internal Error');

if (error.response) {
if ((error.response?.data as any).message) {
newError.message = (error.response?.data as any).message;
}
// we got a non-200 response; copy the payload and rethrow
newError.data = error.response.data as GenkitError;
newError.stack = (error.response?.data as any).message;
if ((error.response?.data as any).message) {
newError.data.data = {
...newError.data.data,
genkitErrorMessage: message,
genkitErrorDetails: {
stack: (error.response?.data as any).message,
traceId: (error.response?.data as any).traceId,
},
};
}
throw newError;
}

Expand All @@ -753,6 +769,57 @@ export class RuntimeManager {
});
}

/**
* Handles a stream error by reading the stream and then calling httpErrorHandler.
*/
private async handleStreamError(
error: AxiosError,
message: string
): Promise<never> {
if (
error.response &&
error.config?.responseType === 'stream' &&
(error.response.data as any).on
) {
try {
const body = await this.streamToString(error.response.data);
try {
error.response.data = JSON.parse(body);
} catch (e) {
error.response.data = {
message: body || 'Unknown error',
};
}
} catch (e) {
// If stream reading fails, we must replace the stream object with a safe error object
// to prevent circular structure errors during JSON serialization.
error.response.data = {
message: 'Failed to read error response stream',
details: String(e),
};
}
}
this.httpErrorHandler(error, message);
}

/**
* Helper to convert a stream to string.
*/
private streamToString(stream: any): Promise<string> {
return new Promise((resolve, reject) => {
let buffer = '';
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
});
stream.on('end', () => {
resolve(buffer);
});
stream.on('error', (err: Error) => {
reject(err);
});
});
}

/**
* Performs health checks on all runtimes.
*/
Expand Down
31 changes: 26 additions & 5 deletions go/genkit/reflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,15 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
refErr.Details.TraceID = &resp.Telemetry.TraceID
}

json.NewEncoder(w).Encode(errorResponse{Error: refErr})
reflectErr, err := json.Marshal(refErr)
if err != nil {
logger.FromContext(ctx).Error("writing output", "err", err)
return nil
}
_, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr)
if err != nil {
return err
}
return nil
}

Expand All @@ -477,10 +485,17 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
errorResponse.Details.TraceID = &resp.Telemetry.TraceID
}

if !headersSent {
w.WriteHeader(errorResponse.Code)
reflectErr, err := json.Marshal(errorResponse)
if err != nil {
logger.FromContext(ctx).Error("writing output", "err", err)
return nil
}
return writeJSON(ctx, w, errorResponse)

_, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr)
if err != nil {
return err
}
return nil
}

// Success case
Expand All @@ -491,7 +506,13 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
Result: resp.Result,
Telemetry: telemetry{TraceID: resp.Telemetry.TraceID},
}
json.NewEncoder(w).Encode(finalResponse)
data, err := json.Marshal(finalResponse)
if err != nil {
logger.FromContext(ctx).Error("writing output", "err", err)
return nil
}

w.Write(data)
} else {
// For non-streaming, headers were already sent via telemetry callback
// Response already includes telemetry.traceId in body
Expand Down
4 changes: 2 additions & 2 deletions go/genkit/reflection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func TestServeMux(t *testing.T) {
{
name: "invalid action key",
body: `{"key": "/custom/test/invalid", "input": 3}`,
wantStatus: http.StatusNotFound,
wantStatus: http.StatusOK,
},
{
name: "invalid input type",
body: `{"key": "/custom/test/inc", "input": "not a number"}`,
wantStatus: http.StatusBadRequest,
wantStatus: http.StatusOK,
},
}

Expand Down
2 changes: 1 addition & 1 deletion go/samples/basic-gemini/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
},
}),
ai.WithStreaming(cb),
ai.WithOutputSchemaName("joke"),
ai.WithOutputSchemaName("Joke"),
ai.WithPrompt(`Tell short jokes about %s`, input))
if err != nil {
return "", err
Expand Down
6 changes: 1 addition & 5 deletions js/core/src/reflection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,7 @@ export class ReflectionServer {
};

// Headers may have been sent already (via onTraceStart), so check before setting status
if (!res.headersSent) {
res.status(500).json(errorResponse);
} else {
res.end(JSON.stringify(errorResponse));
}
res.status(200).end(JSON.stringify({ error: errorResponse }));
});

this.port = await this.findPort();
Expand Down
Loading