Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(throttle): fix issue #709, where throttled Stream does not emit done event. #710

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 52 additions & 32 deletions lib/src/transformers/backpressure/backpressure.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
@override
void onData(S data) {
_hasData = true;
maybeCreateWindow(data, sink);
maybeCreateWindow(data);

if (skip == 0) {
queue.add(data);
Expand All @@ -68,7 +68,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
skip--;
}

maybeCloseWindow(sink);
maybeCloseWindow();
}

@override
Expand All @@ -79,20 +79,27 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
_mainClosed = true;

if (_strategy == WindowStrategy.eventAfterLastWindow) {
resolveWindowEnd(isControllerClosing: true, isWindowClosed: false);
return;
}

// treat the final event as a Window that opens
// and immediately closes again
if (_dispatchOnClose && queue.isNotEmpty) {
resolveWindowStart(queue.last, sink);
resolveWindowStart(queue.last);
}

resolveWindowEnd(sink, true);
resolveWindowEnd(isControllerClosing: true, isWindowClosed: false);

clearAndClose();
}

void clearAndClose() {
queue.clear();

_windowSubscription?.cancel();
_windowSubscription = null;

sink.close();
}

Expand All @@ -108,63 +115,65 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
@override
void onResume() => _windowSubscription?.resume();

void maybeCreateWindow(S event, EventSink<T> sink) {
void maybeCreateWindow(S event) {
switch (_strategy) {
// for example throttle
case WindowStrategy.eventAfterLastWindow:
if (_windowSubscription != null) return;

_windowSubscription = singleWindow(event, sink);
_windowSubscription = singleWindow(event);

resolveWindowStart(event, sink);
resolveWindowStart(event);

break;
// for example scan
case WindowStrategy.firstEventOnly:
if (_windowSubscription != null) return;

_windowSubscription = multiWindow(event, sink);
_windowSubscription = multiWindow(event);

resolveWindowStart(event, sink);
resolveWindowStart(event);

break;
// for example debounce
case WindowStrategy.everyEvent:
_windowSubscription?.cancel();

_windowSubscription = singleWindow(event, sink);
_windowSubscription = singleWindow(event);

resolveWindowStart(event, sink);
resolveWindowStart(event);

break;
case WindowStrategy.onHandler:
break;
}
}

void maybeCloseWindow(EventSink<T> sink) {
void maybeCloseWindow() {
if (_closeWindowWhen != null && _closeWindowWhen!(unmodifiableQueue)) {
resolveWindowEnd(sink);
resolveWindowEnd(isControllerClosing: false, isWindowClosed: false);
}
}

StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
buildStream(event, sink).take(1).listen(
StreamSubscription<dynamic> singleWindow(S event) =>
buildStream(event).take(1).listen(
null,
onError: sink.addError,
onDone: () => resolveWindowEnd(sink, _mainClosed),
onDone: () => resolveWindowEnd(
isControllerClosing: _mainClosed, isWindowClosed: true),
);

// opens a new Window which is kept open until the main Stream
// closes.
StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
buildStream(event, sink).listen(
(dynamic _) => resolveWindowEnd(sink),
StreamSubscription<dynamic> multiWindow(S event) => buildStream(event).listen(
(dynamic _) => resolveWindowEnd(
isControllerClosing: _mainClosed, isWindowClosed: false),
onError: sink.addError,
onDone: () => resolveWindowEnd(sink),
onDone: () => resolveWindowEnd(
isControllerClosing: _mainClosed, isWindowClosed: true),
);

Stream<dynamic> buildStream(S event, EventSink<T> sink) {
Stream<dynamic> buildStream(S event) {
Stream stream;

_windowSubscription?.cancel();
Expand All @@ -174,27 +183,38 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
return stream;
}

void resolveWindowStart(S event, EventSink<T> sink) {
void resolveWindowStart(S event) {
if (_onWindowStart != null) {
sink.add(_onWindowStart!(event));
}
}

void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
void resolveWindowEnd({
required bool isControllerClosing,
required bool isWindowClosed,
}) {
if (isControllerClosing &&
_strategy == WindowStrategy.eventAfterLastWindow) {
if (_dispatchOnClose &&
_hasData &&
queue.length > 1 &&
_onWindowEnd != null) {
sink.add(_onWindowEnd!(unmodifiableQueue));
// has no last data, close immediately
if (!_hasData || queue.length == 1) {
clearAndClose();
return;
}

queue.clear();
_windowSubscription?.cancel();
_windowSubscription = null;
// once the Stream has emitted done event, there may still be a pending data
// waiting to be emitted. If so, wait for the window to end and then
// emit it.
if (!isWindowClosed) {
// defer until the window closes
return;
}

// send the last event
if (_dispatchOnClose && _onWindowEnd != null) {
sink.add(_onWindowEnd!(unmodifiableQueue));
}

sink.close();
clearAndClose();
return;
}

Expand Down
29 changes: 29 additions & 0 deletions test/transformers/backpressure/throttle_time_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ void main() {
emitsInOrder(<dynamic>[1, 4, 7, emitsDone]));
});

test('Rx.throttleTime.trailing.empty', () async {
await expectLater(
Stream<int>.empty().throttleTime(
const Duration(milliseconds: 250),
leading: false,
trailing: true,
),
emitsDone,
);

await expectLater(
Stream<int>.empty().throttleTime(
const Duration(milliseconds: 250),
leading: true,
trailing: true,
),
emitsDone,
);
});

test('Rx.throttleTime.trailing', () async {
await expectLater(
_stream()
Expand Down Expand Up @@ -99,4 +119,13 @@ void main() {
(s) => s.throttleTime(Duration.zero),
);
});

test('issue/709 throttled stream closes', () async {
final c = StreamController<String>();
unawaited(Future<void>.delayed(Duration(milliseconds: 500))
.then<void>((f) => c.close()));

final s = c.stream.throttleTime(Duration(milliseconds: 100));
await for (var _ in s) {}
});
}