11use hyper:: body:: Buf ;
2- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
32use std:: sync:: Arc ;
43use tokio:: sync:: Notify ;
54
@@ -24,22 +23,15 @@ impl EosSignaler {
2423/// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`).
2524pub struct NotifyOnEos < B > {
2625 inner : B ,
27- notifier : Arc < Notify > ,
28- // It'd be better if we consumed the signaler, making it inaccessible after notification.
29- // Unfortunately, that would require something like AtomicOption.
30- // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
31- // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
32- // but it requires both unsafe and heap allocation, which is not worth it.
33- has_already_signaled : AtomicBool ,
26+ notifier : Option < Arc < Notify > > ,
3427}
3528
3629impl < B > NotifyOnEos < B > {
3730 pub fn new ( inner : B ) -> ( Self , EosSignaler ) {
3831 let notifier = Arc :: new ( Notify :: new ( ) ) ;
3932 let this = Self {
4033 inner,
41- notifier : notifier. clone ( ) ,
42- has_already_signaled : AtomicBool :: new ( false ) ,
34+ notifier : Some ( notifier. clone ( ) ) ,
4335 } ;
4436 let signal = EosSignaler { notifier } ;
4537 ( this, signal)
@@ -57,11 +49,14 @@ impl<B: Buf> Buf for NotifyOnEos<B> {
5749
5850 fn advance ( & mut self , cnt : usize ) {
5951 self . inner . advance ( cnt) ;
60- if !self . inner . has_remaining ( ) && !self . has_already_signaled . swap ( true , Ordering :: AcqRel ) {
61- // tokio::sync::Notify has private method `notify_all` that, once stabilized,
62- // would allow us to make `EosSignaler` Cloneable with better ergonomics
63- // to await EOS from multiple places.
64- self . notifier . notify_one ( ) ;
52+ if !self . inner . has_remaining ( ) {
53+ // consume the notifier to ensure we only notify once
54+ if let Some ( notifier) = self . notifier . take ( ) {
55+ // tokio::sync::Notify has private method `notify_all` that, once stabilized,
56+ // would allow us to make `EosSignaler` Cloneable with better ergonomics
57+ // to await EOS from multiple places.
58+ notifier. notify_one ( ) ;
59+ }
6560 }
6661 }
6762}
@@ -83,6 +78,7 @@ mod tests {
8378
8479 #[ cfg( not( miri) ) ]
8580 #[ tokio:: test]
81+ /// Test against the foot-gun of using [`tokio::sync::Notify::notify_waiters`] instead of `notify_one`.
8682 async fn test_get_notified_after_1ms ( ) {
8783 let buf = Bytes :: from_static ( b"abc" ) ;
8884 let ( mut buf, signaler) = NotifyOnEos :: new ( buf) ;
0 commit comments