Skip to content

Commit a59bbb7

Browse files
Add Concurrency Tests (#1890)
Add concurrency tests to ensure there are no memory leakage or deadlocks at react on rails and react on rails pro packages
1 parent ffa5dd3 commit a59bbb7

File tree

15 files changed

+856
-61
lines changed

15 files changed

+856
-61
lines changed

knip.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ const config: KnipConfig = {
7575
'tests/emptyForTesting.js',
7676
// Jest setup and test utilities - not detected by Jest plugin in workspace setup
7777
'tests/jest.setup.js',
78+
'tests/utils/removeRSCStackFromAllChunks.ts',
7879
// Build output directories that should be ignored
7980
'lib/**',
8081
// Pro features exported for external consumption

packages/react-on-rails-pro/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,9 @@
7575
"bugs": {
7676
"url": "https://github.com/shakacode/react_on_rails/issues"
7777
},
78-
"homepage": "https://github.com/shakacode/react_on_rails#readme"
78+
"homepage": "https://github.com/shakacode/react_on_rails#readme",
79+
"devDependencies": {
80+
"@types/mock-fs": "^4.13.4",
81+
"mock-fs": "^5.5.0"
82+
}
7983
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import * as EventEmitter from 'node:events';
2+
3+
class AsyncQueue<T> {
4+
private eventEmitter = new EventEmitter();
5+
6+
private buffer: T[] = [];
7+
8+
private isEnded = false;
9+
10+
enqueue(value: T) {
11+
if (this.isEnded) {
12+
throw new Error('Queue Ended');
13+
}
14+
15+
if (this.eventEmitter.listenerCount('data') > 0) {
16+
this.eventEmitter.emit('data', value);
17+
} else {
18+
this.buffer.push(value);
19+
}
20+
}
21+
22+
end() {
23+
this.isEnded = true;
24+
this.eventEmitter.emit('end');
25+
}
26+
27+
dequeue() {
28+
return new Promise<T>((resolve, reject) => {
29+
const bufferValueIfExist = this.buffer.shift();
30+
if (bufferValueIfExist) {
31+
resolve(bufferValueIfExist);
32+
} else if (this.isEnded) {
33+
reject(new Error('Queue Ended'));
34+
} else {
35+
let teardown = () => {};
36+
const onData = (value: T) => {
37+
resolve(value);
38+
teardown();
39+
};
40+
41+
const onEnd = () => {
42+
reject(new Error('Queue Ended'));
43+
teardown();
44+
};
45+
46+
this.eventEmitter.on('data', onData);
47+
this.eventEmitter.on('end', onEnd);
48+
teardown = () => {
49+
this.eventEmitter.off('data', onData);
50+
this.eventEmitter.off('end', onEnd);
51+
};
52+
}
53+
});
54+
}
55+
}
56+
57+
export default AsyncQueue;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { PassThrough, Readable } from 'node:stream';
2+
import AsyncQueue from './AsyncQueue.ts';
3+
4+
class StreamReader {
5+
private asyncQueue: AsyncQueue<string>;
6+
7+
constructor(pipeableStream: Pick<Readable, 'pipe'>) {
8+
this.asyncQueue = new AsyncQueue();
9+
const decoder = new TextDecoder();
10+
11+
const readableStream = new PassThrough();
12+
pipeableStream.pipe(readableStream);
13+
14+
readableStream.on('data', (chunk: Buffer) => {
15+
const decodedChunk = decoder.decode(chunk);
16+
this.asyncQueue.enqueue(decodedChunk);
17+
});
18+
19+
if (readableStream.closed) {
20+
this.asyncQueue.end();
21+
} else {
22+
readableStream.on('end', () => {
23+
this.asyncQueue.end();
24+
});
25+
}
26+
}
27+
28+
nextChunk() {
29+
return this.asyncQueue.dequeue();
30+
}
31+
}
32+
33+
export default StreamReader;
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/**
2+
* @jest-environment node
3+
*/
4+
/// <reference types="react/experimental" />
5+
6+
import * as React from 'react';
7+
import { Suspense, PropsWithChildren } from 'react';
8+
9+
import * as path from 'path';
10+
import * as mock from 'mock-fs';
11+
12+
import ReactOnRails, { RailsContextWithServerStreamingCapabilities } from '../src/ReactOnRailsRSC.ts';
13+
import AsyncQueue from './AsyncQueue.ts';
14+
import StreamReader from './StreamReader.ts';
15+
import removeRSCChunkStack from './utils/removeRSCChunkStack.ts';
16+
17+
const manifestFileDirectory = path.resolve(__dirname, '../src');
18+
const clientManifestPath = path.join(manifestFileDirectory, 'react-client-manifest.json');
19+
20+
beforeEach(() => {
21+
mock({
22+
[clientManifestPath]: JSON.stringify({
23+
filePathToModuleMetadata: {},
24+
moduleLoading: { prefix: '', crossOrigin: null },
25+
}),
26+
});
27+
});
28+
29+
afterEach(() => mock.restore());
30+
31+
const AsyncQueueItem = async ({
32+
asyncQueue,
33+
children,
34+
}: PropsWithChildren<{ asyncQueue: AsyncQueue<string> }>) => {
35+
const value = await asyncQueue.dequeue();
36+
37+
return (
38+
<>
39+
<p>Data: {value}</p>
40+
{children}
41+
</>
42+
);
43+
};
44+
45+
const AsyncQueueContainer = ({ asyncQueue }: { asyncQueue: AsyncQueue<string> }) => {
46+
return (
47+
<div>
48+
<h1>Async Queue</h1>
49+
<Suspense fallback={<p>Loading Item1</p>}>
50+
<AsyncQueueItem asyncQueue={asyncQueue}>
51+
<Suspense fallback={<p>Loading Item2</p>}>
52+
<AsyncQueueItem asyncQueue={asyncQueue}>
53+
<Suspense fallback={<p>Loading Item3</p>}>
54+
<AsyncQueueItem asyncQueue={asyncQueue} />
55+
</Suspense>
56+
</AsyncQueueItem>
57+
</Suspense>
58+
</AsyncQueueItem>
59+
</Suspense>
60+
</div>
61+
);
62+
};
63+
64+
ReactOnRails.register({ AsyncQueueContainer });
65+
66+
const renderComponent = (props: Record<string, unknown>) => {
67+
return ReactOnRails.serverRenderRSCReactComponent({
68+
railsContext: {
69+
reactClientManifestFileName: 'react-client-manifest.json',
70+
reactServerClientManifestFileName: 'react-server-client-manifest.json',
71+
} as unknown as RailsContextWithServerStreamingCapabilities,
72+
name: 'AsyncQueueContainer',
73+
renderingReturnsPromises: true,
74+
throwJsErrors: true,
75+
domNodeId: 'dom-id',
76+
props,
77+
});
78+
};
79+
80+
const createParallelRenders = (size: number) => {
81+
const asyncQueues = new Array(size).fill(null).map(() => new AsyncQueue<string>());
82+
const streams = asyncQueues.map((asyncQueue) => {
83+
return renderComponent({ asyncQueue });
84+
});
85+
const readers = streams.map((stream) => new StreamReader(stream));
86+
87+
const enqueue = (value: string) => asyncQueues.forEach((asyncQueue) => asyncQueue.enqueue(value));
88+
89+
const expectNextChunk = (nextChunk: string) =>
90+
Promise.all(
91+
readers.map(async (reader) => {
92+
const chunk = await reader.nextChunk();
93+
expect(removeRSCChunkStack(chunk)).toEqual(removeRSCChunkStack(nextChunk));
94+
}),
95+
);
96+
97+
const expectEndOfStream = () =>
98+
Promise.all(readers.map((reader) => expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/)));
99+
100+
return { enqueue, expectNextChunk, expectEndOfStream };
101+
};
102+
103+
test('Renders concurrent rsc streams as single rsc stream', async () => {
104+
expect.assertions(258);
105+
const asyncQueue = new AsyncQueue<string>();
106+
const stream = renderComponent({ asyncQueue });
107+
const reader = new StreamReader(stream);
108+
109+
const chunks: string[] = [];
110+
let chunk = await reader.nextChunk();
111+
chunks.push(chunk);
112+
expect(chunk).toContain('Async Queue');
113+
expect(chunk).toContain('Loading Item2');
114+
expect(chunk).not.toContain('Random Value');
115+
116+
asyncQueue.enqueue('Random Value1');
117+
chunk = await reader.nextChunk();
118+
chunks.push(chunk);
119+
expect(chunk).toContain('Random Value1');
120+
121+
asyncQueue.enqueue('Random Value2');
122+
chunk = await reader.nextChunk();
123+
chunks.push(chunk);
124+
expect(chunk).toContain('Random Value2');
125+
126+
asyncQueue.enqueue('Random Value3');
127+
chunk = await reader.nextChunk();
128+
chunks.push(chunk);
129+
expect(chunk).toContain('Random Value3');
130+
131+
await expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/);
132+
133+
const { enqueue, expectNextChunk, expectEndOfStream } = createParallelRenders(50);
134+
135+
expect(chunks).toHaveLength(4);
136+
await expectNextChunk(chunks[0]);
137+
enqueue('Random Value1');
138+
await expectNextChunk(chunks[1]);
139+
enqueue('Random Value2');
140+
await expectNextChunk(chunks[2]);
141+
enqueue('Random Value3');
142+
await expectNextChunk(chunks[3]);
143+
await expectEndOfStream();
144+
});

0 commit comments

Comments
 (0)