Skip to content

Commit a83c9dc

Browse files
author
Thomas Järvstrand
committed
Add BufferedSubject
1 parent d03d866 commit a83c9dc

File tree

4 files changed

+560
-0
lines changed

4 files changed

+560
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ Dart provides the [StreamController](https://api.dart.dev/stable/dart-async/Stre
188188

189189
- [BehaviorSubject](https://pub.dev/documentation/rxdart/latest/rx/BehaviorSubject-class.html) - A broadcast StreamController that caches the latest added value or error. When a new listener subscribes to the Stream, the latest value or error will be emitted to the listener. Furthermore, you can synchronously read the last emitted value.
190190
- [ReplaySubject](https://pub.dev/documentation/rxdart/latest/rx/ReplaySubject-class.html) - A broadcast StreamController that caches the added values. When a new listener subscribes to the Stream, the cached values will be emitted to the listener.
191+
- [BufferedSubject](https://pub.dev/documentation/rxdart/latest/rx/BufferedSubject-class.html) - A broadcast StreamController that caches the added values while there are no listeners. When the first new listener subscribes to the Stream, the cached values will be emitted to the listener.
191192

192193
## Rx Observables vs Dart Streams
193194

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import 'dart:async';
2+
import 'dart:collection';
3+
4+
import 'package:rxdart/rxdart.dart';
5+
6+
abstract class _BufferEntry<T> {
7+
void addToSink(StreamSink<T> controller);
8+
}
9+
10+
class _BufferedEvent<T> extends _BufferEntry<T> {
11+
final T value;
12+
_BufferedEvent(this.value);
13+
14+
@override
15+
void addToSink(StreamSink<T> controller) => controller.add(value);
16+
}
17+
18+
class _BufferedError<T> extends _BufferEntry<T> {
19+
final Object error;
20+
final StackTrace? stackTrace;
21+
_BufferedError(this.error, this.stackTrace);
22+
23+
@override
24+
void addToSink(StreamSink<T> controller) => controller.addError(error, stackTrace);
25+
}
26+
27+
/// A special StreamController that captures all of the items that are
28+
/// added to the controller if it has no listener, and emits those as the
29+
/// first items to the first new listener.
30+
///
31+
/// This subject allows sending data, error and done events to the listener.
32+
/// As items are added to the subject, the BufferedSubject will store them.
33+
/// When the stream is listened to, those recorded items will be emitted to
34+
/// the listener. After that, any new events will be appropriately sent to the
35+
/// listeners. It is possible to cap the number of stored events by setting
36+
/// a maxSize value.
37+
///
38+
/// BufferedSubject is, by default, a broadcast (aka hot) controller, in order
39+
/// to fulfill the Rx Subject contract. This means the Subject's `stream` can
40+
/// be listened to multiple times.
41+
///
42+
/// ### Example
43+
///
44+
/// final subject = BufferedSubject<int>();
45+
///
46+
/// subject.add(1);
47+
/// subject.add(2);
48+
/// subject.add(3);
49+
///
50+
/// subject.stream.listen(print); // prints 1, 2, 3
51+
/// subject.add(4);
52+
/// subject.stream.listen(print); // prints 4
53+
///
54+
/// ### Example with maxSize
55+
///
56+
/// final subject = BufferedSubject<int>(maxSize: 2);
57+
///
58+
/// subject.add(1);
59+
/// subject.add(2);
60+
/// subject.add(3);
61+
///
62+
/// subject.stream.listen(print); // prints 2, 3
63+
/// subject.add(4);
64+
/// subject.stream.listen(print); // prints 4
65+
class BufferedSubject<T> extends Subject<T> {
66+
bool _isAddingStreamItems = false;
67+
final int? _maxSize;
68+
final Queue<_BufferEntry> _buffer;
69+
final StreamController<T> _controller;
70+
@override
71+
void Function()? onListen;
72+
73+
BufferedSubject._(this._controller, Stream<T> stream, this._maxSize, this._buffer, this.onListen)
74+
: super(_controller, stream) {
75+
_controller.onListen = () {
76+
for (final el in _buffer) {
77+
el.addToSink(_controller);
78+
}
79+
_buffer.clear();
80+
onListen?.call();
81+
};
82+
}
83+
84+
/// Constructs a [BufferedSubject], optionally pass handlers for
85+
/// [onListen], [onCancel] and a flag to handle events [sync].
86+
///
87+
/// See also [StreamController.broadcast]
88+
factory BufferedSubject({void Function()? onListen, void Function()? onCancel, bool sync = false, int? maxSize}) {
89+
final Queue<_BufferEntry<T>> buffer = Queue();
90+
final controller = StreamController<T>.broadcast(onCancel: onCancel, sync: sync);
91+
92+
return BufferedSubject<T>._(controller, controller.stream, maxSize, buffer, onListen);
93+
}
94+
95+
@override
96+
void add(T event) {
97+
if (hasListener) {
98+
super.add(event);
99+
} else {
100+
_verifyState();
101+
_buffer.add(_BufferedEvent<T>(event));
102+
_truncateBuffer();
103+
}
104+
}
105+
106+
@override
107+
void addError(Object error, [StackTrace? stackTrace]) {
108+
if (hasListener) {
109+
super.addError(error, stackTrace);
110+
} else {
111+
_verifyState();
112+
_buffer.add(_BufferedError<T>(error, stackTrace));
113+
_truncateBuffer();
114+
}
115+
}
116+
117+
@override
118+
Future<void> addStream(Stream<T> source, {bool? cancelOnError}) async {
119+
if (hasListener) {
120+
return super.addStream(source, cancelOnError: cancelOnError);
121+
} else {
122+
_verifyState();
123+
final completer = Completer<void>();
124+
_isAddingStreamItems = true;
125+
126+
source.listen(
127+
(T event) {
128+
_buffer.add(_BufferedEvent<T>(event));
129+
_truncateBuffer();
130+
},
131+
cancelOnError: cancelOnError,
132+
onDone: completer.complete,
133+
onError: (Object e, StackTrace s) {
134+
_buffer.add(_BufferedError<T>(e, s));
135+
_truncateBuffer();
136+
if (cancelOnError == true) completer.complete();
137+
},
138+
);
139+
140+
return completer.future.then((_) {
141+
_isAddingStreamItems = false;
142+
});
143+
}
144+
}
145+
146+
void _truncateBuffer() {
147+
final max = _maxSize;
148+
while (max != null && _buffer.length > max) {
149+
_buffer.removeFirst();
150+
}
151+
}
152+
153+
void _verifyState() {
154+
if (_isAddingStreamItems) {
155+
throw StateError('You cannot add items while items are being added from addStream');
156+
}
157+
}
158+
159+
@override
160+
Future<dynamic> close() async {
161+
if (!hasListener) _verifyState();
162+
return super.close();
163+
}
164+
}

lib/subjects.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
library rx_subjects;
22

33
export 'src/subjects/behavior_subject.dart';
4+
export 'src/subjects/buffered_subject.dart';
45
export 'src/subjects/publish_subject.dart';
56
export 'src/subjects/replay_subject.dart';
67
export 'src/subjects/subject.dart';

0 commit comments

Comments
 (0)