Skip to content
This repository was archived by the owner on Aug 5, 2025. It is now read-only.

Commit e5c0789

Browse files
authored
feat(instrumentation): make the openai instrumentation context aware
1 parent 11546b7 commit e5c0789

File tree

4 files changed

+507
-232
lines changed

4 files changed

+507
-232
lines changed

src/api.ts

Lines changed: 50 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ export class API {
399399

400400
return response.data;
401401
} catch (e) {
402+
console.error(e);
402403
if (e instanceof AxiosError) {
403404
throw new Error(JSON.stringify(e.response?.data.errors));
404405
} else {
@@ -426,6 +427,7 @@ export class API {
426427

427428
return response.data;
428429
} catch (e) {
430+
console.error(e);
429431
if (e instanceof AxiosError) {
430432
throw new Error(JSON.stringify(e.response?.data));
431433
} else {
@@ -696,70 +698,60 @@ export class API {
696698
orderBy?: GenerationsOrderBy;
697699
}): Promise<PaginatedResponse<PersistedGeneration>> {
698700
const query = `
699-
query GetGenerations(
700-
$after: ID,
701-
$before: ID,
702-
$cursorAnchor: DateTime,
703-
$filters: [generationsInputType!],
704-
$orderBy: GenerationsOrderByInput,
705-
$first: Int,
706-
$last: Int,
707-
$projectId: String,
701+
query GetGenerations(
702+
$after: ID
703+
$before: ID
704+
$cursorAnchor: DateTime
705+
$filters: [generationsInputType!]
706+
$orderBy: GenerationsOrderByInput
707+
$first: Int
708+
$last: Int
709+
$projectId: String
708710
) {
709-
generations(
710-
after: $after,
711-
before: $before,
712-
cursorAnchor: $cursorAnchor,
713-
filters: $filters,
714-
orderBy: $orderBy,
715-
first: $first,
716-
last: $last,
717-
projectId: $projectId,
718-
) {
711+
generations(
712+
after: $after
713+
before: $before
714+
cursorAnchor: $cursorAnchor
715+
filters: $filters
716+
orderBy: $orderBy
717+
first: $first
718+
last: $last
719+
projectId: $projectId
720+
) {
719721
pageInfo {
720-
startCursor
721-
endCursor
722-
hasNextPage
723-
hasPreviousPage
722+
startCursor
723+
endCursor
724+
hasNextPage
725+
hasPreviousPage
724726
}
725727
totalCount
726728
edges {
727-
cursor
728-
node {
729-
id
730-
projectId
731-
prompt
732-
completion
733-
createdAt
734-
provider
735-
model
736-
variables
737-
messages
738-
messageCompletion
739-
tools
740-
settings
741-
stepId
742-
tokenCount
743-
duration
744-
inputTokenCount
745-
outputTokenCount
746-
ttFirstToken
747-
duration
748-
tokenThroughputInSeconds
749-
error
750-
type
751-
tags
752-
step {
753-
threadId
754-
thread {
755-
participant {
756-
identifier
757-
}
758-
}
759-
}
760-
}
761-
}
729+
cursor
730+
node {
731+
id
732+
projectId
733+
prompt
734+
completion
735+
createdAt
736+
provider
737+
model
738+
variables
739+
messages
740+
messageCompletion
741+
tools
742+
settings
743+
tokenCount
744+
duration
745+
inputTokenCount
746+
outputTokenCount
747+
ttFirstToken
748+
tokenThroughputInSeconds
749+
error
750+
type
751+
tags
752+
}
762753
}
754+
}
763755
}`;
764756

765757
const result = await this.makeGqlCall(query, variables);

src/instrumentation/index.ts

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
1-
import { LiteralClient, Step, Thread } from '..';
1+
import { LiteralClient } from '..';
22
import { LiteralCallbackHandler } from './langchain';
33
import { instrumentLlamaIndex, withThread } from './llamaindex';
4-
import instrumentOpenAI, {
5-
InstrumentOpenAIOptions,
6-
OpenAIOutput
7-
} from './openai';
4+
import instrumentOpenAI from './openai';
5+
import { InstrumentOpenAIOptions } from './openai';
86
import { makeInstrumentVercelSDK } from './vercel-sdk';
97

108
export type { InstrumentOpenAIOptions } from './openai';
119

1210
export default (client: LiteralClient) => ({
13-
openai: (
14-
output: OpenAIOutput,
15-
parent?: Step | Thread,
16-
options?: InstrumentOpenAIOptions
17-
) => instrumentOpenAI(client, output, parent, options),
11+
openai: (options?: InstrumentOpenAIOptions) =>
12+
instrumentOpenAI(client, options),
1813
langchain: {
1914
literalCallback: (threadId?: string) => {
2015
try {

src/instrumentation/openai.ts

Lines changed: 85 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,14 @@ import {
1818
Thread
1919
} from '..';
2020

21-
const openaiReqs: Record<
22-
string,
23-
{
24-
// Record the ID of the request
25-
id: string;
26-
// Record the start time of the request
27-
start: number;
28-
// Record the inputs of the request
29-
inputs: Record<string, any>;
30-
// Record the stream of the request if it's a streaming request
31-
stream?: Stream<ChatCompletionChunk | Completion>;
32-
}
33-
> = {};
34-
3521
// Define a generic type for the original function to be wrapped
3622
type OriginalFunction<T extends any[], R> = (...args: T) => Promise<R>;
3723

3824
// Utility function to wrap a method
3925
function wrapFunction<T extends any[], R>(
40-
originalFunction: OriginalFunction<T, R>
26+
originalFunction: OriginalFunction<T, R>,
27+
client: LiteralClient,
28+
options: InstrumentOpenAIOptions = {}
4129
): OriginalFunction<T, R> {
4230
return async function (this: any, ...args: T): Promise<R> {
4331
const start = Date.now();
@@ -46,58 +34,57 @@ function wrapFunction<T extends any[], R>(
4634
const result = await originalFunction.apply(this, args);
4735

4836
if (result instanceof Stream) {
49-
const streamResult = result as Stream<ChatCompletionChunk | Completion>;
50-
// If it is a streaming request, we need to process the first token to get the id
51-
// However we also need to tee the stream so that the end developer can process the stream
52-
const [a, b] = streamResult.tee();
53-
// Re-split the stream to store a clean instance for final processing later on
54-
const c = a.tee()[0];
55-
let id;
56-
// Iterate over the stream to find the first chunk and store the id
57-
for await (const chunk of a) {
58-
id = chunk.id;
59-
if (!openaiReqs[id]) {
60-
openaiReqs[id] = {
61-
id,
62-
inputs: args[0],
63-
start,
64-
stream: c
65-
};
66-
break;
67-
}
68-
}
69-
// @ts-expect-error Hacky way to add the id to the stream
70-
b.id = id;
37+
const streamResult = result;
38+
const [returnedResult, processedResult] = streamResult.tee();
39+
40+
await processOpenAIOutput(client, processedResult, {
41+
...options,
42+
start,
43+
inputs: args[0]
44+
});
7145

72-
return b as any;
46+
return returnedResult as R;
7347
} else {
74-
const regularResult = result as ChatCompletion | Completion;
75-
const id = regularResult.id;
76-
openaiReqs[id] = {
77-
id,
78-
inputs: args[0],
79-
start
80-
};
48+
await processOpenAIOutput(client, result as ChatCompletion | Completion, {
49+
...options,
50+
start,
51+
inputs: args[0]
52+
});
53+
8154
return result;
8255
}
8356
};
8457
}
8558

86-
// Patching the chat.completions.create function
87-
const originalChatCompletionsCreate = OpenAI.Chat.Completions.prototype.create;
88-
OpenAI.Chat.Completions.prototype.create = wrapFunction(
89-
originalChatCompletionsCreate
90-
) as any;
91-
92-
// Patching the completions.create function
93-
const originalCompletionsCreate = OpenAI.Completions.prototype.create;
94-
OpenAI.Completions.prototype.create = wrapFunction(
95-
originalCompletionsCreate
96-
) as any;
97-
98-
// Patching the completions.create function
99-
const originalImagesGenerate = OpenAI.Images.prototype.generate;
100-
OpenAI.Images.prototype.generate = wrapFunction(originalImagesGenerate) as any;
59+
function instrumentOpenAI(
60+
client: LiteralClient,
61+
options: InstrumentOpenAIOptions = {}
62+
) {
63+
// Patching the chat.completions.create function
64+
const originalChatCompletionsCreate =
65+
OpenAI.Chat.Completions.prototype.create;
66+
OpenAI.Chat.Completions.prototype.create = wrapFunction(
67+
originalChatCompletionsCreate,
68+
client,
69+
options
70+
) as any;
71+
72+
// Patching the completions.create function
73+
const originalCompletionsCreate = OpenAI.Completions.prototype.create;
74+
OpenAI.Completions.prototype.create = wrapFunction(
75+
originalCompletionsCreate,
76+
client,
77+
options
78+
) as any;
79+
80+
// Patching the images.generate function
81+
const originalImagesGenerate = OpenAI.Images.prototype.generate;
82+
OpenAI.Images.prototype.generate = wrapFunction(
83+
originalImagesGenerate,
84+
client,
85+
options
86+
) as any;
87+
}
10188

10289
function processChatDelta(
10390
newDelta: ChatCompletionChunk.Choice.Delta,
@@ -296,22 +283,49 @@ export interface InstrumentOpenAIOptions {
296283
tags?: Maybe<string[]>;
297284
}
298285

299-
const instrumentOpenAI = async (
286+
export interface ProcessOpenAIOutput extends InstrumentOpenAIOptions {
287+
start: number;
288+
inputs: Record<string, any>;
289+
}
290+
291+
function isStream(obj: any): boolean {
292+
return (
293+
obj !== null &&
294+
typeof obj === 'object' &&
295+
typeof obj.pipe === 'function' &&
296+
typeof obj.on === 'function' &&
297+
typeof obj.read === 'function'
298+
);
299+
}
300+
301+
const processOpenAIOutput = async (
300302
client: LiteralClient,
301303
output: OpenAIOutput,
302-
parent?: Step | Thread,
303-
options: InstrumentOpenAIOptions = {}
304+
{ start, tags, inputs }: ProcessOpenAIOutput
304305
) => {
305-
//@ts-expect-error - This is a hacky way to get the id from the stream
306-
const outputId = output.id;
307-
const { stream, start, inputs } = openaiReqs[outputId];
308306
const baseGeneration = {
309307
provider: 'openai',
310308
model: inputs.model,
311309
settings: getSettings(inputs),
312-
tags: options.tags
310+
tags: tags
313311
};
314312

313+
let threadFromStore: Thread | null = null;
314+
try {
315+
threadFromStore = client.getCurrentThread();
316+
} catch (error) {
317+
// Ignore error thrown if getCurrentThread is called outside of a context
318+
}
319+
320+
let stepFromStore: Step | null = null;
321+
try {
322+
stepFromStore = client.getCurrentStep();
323+
} catch (error) {
324+
// Ignore error thrown if getCurrentStep is called outside of a context
325+
}
326+
327+
const parent = stepFromStore || threadFromStore;
328+
315329
if ('data' in output) {
316330
// Image Generation
317331

@@ -322,14 +336,16 @@ const instrumentOpenAI = async (
322336
output: output,
323337
startTime: new Date(start).toISOString(),
324338
endTime: new Date().toISOString(),
325-
tags: options.tags
339+
tags: tags
326340
};
327341

328342
const step = parent
329343
? parent.step(stepData)
330344
: client.step({ ...stepData, type: 'run' });
331345
await step.send();
332-
} else if (output instanceof Stream) {
346+
} else if (output instanceof Stream || isStream(output)) {
347+
const stream = output as Stream<ChatCompletionChunk | Completion>;
348+
333349
if (!stream) {
334350
throw new Error('Stream not found');
335351
}
@@ -460,8 +476,6 @@ const instrumentOpenAI = async (
460476
}
461477
}
462478
}
463-
464-
delete openaiReqs[outputId];
465479
};
466480

467481
export default instrumentOpenAI;

0 commit comments

Comments (0)