Skip to content

Commit c8355d1

Browse files
authored
Fix fetch response stream cancellation (#1822)
1 parent 81b5979 commit c8355d1

File tree

4 files changed

+273
-36
lines changed

4 files changed

+273
-36
lines changed

pkgs/http/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
* **Breaking** Change the behavior of `Request.body` so that a charset
44
parameter is only added for text and XML media types. This brings the
55
behavior of `package:http` in line with RFC-8259.
6+
* On the web, fix cancellations for `StreamSubscription`s of response bodies
7+
waiting for the next chunk.
68
* Export `MediaType` from `package:http_parser`.
79
* Added a section on testing to `README.md`.
810

pkgs/http/lib/src/browser_client.dart

Lines changed: 90 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import 'package:web/web.dart'
1111
DOMException,
1212
HeadersInit,
1313
ReadableStreamDefaultReader,
14+
ReadableStreamReadResult,
1415
RequestInfo,
1516
RequestInit,
1617
Response;
@@ -116,7 +117,7 @@ class BrowserClient extends BaseClient {
116117
}.toJS);
117118

118119
return StreamedResponseV2(
119-
_readBody(request, response),
120+
_bodyToStream(request, response),
120121
response.status,
121122
headers: headers,
122123
request: request,
@@ -144,9 +145,9 @@ class BrowserClient extends BaseClient {
144145
}
145146
}
146147

147-
Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
148-
if (e case DOMException(:final name) when name == 'AbortError') {
149-
Error.throwWithStackTrace(RequestAbortedException(request.url), st);
148+
Object _toClientException(Object e, BaseRequest request) {
149+
if (e case DOMException(name: 'AbortError')) {
150+
return RequestAbortedException(request.url);
150151
}
151152
if (e is! ClientException) {
152153
var message = e.toString();
@@ -155,49 +156,103 @@ Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
155156
}
156157
e = ClientException(message, request.url);
157158
}
158-
Error.throwWithStackTrace(e, st);
159+
return e;
160+
}
161+
162+
Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
163+
Error.throwWithStackTrace(_toClientException(e, request), st);
159164
}
160165

161-
Stream<List<int>> _readBody(BaseRequest request, Response response) async* {
162-
final bodyStreamReader =
163-
response.body?.getReader() as ReadableStreamDefaultReader?;
166+
Stream<List<int>> _bodyToStream(BaseRequest request, Response response) =>
167+
Stream.multi(
168+
isBroadcast: false,
169+
(listener) => _readStreamBody(request, response, listener),
170+
);
164171

165-
if (bodyStreamReader == null) {
172+
Future<void> _readStreamBody(BaseRequest request, Response response,
173+
MultiStreamController<List<int>> controller) async {
174+
final reader = response.body?.getReader() as ReadableStreamDefaultReader?;
175+
if (reader == null) {
176+
// No response? Treat that as an empty stream.
177+
await controller.close();
166178
return;
167179
}
168180

169-
var isDone = false, isError = false;
170-
try {
171-
while (true) {
172-
final chunk = await bodyStreamReader.read().toDart;
173-
if (chunk.done) {
174-
isDone = true;
175-
break;
181+
Completer<void>? resumeSignal;
182+
var cancelled = false;
183+
var hadError = false;
184+
controller
185+
..onResume = () {
186+
if (resumeSignal case final resume?) {
187+
resumeSignal = null;
188+
resume.complete();
176189
}
177-
yield (chunk.value! as JSUint8Array).toDart;
178190
}
179-
} catch (e, st) {
180-
isError = true;
181-
_rethrowAsClientException(e, st, request);
182-
} finally {
183-
if (!isDone) {
191+
..onCancel = () async {
184192
try {
185-
// catchError here is a temporary workaround for
186-
// http://dartbug.com/57046: an exception from cancel() will
187-
// clobber an exception which is currently in flight.
188-
await bodyStreamReader
189-
.cancel()
190-
.toDart
191-
.catchError((_) => null, test: (_) => isError);
192-
} catch (e, st) {
193-
// If we have already encountered an error swallow the
194-
// error from cancel and simply let the original error to be
195-
// rethrown.
196-
if (!isError) {
197-
_rethrowAsClientException(e, st, request);
193+
cancelled = true;
194+
// We only cancel the reader when the subscription is cancelled - we
195+
// don't need to do that for normal done events because the stream is in
196+
// a completed state at that point.
197+
await reader.cancel().toDart;
198+
} catch (e, s) {
199+
// It is possible for reader.cancel() to throw. This happens either
200+
// because the stream has already been in an error state (in which case
201+
// we would have called addErrorSync() before and don't need to re-
202+
// report the error here), or because of an issue here (MDN says the
203+
// method can throw if "The source object is not a
204+
// ReadableStreamDefaultReader, or the stream has no owner."). Both of
205+
// these don't look applicable here, but we want to ensure a new error
206+
// in cancel() is surfaced to the caller.
207+
if (!hadError) {
208+
_rethrowAsClientException(e, s, request);
198209
}
199210
}
211+
};
212+
213+
// Async loop reading chunks from `bodyStreamReader` and sending them to
214+
// `controller`.
215+
// Checks for pause/cancel after delivering each event.
216+
// Exits if stream closes or becomes an error, or if cancelled.
217+
while (true) {
218+
final ReadableStreamReadResult chunk;
219+
try {
220+
chunk = await reader.read().toDart;
221+
} catch (e, s) {
222+
// After a stream was cancelled, adding error events would result in
223+
// unhandled async errors. This is most likely an AbortError anyway, so
224+
// not really an exceptional state. We report errors of .cancel() in
225+
// onCancel, that should cover this case.
226+
if (!cancelled) {
227+
hadError = true;
228+
controller.addErrorSync(_toClientException(e, request), s);
229+
await controller.close();
230+
}
231+
232+
break;
233+
}
234+
235+
if (chunk.done) {
236+
// Sync because we're forwarding an async event.
237+
controller.closeSync();
238+
break;
239+
} else {
240+
// Handle chunk whether paused, cancelled or not.
241+
// If subscription is cancelled, it's a no-op to add events.
242+
// If subscription is paused, events will be buffered until resumed,
243+
// which is what we need.
244+
// We can use addSync here because we're only forwarding this async
245+
// event.
246+
controller.addSync((chunk.value! as JSUint8Array).toDart);
247+
}
248+
249+
// Check pause/cancel state immediately *after* delivering event,
250+
// listener might have paused or cancelled.
251+
if (controller.isPaused) {
252+
// Will never complete if cancelled before resumed.
253+
await (resumeSignal ??= Completer<void>()).future;
200254
}
255+
if (!controller.hasListener) break; // Is cancelled.
201256
}
202257
}
203258

pkgs/http_client_conformance_tests/lib/src/response_body_streamed_server.dart

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,25 @@ void hybridMain(StreamChannel<Object?> channel) async {
2323
late HttpServer server;
2424
server = (await HttpServer.bind('localhost', 0))
2525
..listen((request) async {
26+
var path = request.uri.pathSegments;
27+
// Slow down lines if requested, e.g. GET /1000 would send a line every
28+
// second. This is used to test cancellations.
29+
var delayBetweenLines = switch (path) {
30+
[var delayMs] => Duration(milliseconds: int.parse(delayMs)),
31+
_ => Duration.zero,
32+
};
33+
2634
await request.drain<void>();
2735
request.response.headers.set('Access-Control-Allow-Origin', '*');
2836
request.response.headers.set('Content-Type', 'text/plain');
37+
request.response.bufferOutput = false;
38+
2939
serverWriting = true;
3040
for (var i = 0; serverWriting; ++i) {
3141
request.response.write('$i\n');
3242
await request.response.flush();
3343
// Let the event loop run.
34-
await Future(() {});
44+
await Future<void>.delayed(delayBetweenLines);
3545
}
3646
await request.response.close();
3747
unawaited(server.close());

pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,118 @@ void testResponseBodyStreamed(Client client,
5959
expect(response.statusCode, 200);
6060
});
6161

62+
test('pausing response stream after events', () async {
63+
final response = await client.send(Request('GET', Uri.http(host, '')));
64+
expect(response.reasonPhrase, 'OK');
65+
expect(response.statusCode, 200);
66+
67+
// The server responds with a streamed response of lines containing
68+
// incrementing integers. Verify that pausing the stream after each one
69+
// does not cause any missed lines.
70+
final stream = response.stream
71+
.transform(const Utf8Decoder())
72+
.transform(const LineSplitter())
73+
.map(int.parse);
74+
var expectedLine = 0;
75+
final cancelCompleter = Completer<void>();
76+
late StreamSubscription<int> subscription;
77+
78+
subscription = stream.listen((line) async {
79+
expect(line, expectedLine);
80+
expectedLine++;
81+
82+
if (expectedLine == 10) {
83+
subscription.pause();
84+
Future.delayed(
85+
const Duration(seconds: 1), () => subscription.resume());
86+
}
87+
88+
if (expectedLine == 100) {
89+
cancelCompleter.complete(subscription.cancel());
90+
}
91+
await pumpEventQueue();
92+
});
93+
94+
await cancelCompleter.future;
95+
expect(expectedLine, 100);
96+
});
97+
98+
test('pausing response stream asynchronously', () async {
99+
final response = await client.send(Request('GET', Uri.http(host, '')));
100+
expect(response.reasonPhrase, 'OK');
101+
expect(response.statusCode, 200);
102+
103+
final originalSubscription = response.stream
104+
.transform(const Utf8Decoder())
105+
.transform(const LineSplitter())
106+
.map(int.parse)
107+
.listen(null);
108+
var expectedLine = 0;
109+
await for (final line in SubscriptionStream(originalSubscription)) {
110+
expect(line, expectedLine);
111+
expectedLine++;
112+
if (expectedLine == 100) {
113+
break;
114+
}
115+
116+
// Instead of pausing the subscription in response to an event, pause it
117+
// after the event has already been delivered.
118+
Timer.run(() {
119+
originalSubscription.pause(Future(pumpEventQueue));
120+
});
121+
}
122+
});
123+
124+
test('cancel paused stream', () async {
125+
final response = await client.send(Request('GET', Uri.http(host, '')));
126+
expect(response.reasonPhrase, 'OK');
127+
expect(response.statusCode, 200);
128+
129+
final completer = Completer<void>();
130+
late StreamSubscription<String> subscription;
131+
subscription = response.stream
132+
.transform(const Utf8Decoder())
133+
.transform(const LineSplitter())
134+
.listen((line) async {
135+
subscription.pause();
136+
137+
completer.complete(Future(() async {
138+
await pumpEventQueue();
139+
await subscription.cancel();
140+
}));
141+
});
142+
143+
await completer.future;
144+
});
145+
146+
test('cancel paused stream via abortable request', () async {
147+
final abortTrigger = Completer<void>();
148+
final response = await client.send(AbortableRequest(
149+
'GET', Uri.http(host, ''),
150+
abortTrigger: abortTrigger.future));
151+
expect(response.reasonPhrase, 'OK');
152+
expect(response.statusCode, 200);
153+
154+
late StreamSubscription<String> subscription;
155+
subscription = response.stream
156+
.transform(const Utf8Decoder())
157+
.transform(const LineSplitter())
158+
.listen((line) {
159+
if (!abortTrigger.isCompleted) {
160+
abortTrigger.complete();
161+
}
162+
});
163+
164+
final aborted = expectLater(subscription.asFuture<void>(),
165+
throwsA(isA<RequestAbortedException>()));
166+
await abortTrigger.future;
167+
168+
// We need to resume the subscription after the response has been
169+
// cancelled to record that error event.
170+
subscription.resume();
171+
await aborted;
172+
});
173+
62174
test('cancel streamed response', () async {
63175
final request = Request('GET', Uri.http(host, ''));
64176
final response = await client.send(request);
@@ -77,5 +189,63 @@ void testResponseBodyStreamed(Client client,
77189
});
78190
await cancelled.future;
79191
});
192+
193+
test('cancelling stream subscription after chunk', () async {
194+
// Request a 10s delay between subsequent lines.
195+
const delayMillis = 10000;
196+
final request = Request('GET', Uri.http(host, '$delayMillis'));
197+
final response = await client.send(request);
198+
expect(response.reasonPhrase, 'OK');
199+
expect(response.statusCode, 200);
200+
201+
final cancelled = Completer<void>();
202+
var stopwatch = Stopwatch();
203+
final subscription = response.stream
204+
.transform(const Utf8Decoder())
205+
.transform(const LineSplitter())
206+
.listen(null);
207+
subscription.onData((line) {
208+
stopwatch.start();
209+
cancelled.complete(subscription.cancel());
210+
expect(line, '0');
211+
});
212+
213+
await cancelled.future;
214+
stopwatch.stop();
215+
216+
// Receiving the first line and cancelling the stream should not wait for
217+
// the second line, which is sent much later.
218+
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
219+
});
220+
221+
test('cancelling stream subscription after chunk with delay', () async {
222+
// Request a 10s delay between subsequent lines.
223+
const delayMillis = 10000;
224+
final request = Request('GET', Uri.http(host, '$delayMillis'));
225+
final response = await client.send(request);
226+
expect(response.reasonPhrase, 'OK');
227+
expect(response.statusCode, 200);
228+
229+
var stopwatch = Stopwatch()..start();
230+
final done = Completer<void>();
231+
late StreamSubscription<String> sub;
232+
sub = response.stream
233+
.transform(utf8.decoder)
234+
.transform(const LineSplitter())
235+
.listen((line) {
236+
// Don't cancel in direct response to event, we want to test cancelling
237+
// while the client is actively waiting for data.
238+
Timer.run(() {
239+
stopwatch.start();
240+
done.complete(sub.cancel());
241+
});
242+
});
243+
244+
await done.future;
245+
stopwatch.stop();
246+
// Receiving the first line and cancelling the stream should not wait for
247+
// the second line, which is sent much later.
248+
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
249+
});
80250
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
81251
}

0 commit comments

Comments
 (0)