Skip to content

Commit

Permalink
Convert node stream to web stream
Browse files Browse the repository at this point in the history
  • Loading branch information
vicb committed Sep 18, 2024
1 parent 1250d34 commit 2b81037
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 48 deletions.
8 changes: 7 additions & 1 deletion builder/src/build/build-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,18 @@ globalThis.fetch = (input, init) => {
if (init) delete init.cache;
return curFetch(input, init);
};
import { Readable } from 'node:stream';
globalThis.fetch.__nextPatched = isPatchedAlready;
fetch = globalThis.fetch;
const CustomRequest = class extends globalThis.Request {
constructor(input, init) {
console.log("CustomRequest", input);
if (init) delete init.cache;
if (init) {
delete init.cache;
if (init.body.__node_stream__) {
init.body = Readable.toWeb(init.body);
}
}
super(input, init);
}
};
Expand Down
138 changes: 92 additions & 46 deletions builder/src/templates/worker.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { Readable } from "node:stream";

import Stream from "node:stream";
import type { NextConfig } from "next";
import { NodeNextRequest, NodeNextResponse } from "next/dist/server/base-http/node";
import { createRequestResponseMocks } from "next/dist/server/lib/mock-request";
import { MockedResponse } from "next/dist/server/lib/mock-request";
import NextNodeServer, { NodeRequestHandler } from "next/dist/server/next-server";
import type { IncomingMessage } from "node:http";

const NON_BODY_RESPONSES = new Set([101, 204, 205, 304]);

/**
* Injected at build time
* (we practically follow what Next.js does here:
https://github.com/vercel/next.js/blob/68a7128/packages/next/src/build/utils.ts#L2137-L2139)
*/
// Injected at build time
const nextConfig: NextConfig = JSON.parse(process.env.__NEXT_PRIVATE_STANDALONE_CONFIG ?? "{}");

let requestHandler: NodeRequestHandler | null = null;
Expand All @@ -23,6 +21,7 @@ export default {
customServer: false,
dev: false,
dir: "",
minimalMode: false,
}).getRequestHandler();
}

Expand All @@ -32,50 +31,97 @@ export default {
let imageUrl =
url.searchParams.get("url") ?? "https://developers.cloudflare.com/_astro/logo.BU9hiExz.svg";
if (imageUrl.startsWith("/")) {
imageUrl = new URL(imageUrl, request.url).href;
return env.ASSETS.fetch(new URL(imageUrl, request.url));
}
return fetch(imageUrl, { cf: { cacheEverything: true } } as any);
}

const resBody = new TransformStream();
const writer = resBody.writable.getWriter();

const reqBodyNodeStream = request.body ? Readable.fromWeb(request.body as any) : undefined;

const { req, res } = createRequestResponseMocks({
method: request.method,
url: url.href.slice(url.origin.length),
headers: Object.fromEntries([...request.headers]),
bodyReadable: reqBodyNodeStream,
resWriter: (chunk) => {
writer.write(chunk).catch(console.error);
return true;
},
});

let headPromiseResolve: any = null;
const headPromise = new Promise<void>((resolve) => {
headPromiseResolve = resolve;
});
res.flushHeaders = () => headPromiseResolve?.();

if (reqBodyNodeStream != null) {
const origPush = reqBodyNodeStream.push;
reqBodyNodeStream.push = (chunk: any) => {
req.push(chunk);
return origPush.call(reqBodyNodeStream, chunk);
};
}

ctx.waitUntil((res as any).hasStreamed.then(() => writer.close()));
const { req, res, webResponse } = getWrappedStreams(request, ctx);

ctx.waitUntil(requestHandler(new NodeNextRequest(req), new NodeNextResponse(res)));

await Promise.race([res.headPromise, headPromise]);

return new Response(resBody.readable, {
status: res.statusCode,
headers: (res as any).headers,
});
return await webResponse();
},
};

function getWrappedStreams(request: Request, ctx: any) {
const url = new URL(request.url);

const req = (
request.body ? Stream.Readable.fromWeb(request.body as any) : Stream.Readable.from([])
) as IncomingMessage;
req.httpVersion = "1.0";
req.httpVersionMajor = 1;
req.httpVersionMinor = 0;
req.url = url.href.slice(url.origin.length);
req.headers = Object.fromEntries([...request.headers]);
req.method = request.method;
Object.defineProperty(req, "__node_stream__", {
value: true,
writable: false,
});
Object.defineProperty(req, "headersDistinct", {
get() {
const headers: Record<string, string[]> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (!value) continue;
headers[key] = Array.isArray(value) ? value : [value];
}
return headers;
},
});

const { readable, writable } = new IdentityTransformStream();
const resBodyWriter = writable.getWriter();

const res = new MockedResponse({
resWriter: (chunk) => {
resBodyWriter.write(typeof chunk === "string" ? Buffer.from(chunk) : chunk).catch((err: any) => {
if (
err.message.includes("WritableStream has been closed") ||
err.message.includes("Network connection lost")
) {
// safe to ignore
return;
}
console.error("Error in resBodyWriter.write");
console.error(err);
});
return true;
},
});

// It's implemented as a no-op, but really it should mark the headers as done
res.flushHeaders = () => (res as any).headPromiseResolve();

// Only allow statusCode to be modified if not sent
let { statusCode } = res;
Object.defineProperty(res, "statusCode", {
get: function () {
return statusCode;
},
set: function (val) {
if (this.finished || this.headersSent) {
return;
}
statusCode = val;
},
});

// Make sure the writer is eventually closed
ctx.waitUntil((res as any).hasStreamed.finally(() => resBodyWriter.close().catch(() => {})));

return {
res,
req,
webResponse: async () => {
await res.headPromise;
// TODO: remove this once streaming with compression is working nicely
res.setHeader("content-encoding", "identity");
return new Response(NON_BODY_RESPONSES.has(res.statusCode) ? null : readable, {
status: res.statusCode,
headers: (res as any).headers,
});
},
};
}
3 changes: 2 additions & 1 deletion examples/api/app/api/hello/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ export async function GET() {
}

export async function POST(request: Request) {
return new Response(`Hello post-World! body=${await request.text()}`);
const text = await request.text();
return new Response(`Hello post-World! body=${text}`);
}

0 comments on commit 2b81037

Please sign in to comment.