5 changes: 5 additions & 0 deletions .changeset/stale-ducks-hide.md
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

Improve composable-cache performance by avoiding unnecessary copies
68 changes: 36 additions & 32 deletions packages/open-next/src/adapters/composable-cache.ts
@@ -1,28 +1,18 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import type { CacheValue } from "types/overrides";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<
string,
Promise<CacheValue<"composable">>
>();
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) {
return stored.then((entry) => ({
...entry,
value: toReadableStream(entry.value),
}));
}
}
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) return stored;

const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
@@ -69,28 +59,45 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const promiseEntry = pendingEntry.then(async (entry) => ({
...entry,
value: await fromReadableStream(entry.value),
}));
pendingWritePromiseMap.set(cacheKey, promiseEntry);
const teedPromise = pendingEntry.then((entry) => {
// Optimization: We avoid consuming and stringifying the stream here,
// because it creates double copies just to be discarded when this function
// ends. This avoids unnecessary memory usage, and reduces GC pressure.
Contributor:
Wouldn't this cause an issue if we read this more than once in the get? We'd then have a single ReadableStream that multiple readers would try to access.
We could tee on get as well, but that means we'd end up with a copy in memory and multiple ReadableStreams; not sure if it's worth it or not?
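For illustration, a hypothetical tee-on-get variant (not code from this PR) showing why that trade-off exists: each tee branch buffers its unread chunks independently, so concurrent readers can multiply the in-memory copies.

```ts
// Hypothetical sketch: tee the buffered stream on every get so each caller reads independently.
// Each tee branch buffers unread chunks, so N concurrent readers can mean up to N copies in memory.
async function getPendingTeed(
  map: Map<string, Promise<{ value: ReadableStream<Uint8Array> }>>,
  cacheKey: string,
) {
  const stored = map.get(cacheKey);
  if (!stored) return undefined;
  return stored.then((entry) => {
    const [forCaller, forNextReader] = entry.value.tee();
    // Put one branch back so the next get (or the pending write) still has a readable stream.
    map.set(cacheKey, Promise.resolve({ ...entry, value: forNextReader }));
    return { ...entry, value: forCaller };
  });
}
```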

Contributor (Author):
Well, the goal is to avoid unnecessary memory allocations. Both optimizations have quirks, but right now concurrent calls get stalled and take a bigger hit because of this unnecessary copy. I'm open to suggestions.

Contributor (@jasnell, Oct 10, 2025):
Might take a bit of experimentation to get exactly right. I'm also not convinced that this new bit is correct either. Another potential approach you could take would be to use a Blob to buffer the data and use its .stream() method to get individual ReadableStream instances that read from the single in-memory buffer. That way you're not relying on, or victim to, ReadableStream tee's terrible default backpressure management, which ends up creating multiple buffers in memory. But it's going to take a bit of careful thought to get right.

Regardless, I think this is going to require some careful consideration to get correct. @anonrig is absolutely correct that the current code is needlessly bottlenecking with too many additional copies.
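A minimal sketch of the Blob-based buffering suggested above, assuming the entry body is drained into memory exactly once; Blob and its .stream() method are standard Web APIs, and the function names here are hypothetical:

```ts
// Buffer the entry body once into a single in-memory Blob; no tee, no per-reader buffering.
async function bufferEntryValue(value: ReadableStream<Uint8Array>): Promise<Blob> {
  const chunks: Uint8Array[] = [];
  const reader = value.getReader();
  while (true) {
    const { done, value: chunk } = await reader.read();
    if (done) break;
    chunks.push(chunk);
  }
  return new Blob(chunks);
}

// Each caller gets a fresh ReadableStream over the same underlying buffer.
function streamFromBuffer(blob: Blob): ReadableStream<Uint8Array> {
  return blob.stream();
}
```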

Contributor (Author):
I've taken another shot and stored a Blob on the composable cache rather than text or a stream, so that any caller can create a readable stream from the Blob at almost no cost.
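A rough sketch of what storing a Blob on the composable cache might look like (hypothetical names and types, not the PR's actual code):

```ts
// Hypothetical in-memory entry: the body is kept as a Blob instead of a string or stream.
type BufferedEntry = { value: Blob; tags: string[] };

const pendingMap = new Map<string, Promise<BufferedEntry>>();

async function setEntry(
  cacheKey: string,
  pendingEntry: Promise<{ value: ReadableStream<Uint8Array>; tags: string[] }>,
) {
  const buffered = pendingEntry.then(async (entry) => ({
    ...entry,
    // One copy into a Blob; every later reader gets its own stream() over it.
    value: await new Response(entry.value).blob(),
  }));
  pendingMap.set(cacheKey, buffered);
  // ... persist to the incremental cache, then remove the map entry.
}

function getPending(cacheKey: string) {
  const stored = pendingMap.get(cacheKey);
  // Each caller gets an independent ReadableStream over the same buffer.
  return stored?.then((entry) => ({ ...entry, value: entry.value.stream() }));
}
```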

const [stream1, stream2] = entry.value.tee();
return [
{ ...entry, value: stream1 },
{ ...entry, value: stream2 },
] as const;
});

pendingWritePromiseMap.set(
cacheKey,
teedPromise.then(([entry]) => entry),
);

const entry = await promiseEntry.finally(() => {
const [, entryForStorage] = await teedPromise.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});

await globalThis.incrementalCache.set(
cacheKey,
{
...entry,
value: entry.value,
...entryForStorage,
value: await fromReadableStream(entryForStorage.value),
},
"composable",
);

if (globalThis.tagCache.mode === "original") {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
const tagsToWrite = [];
for (const tag of entryForStorage.tags) {
if (!storedTags.includes(tag)) {
tagsToWrite.push({ tag, path: cacheKey });
}
}
if (tagsToWrite.length > 0) {
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
await writeTags(tagsToWrite);
}
}
},
@@ -125,17 +132,14 @@ export default {
}));
}),
);
// We need to deduplicate paths, we use a set for that
const setToWrite = new Set<{ path: string; tag: string }>();

const dedupeMap = new Map();
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
dedupeMap.set(`${entry.path}|${entry.tag}`, entry);
}
await writeTags(Array.from(setToWrite));
await writeTags(Array.from(dedupeMap.values()));
},

// This one is necessary for older versions of next
async receiveExpiredTags(...tags: string[]) {
// This function does absolutely nothing
return;
},
async receiveExpiredTags() {},
} satisfies ComposableCacheHandler;
10 changes: 3 additions & 7 deletions packages/open-next/src/utils/stream.ts
@@ -20,13 +20,9 @@ export async function fromReadableStream(
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
}

// Pre-allocate buffer with exact size to avoid reallocation
const buffer = Buffer.alloc(totalLength);
let offset = 0;
for (const chunk of chunks) {
buffer.set(chunk, offset);
offset += chunk.length;
}
// Use Buffer.concat which is more efficient than manual allocation and copy
// It handles the allocation and copy in optimized native code
const buffer = Buffer.concat(chunks, totalLength);

return buffer.toString(base64 ? "base64" : "utf8");
}