Skip to content

Commit

Permalink
Improve the way we stream from tasks to the server
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Dec 11, 2024
1 parent 0dbc164 commit d4f533f
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 44 deletions.
13 changes: 13 additions & 0 deletions apps/webapp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
});

if (process.env.DASHBOARD_AND_API_DISABLED !== "true") {
if (process.env.ALLOW_ONLY_REALTIME_API === "true") {
// Block all requests that do not start with /realtime
app.use((req, res, next) => {
// Make sure /healthcheck is still accessible
if (!req.url.startsWith("/realtime") && req.url !== "/healthcheck") {
res.status(404).send("Not Found");
return;
}

next();
});
}

app.use(apiRateLimiter);

app.all(
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,14 @@ export class StandardMetadataManager implements RunMetadataManager {
// Add the key to the special stream metadata object
this.appendKey(`$$streams`, key);
this.setKey("$$streamsVersion", this.streamsVersion);
this.setKey("$$streamsBaseUrl", this.streamsBaseUrl);

await this.flush();

const streamInstance = new MetadataStream({
key,
runId: this.runId,
iterator: $value[Symbol.asyncIterator](),
source: $value,
baseUrl: this.streamsBaseUrl,
headers: this.apiClient.getHeaders(),
signal,
Expand Down
76 changes: 36 additions & 40 deletions packages/core/src/v3/runMetadata/metadataStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,48 @@ export type MetadataOptions<T> = {
baseUrl: string;
runId: string;
key: string;
iterator: AsyncIterator<T>;
source: AsyncIterable<T>;
headers?: Record<string, string>;
signal?: AbortSignal;
version?: "v1" | "v2";
};

export class MetadataStream<T> {
private controller = new AbortController();
private serverQueue: Array<Promise<IteratorResult<T>>> = [];
private consumerQueue: Array<Promise<IteratorResult<T>>> = [];
private serverIterator: AsyncIterator<T>;
private consumerIterator: AsyncIterator<T>;
private serverStream: ReadableStream<T>;
private consumerStream: ReadableStream<T>;
private streamPromise: Promise<void | Response>;

constructor(private options: MetadataOptions<T>) {
const { serverIterator, consumerIterator } = this.createTeeIterators();
this.serverIterator = serverIterator;
this.consumerIterator = consumerIterator;
const [serverStream, consumerStream] = this.createTeeStreams();
this.serverStream = serverStream;
this.consumerStream = consumerStream;

this.streamPromise = this.initializeServerStream();
}

private createTeeIterators() {
const teeIterator = (queue: Array<Promise<IteratorResult<T>>>): AsyncIterator<T> => ({
next: () => {
if (queue.length === 0) {
const result = this.options.iterator.next();
this.serverQueue.push(result);
this.consumerQueue.push(result);
private createTeeStreams() {
const readableSource = new ReadableStream<T>({
start: async (controller) => {
for await (const value of this.options.source) {
controller.enqueue(value);
}
return queue.shift()!;

controller.close();
},
});

return {
serverIterator: teeIterator(this.serverQueue),
consumerIterator: teeIterator(this.consumerQueue),
};
return readableSource.tee();
}

private initializeServerStream(): Promise<void | Response> {
const serverIterator = this.serverIterator;

const serverStream = new ReadableStream({
async pull(controller) {
try {
const { value, done } = await serverIterator.next();
if (done) {
controller.close();
return;
}

controller.enqueue(JSON.stringify(value) + "\n");
} catch (err) {
controller.error(err);
}
},
cancel: () => this.controller.abort(),
});
private initializeServerStream(): Promise<Response> {
const serverStream = this.serverStream.pipeThrough(
new TransformStream<T, string>({
async transform(chunk, controller) {
controller.enqueue(JSON.stringify(chunk) + "\n");
},
})
);

return fetch(
`${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${
Expand All @@ -82,6 +65,19 @@ export class MetadataStream<T> {
}

public [Symbol.asyncIterator]() {
return this.consumerIterator;
return streamToAsyncIterator(this.consumerStream);
}
}

async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}
3 changes: 2 additions & 1 deletion references/nextjs-realtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"build": "next build",
"start": "next start",
"lint": "next lint",
"dev:trigger": "trigger dev"
"dev:trigger": "trigger dev",
"deploy": "trigger deploy"
},
"dependencies": {
"@ai-sdk/openai": "^1.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export default function TriggerButton({ accessToken }: { accessToken: string })
>("openai-streaming", {
accessToken,
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
experimental_throttleInMs: 100,
});

const openWeatherReport = useCallback(() => {
Expand Down
1 change: 0 additions & 1 deletion references/nextjs-realtime/src/trigger/ai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ export const openaiStreaming = schemaTask({
});

const stream = await metadata.stream("openai", result.fullStream);
await metadata.stream("openaiText", result.textStream);
},
});

Expand Down

0 comments on commit d4f533f

Please sign in to comment.