Skip to content

Commit c1352a5

Browse files
tegefaulkesCMCDragonkai
authored andcommitted
feat: maxWritableStreamBytes config option for writable stream in QUICStream
* Fixes #5 [ci skip]
1 parent f3e5f7e commit c1352a5

File tree

5 files changed

+59
-34
lines changed

5 files changed

+59
-34
lines changed

src/QUICClient.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class QUICClient extends EventTarget {
5656
reasonToCode,
5757
codeToReason,
5858
maxReadableStreamBytes,
59+
maxWritableStreamBytes,
5960
logger = new Logger(`${this.name}`),
6061
config = {},
6162
}: {
@@ -72,6 +73,7 @@ class QUICClient extends EventTarget {
7273
reasonToCode?: StreamReasonToCode;
7374
codeToReason?: StreamCodeToReason;
7475
maxReadableStreamBytes?: number;
76+
maxWritableStreamBytes?: number;
7577
logger?: Logger;
7678
config?: Partial<QUICConfig>;
7779
}) {
@@ -156,6 +158,7 @@ class QUICClient extends EventTarget {
156158
reasonToCode,
157159
codeToReason,
158160
maxReadableStreamBytes,
161+
maxWritableStreamBytes,
159162
logger: logger.getChild(
160163
`${QUICConnection.name} ${scid.toString().slice(32)}`,
161164
),

src/QUICConnection.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class QUICConnection extends EventTarget {
4444
protected reasonToCode: StreamReasonToCode;
4545
protected codeToReason: StreamCodeToReason;
4646
protected maxReadableStreamBytes: number | undefined;
47+
protected maxWritableStreamBytes: number | undefined;
4748
protected destroyingMap: Map<StreamId, QUICStream> = new Map();
4849

4950
// This basically allows one to await this promise
@@ -104,6 +105,7 @@ class QUICConnection extends EventTarget {
104105
codeToReason = (type, code) =>
105106
new Error(`${type.toString()} ${code.toString()}`),
106107
maxReadableStreamBytes,
108+
maxWritableStreamBytes,
107109
logger = new Logger(`${this.name} ${scid}`),
108110
}: {
109111
scid: QUICConnectionId;
@@ -113,6 +115,7 @@ class QUICConnection extends EventTarget {
113115
reasonToCode?: StreamReasonToCode;
114116
codeToReason?: StreamCodeToReason;
115117
maxReadableStreamBytes?: number;
118+
maxWritableStreamBytes?: number;
116119
logger?: Logger;
117120
}) {
118121
logger.info(`Connect ${this.name}`);
@@ -143,6 +146,7 @@ class QUICConnection extends EventTarget {
143146
reasonToCode,
144147
codeToReason,
145148
maxReadableStreamBytes,
149+
maxWritableStreamBytes,
146150
logger,
147151
});
148152
socket.connectionMap.set(connection.connectionId, connection);
@@ -163,6 +167,7 @@ class QUICConnection extends EventTarget {
163167
codeToReason = (type, code) =>
164168
new Error(`${type.toString()} ${code.toString()}`),
165169
maxReadableStreamBytes,
170+
maxWritableStreamBytes,
166171
logger = new Logger(`${this.name} ${scid}`),
167172
}: {
168173
scid: QUICConnectionId;
@@ -173,6 +178,7 @@ class QUICConnection extends EventTarget {
173178
reasonToCode?: StreamReasonToCode;
174179
codeToReason?: StreamCodeToReason;
175180
maxReadableStreamBytes?: number;
181+
maxWritableStreamBytes?: number;
176182
logger?: Logger;
177183
}): Promise<QUICConnection> {
178184
logger.info(`Accept ${this.name}`);
@@ -203,6 +209,7 @@ class QUICConnection extends EventTarget {
203209
reasonToCode,
204210
codeToReason,
205211
maxReadableStreamBytes,
212+
maxWritableStreamBytes,
206213
logger,
207214
});
208215
socket.connectionMap.set(connection.connectionId, connection);
@@ -219,6 +226,7 @@ class QUICConnection extends EventTarget {
219226
reasonToCode,
220227
codeToReason,
221228
maxReadableStreamBytes,
229+
maxWritableStreamBytes,
222230
logger,
223231
}: {
224232
type: 'client' | 'server';
@@ -229,6 +237,7 @@ class QUICConnection extends EventTarget {
229237
reasonToCode: StreamReasonToCode;
230238
codeToReason: StreamCodeToReason;
231239
maxReadableStreamBytes: number | undefined;
240+
maxWritableStreamBytes: number | undefined;
232241
logger: Logger;
233242
}) {
234243
super();
@@ -243,6 +252,7 @@ class QUICConnection extends EventTarget {
243252
this.reasonToCode = reasonToCode;
244253
this.codeToReason = codeToReason;
245254
this.maxReadableStreamBytes = maxReadableStreamBytes;
255+
this.maxWritableStreamBytes = maxWritableStreamBytes;
246256
// Sets the timeout on the first
247257
this.checkTimeout();
248258

@@ -445,6 +455,7 @@ class QUICConnection extends EventTarget {
445455
codeToReason: this.codeToReason,
446456
reasonToCode: this.reasonToCode,
447457
maxReadableStreamBytes: this.maxReadableStreamBytes,
458+
maxWritableStreamBytes: this.maxWritableStreamBytes,
448459
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
449460
});
450461
this.dispatchEvent(
@@ -664,6 +675,7 @@ class QUICConnection extends EventTarget {
664675
reasonToCode: this.reasonToCode,
665676
destroyingMap: this.destroyingMap,
666677
maxReadableStreamBytes: this.maxReadableStreamBytes,
678+
maxWritableStreamBytes: this.maxWritableStreamBytes,
667679
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}`),
668680
});
669681
// Ok the stream is opened and working

src/QUICServer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class QUICServer extends EventTarget {
3939
protected reasonToCode: StreamReasonToCode | undefined;
4040
protected codeToReason: StreamCodeToReason | undefined;
4141
protected maxReadableStreamBytes?: number | undefined;
42+
protected maxWritableStreamBytes?: number | undefined;
4243
protected connectionMap: QUICConnectionMap;
4344

4445
/**
@@ -64,6 +65,7 @@ class QUICServer extends EventTarget {
6465
reasonToCode,
6566
codeToReason,
6667
maxReadableStreamBytes,
68+
maxWritableStreamBytes,
6769
logger,
6870
}: {
6971
crypto: {
@@ -79,6 +81,7 @@ class QUICServer extends EventTarget {
7981
reasonToCode?: StreamReasonToCode;
8082
codeToReason?: StreamCodeToReason;
8183
maxReadableStreamBytes?: number;
84+
maxWritableStreamBytes?: number;
8285
logger?: Logger;
8386
}) {
8487
super();
@@ -107,6 +110,7 @@ class QUICServer extends EventTarget {
107110
this.reasonToCode = reasonToCode;
108111
this.codeToReason = codeToReason;
109112
this.maxReadableStreamBytes = maxReadableStreamBytes;
113+
this.maxWritableStreamBytes = maxWritableStreamBytes;
110114
}
111115

112116
@ready(new errors.ErrorQUICServerNotRunning())
@@ -281,6 +285,7 @@ class QUICServer extends EventTarget {
281285
reasonToCode: this.reasonToCode,
282286
codeToReason: this.codeToReason,
283287
maxReadableStreamBytes: this.maxReadableStreamBytes,
288+
maxWritableStreamBytes: this.maxWritableStreamBytes,
284289
logger: this.logger.getChild(
285290
`${QUICConnection.name} ${scid.toString().slice(32)}`,
286291
),

src/QUICStream.ts

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ class QUICStream
6969
reasonToCode = () => 0,
7070
codeToReason = (type, code) =>
7171
new Error(`${type.toString()} ${code.toString()}`),
72-
maxReadableStreamBytes = 1_000_000, // About 1KB
72+
maxReadableStreamBytes = 100_000, // About 100KB
73+
maxWritableStreamBytes = 100_000, // About 100KB
7374
logger = new Logger(`${this.name} ${streamId}`),
7475
}: {
7576
streamId: StreamId;
@@ -78,6 +79,7 @@ class QUICStream
7879
reasonToCode?: StreamReasonToCode;
7980
codeToReason?: StreamCodeToReason;
8081
maxReadableStreamBytes?: number;
82+
maxWritableStreamBytes?: number;
8183
logger?: Logger;
8284
}): Promise<QUICStream> {
8385
logger.info(`Create ${this.name}`);
@@ -95,6 +97,7 @@ class QUICStream
9597
codeToReason,
9698
destroyingMap,
9799
maxReadableStreamBytes,
100+
maxWritableStreamBytes,
98101
logger,
99102
});
100103
connection.streamMap.set(stream.streamId, stream);
@@ -109,6 +112,7 @@ class QUICStream
109112
codeToReason,
110113
destroyingMap,
111114
maxReadableStreamBytes,
115+
maxWritableStreamBytes,
112116
logger,
113117
}: {
114118
streamId: StreamId;
@@ -117,6 +121,7 @@ class QUICStream
117121
codeToReason: StreamCodeToReason;
118122
destroyingMap: Map<StreamId, QUICStream>;
119123
maxReadableStreamBytes: number;
124+
maxWritableStreamBytes: number;
120125
logger: Logger;
121126
}) {
122127
super();
@@ -146,40 +151,44 @@ class QUICStream
146151
},
147152
{
148153
highWaterMark: maxReadableStreamBytes,
149-
size: (chunk) => chunk?.byteLength ?? 0,
150154
},
151155
);
152156

153-
this.writable = new WritableStream({
154-
start: (controller) => {
155-
this.writableController = controller;
156-
},
157-
write: async (chunk: Uint8Array) => {
158-
await this.streamSend(chunk);
159-
},
160-
close: async () => {
161-
// This gracefully closes, by sending a message at the end
162-
// If there wasn't an error, we will send an empty frame
163-
// with the `fin` set to true
164-
// If this itself results in an error, we can continue
165-
// But continue to do the below
166-
this.logger.debug('sending fin frame');
167-
await this.streamSend(new Uint8Array(0), true).catch((e) => {
168-
// Ignore send error if stream is already closed
169-
if (e.message !== 'send') throw e;
170-
});
171-
await this.closeSend();
157+
this.writable = new WritableStream(
158+
{
159+
start: (controller) => {
160+
this.writableController = controller;
161+
},
162+
write: async (chunk: Uint8Array) => {
163+
await this.streamSend(chunk);
164+
},
165+
close: async () => {
166+
// This gracefully closes, by sending a message at the end
167+
// If there wasn't an error, we will send an empty frame
168+
// with the `fin` set to true
169+
// If this itself results in an error, we can continue
170+
// But continue to do the below
171+
this.logger.debug('sending fin frame');
172+
await this.streamSend(new Uint8Array(0), true).catch((e) => {
173+
// Ignore send error if stream is already closed
174+
if (e.message !== 'send') throw e;
175+
});
176+
await this.closeSend();
177+
},
178+
abort: async (reason?: any) => {
179+
// Abort can be called even if there are writes are queued up
180+
// The chunks are meant to be thrown away
181+
// We could tell it to shutdown
182+
// This sends a `RESET_STREAM` frame, this abruptly terminates the sending part of a stream
183+
// The receiver can discard any data it already received on that stream
184+
// We don't have "unidirectional" streams so that's not important...
185+
await this.closeSend(true, reason);
186+
},
172187
},
173-
abort: async (reason?: any) => {
174-
// Abort can be called even if there are writes are queued up
175-
// The chunks are meant to be thrown away
176-
// We could tell it to shutdown
177-
// This sends a `RESET_STREAM` frame, this abruptly terminates the sending part of a stream
178-
// The receiver can discard any data it already received on that stream
179-
// We don't have "unidirectional" streams so that's not important...
180-
await this.closeSend(true, reason);
188+
{
189+
highWaterMark: maxWritableStreamBytes,
181190
},
182-
});
191+
);
183192
}
184193

185194
public get sendClosed(): boolean {

tests/QUICStream.test.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ describe(QUICStream.name, () => {
4848
config: {
4949
tlsConfig: tlsConfig.tlsConfig,
5050
verifyPeer: false,
51-
logKeys: './tmp/key.log',
5251
},
5352
});
5453
server.addEventListener(
@@ -215,7 +214,6 @@ describe(QUICStream.name, () => {
215214
config: {
216215
tlsConfig: tlsConfig.tlsConfig,
217216
verifyPeer: false,
218-
logKeys: './tmp/key.log',
219217
},
220218
});
221219
server.addEventListener(
@@ -311,7 +309,6 @@ describe(QUICStream.name, () => {
311309
config: {
312310
tlsConfig: tlsConfig.tlsConfig,
313311
verifyPeer: false,
314-
logKeys: './tmp/key.log',
315312
},
316313
});
317314
server.addEventListener(
@@ -415,7 +412,6 @@ describe(QUICStream.name, () => {
415412
config: {
416413
tlsConfig: tlsConfig.tlsConfig,
417414
verifyPeer: false,
418-
logKeys: './tmp/key.log',
419415
},
420416
});
421417
server.addEventListener(

0 commit comments

Comments
 (0)