Skip to content

Commit 3d5997c

Browse files
committed
support pre-delay signal trigger
1 parent 9cb360f commit 3d5997c

File tree

5 files changed

+147
-13
lines changed

5 files changed

+147
-13
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
# 0.2.1 (30. September, 2024)
9+
10+
Expose a signal that can be awaited on without awaiting the configured
11+
delay first. If no delay is used this API is equivalent to the already
12+
existing `cancelled` function.
13+
14+
This can be used for scenarios where you do not need a graceful buffer and would like to
15+
cancel as soon as a signal is received.
16+
817
# 0.2.0 (29. September, 2024)
918

1019
This is usability wise not a breaking release,

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
categories = ["asynchronous", "network-programming"]
33
edition = "2021"
44
name = "tokio-graceful"
5-
version = "0.2.0"
5+
version = "0.2.1"
66
description = "util for graceful shutdown of tokio applications"
77
homepage = "https://github.com/plabayo/tokio-graceful"
88
readme = "README.md"

src/guard.rs

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,33 @@ pub struct ShutdownGuard(ManuallyDrop<WeakShutdownGuard>);
2626
#[derive(Debug, Clone)]
2727
pub struct WeakShutdownGuard {
2828
pub(crate) trigger_rx: Receiver,
29+
pub(crate) shutdown_signal_trigger_rx: Option<Receiver>,
2930
pub(crate) zero_tx: Sender,
3031
pub(crate) ref_count: Arc<AtomicUsize>,
3132
}
3233

3334
impl ShutdownGuard {
34-
pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc<AtomicUsize>) -> Self {
35+
pub(crate) fn new(
36+
trigger_rx: Receiver,
37+
shutdown_signal_trigger_rx: Option<Receiver>,
38+
zero_tx: Sender,
39+
ref_count: Arc<AtomicUsize>,
40+
) -> Self {
3541
let value = ref_count.fetch_add(1, Ordering::SeqCst);
3642
tracing::trace!("new shutdown guard: ref_count+1: {}", value + 1);
3743
Self(ManuallyDrop::new(WeakShutdownGuard::new(
38-
trigger_rx, zero_tx, ref_count,
44+
trigger_rx,
45+
shutdown_signal_trigger_rx,
46+
zero_tx,
47+
ref_count,
3948
)))
4049
}
4150

42-
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
51+
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
52+
/// and the delay (if any) duration has been awaited.
53+
///
54+
/// Use [`Self::shutdown_signal_triggered`] for tasks that do not
55+
/// require this opt-in delay buffer duration.
4356
///
4457
/// The future will complete immediately if the token is already cancelled when this method is called.
4558
///
@@ -56,6 +69,29 @@ impl ShutdownGuard {
5669
self.0.cancelled().await
5770
}
5871

72+
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
73+
///
74+
/// Use [`Self::cancelled`] if you want to make sure the future
75+
/// only completes when the buffer delay has been awaited.
76+
///
77+
/// In case no delay has been configured for the parent `Shutdown`,
78+
/// this function will be equal in behaviour to [`Self::cancelled`].
79+
///
80+
/// The future will complete immediately if the token is already cancelled when this method is called.
81+
///
82+
/// # Cancel safety
83+
///
84+
/// This method is cancel safe.
85+
///
86+
/// # Panics
87+
///
88+
/// This method panics if the iternal mutex
89+
/// is poisoned while being used.
90+
#[inline]
91+
pub async fn shutdown_signal_triggered(&self) {
92+
self.0.shutdown_signal_triggered().await
93+
}
94+
5995
/// Returns a [`crate::sync::JoinHandle`] that can be awaited on
6096
/// to wait for the spawned task to complete. See
6197
/// [`crate::sync::spawn`] for more information.
@@ -166,15 +202,26 @@ impl Drop for ShutdownGuard {
166202
}
167203

168204
impl WeakShutdownGuard {
169-
pub(crate) fn new(trigger_rx: Receiver, zero_tx: Sender, ref_count: Arc<AtomicUsize>) -> Self {
205+
pub(crate) fn new(
206+
trigger_rx: Receiver,
207+
shutdown_signal_trigger_rx: Option<Receiver>,
208+
zero_tx: Sender,
209+
ref_count: Arc<AtomicUsize>,
210+
) -> Self {
170211
Self {
171212
trigger_rx,
213+
shutdown_signal_trigger_rx,
172214
zero_tx,
173215
ref_count,
174216
}
175217
}
176218

177-
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
219+
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
220+
/// and the delay (buffer) duration has been awaited on.
221+
///
222+
/// Use [`Self::shutdown_signal_triggered`] in case you want to get
223+
/// a future which is triggered immediately when the shutdown signal is received,
224+
/// without waiting for the delay duration first.
178225
///
179226
/// The future will complete immediately if the token is already cancelled when this method is called.
180227
///
@@ -191,6 +238,34 @@ impl WeakShutdownGuard {
191238
self.trigger_rx.clone().await;
192239
}
193240

241+
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested
242+
/// without awaiting the delay duration first, if one is set.
243+
///
244+
/// In case no delay has been configured for the parent `Shutdown`,
245+
/// this function will be equal in behaviour to [`Self::cancelled`].
246+
///
247+
/// Use [`Self::cancelled`] in case you want to get
248+
/// a future which is triggered when the shutdown signal is received
249+
/// and thethe delay duration is awaited.
250+
///
251+
/// The future will complete immediately if the token is already cancelled when this method is called.
252+
///
253+
/// # Cancel safety
254+
///
255+
/// This method is cancel safe.
256+
///
257+
/// # Panics
258+
///
259+
/// This method panics if the iternal mutex
260+
/// is poisoned while being used.
261+
#[inline]
262+
pub async fn shutdown_signal_triggered(&self) {
263+
self.shutdown_signal_trigger_rx
264+
.clone()
265+
.unwrap_or_else(|| self.trigger_rx.clone())
266+
.await
267+
}
268+
194269
/// Returns a Future that gets fulfilled when cancellation (shutdown) is requested.
195270
///
196271
/// In contrast to [`ShutdownGuard::cancelled`] this method consumes the guard,

src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,30 @@ mod tests {
224224
assert!(result.is_err(), "{result:?}");
225225
}
226226

227+
#[tokio::test]
228+
async fn test_shutdown_cancelled_vs_shutdown_signal_triggered() {
229+
let (tx, rx) = oneshot::channel::<()>();
230+
let shutdown = Shutdown::builder()
231+
.with_delay(Duration::from_secs(5))
232+
.with_signal(rx)
233+
.build();
234+
tx.send(()).unwrap();
235+
236+
let weak_guard = shutdown.guard_weak();
237+
238+
// will fail because delay is still being awaited
239+
let result = tokio::time::timeout(Duration::from_micros(100), weak_guard.cancelled()).await;
240+
assert!(result.is_err(), "{result:?}");
241+
242+
// this will succeed however, as it does not await the delay
243+
let result = tokio::time::timeout(
244+
Duration::from_millis(100),
245+
weak_guard.shutdown_signal_triggered(),
246+
)
247+
.await;
248+
assert!(result.is_ok(), "{result:?}");
249+
}
250+
227251
#[tokio::test]
228252
async fn test_shutdown_nested_guards() {
229253
let (tx, rx) = oneshot::channel::<()>();

src/shutdown.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl ShutdownBuilder<sealed::WithoutSignal> {
157157
pub fn build(self) -> Shutdown {
158158
let (zero_tx, zero_rx) = trigger();
159159

160-
let guard = ShutdownGuard::new(Receiver::closed(), zero_tx, Default::default());
160+
let guard = ShutdownGuard::new(Receiver::closed(), None, zero_tx, Default::default());
161161

162162
Shutdown {
163163
guard,
@@ -173,16 +173,29 @@ impl<I: sealed::IntoFuture> ShutdownBuilder<sealed::WithSignal<I>> {
173173
/// all jobs are complete.
174174
pub fn build(self) -> Shutdown {
175175
let trigger_signal = self.data.signal.into_future();
176-
let delay = self.data.delay;
176+
177+
let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay {
178+
Some(delay) => {
179+
let (shutdown_signal_tx, shutdown_signal_rx) = trigger();
180+
(Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx))
181+
}
182+
None => (None, None),
183+
};
177184

178185
let (signal_tx, signal_rx) = trigger();
179186
let (zero_tx, zero_rx) = trigger();
180187

181-
let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default());
188+
let guard = ShutdownGuard::new(
189+
signal_rx,
190+
maybe_shutdown_signal_rx,
191+
zero_tx,
192+
Default::default(),
193+
);
182194

183195
crate::sync::spawn(async move {
184196
let _ = trigger_signal.await;
185-
if let Some(delay) = delay {
197+
if let Some((delay, shutdown_signal_tx)) = delay_tuple {
198+
shutdown_signal_tx.trigger();
186199
tracing::trace!(
187200
"::trigger signal recieved: delay buffer activated: {:?}",
188201
delay
@@ -213,13 +226,25 @@ where
213226
pub fn build(self) -> Shutdown {
214227
let trigger_signal = self.data.signal.into_future();
215228
let overwrite_fn = self.data.overwrite_fn;
216-
let delay = self.data.delay;
229+
230+
let (delay_tuple, maybe_shutdown_signal_rx) = match self.data.delay {
231+
Some(delay) => {
232+
let (shutdown_signal_tx, shutdown_signal_rx) = trigger();
233+
(Some((delay, shutdown_signal_tx)), Some(shutdown_signal_rx))
234+
}
235+
None => (None, None),
236+
};
217237

218238
let (signal_tx, signal_rx) = trigger();
219239
let (zero_tx, zero_rx) = trigger();
220240
let (zero_overwrite_tx, zero_overwrite_rx) = trigger();
221241

222-
let guard = ShutdownGuard::new(signal_rx, zero_tx, Default::default());
242+
let guard = ShutdownGuard::new(
243+
signal_rx,
244+
maybe_shutdown_signal_rx,
245+
zero_tx,
246+
Default::default(),
247+
);
223248

224249
crate::sync::spawn(async move {
225250
let _ = trigger_signal.await;
@@ -228,7 +253,8 @@ where
228253
let _ = overwrite_signal.await;
229254
zero_overwrite_tx.trigger();
230255
});
231-
if let Some(delay) = delay {
256+
if let Some((delay, shutdown_signal_tx)) = delay_tuple {
257+
shutdown_signal_tx.trigger();
232258
tracing::trace!(
233259
"::trigger signal recieved: delay buffer activated: {:?}",
234260
delay

0 commit comments

Comments
 (0)