Skip to content

Commit 9b486ba

Browse files
committed
add singleflight
1 parent c50abbf commit 9b486ba

File tree

6 files changed

+120
-13
lines changed

6 files changed

+120
-13
lines changed

special/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ edition = "2021"
88
[dependencies]
99
async-lock = "2.5.0"
1010
async-oneshot = "0.5.0"
11+
async-weighted-semaphore = "0.2.1"
12+
async_singleflight = "0.5.0"
1113
atomic_float = "0.1.0"
1214
atomicbox = "0.4.0"
1315
atomig = "0.4.0"
@@ -19,14 +21,17 @@ dashmap = "5.4.0"
1921
event-listener = "2.5.3"
2022
evmap = "10.0.2"
2123
flurry = "0.4.0"
24+
futures = "0.3.25"
2225
oneshot = "0.1.5"
2326
portable-atomic = { version = "0.3", features=["float"] }
2427
process_lock = "0.1.0"
2528
scc = "0.11.1"
2629
sharded-slab = "0.1.4"
2730
simple-mutex = "1.1.5"
31+
singleflight-async = "0.1.1"
2832
slab = "0.4.7"
2933
smol = "1.2.5"
34+
tokio = { version = "1.21.2", features = ["full"] }
3035
triggered = "0.1.2"
3136
triple_buffer = "6.2.0"
3237
try-lock = "0.2.3"

special/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ mod primitive;
55
mod notify;
66
mod queue;
77
mod scc_examples;
8+
mod sema_examples;
9+
mod singleflight_example;
810

911
pub use oslock::*;
1012
pub use oneshots::*;
1113
pub use map::*;
1214
pub use primitive::*;
1315
pub use notify::*;
1416
pub use queue::*;
15-
pub use scc_examples::*;
17+
pub use scc_examples::*;
18+
pub use sema_examples::*;
19+
pub use singleflight_example::*;

special/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ fn main() {
2626
async_lock_mutex();
2727
async_lock_rwlock();
2828
async_lock_barrier();
29-
async_lock_semaphore();
3029

3130
portable_atomic_i128();
3231
portable_atomic_u128();
@@ -51,6 +50,14 @@ fn main() {
5150
scc_hashset();
5251
scc_queue();
5352

53+
54+
async_lock_semaphore();
55+
async_weighted_semaphore_example();
56+
tokio_semaphore_example();
57+
58+
singleflight_example();
59+
async_singleflight_example();
60+
5461
}
5562

5663

special/src/primitive/async_lock_examples.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,3 @@ pub fn async_lock_barrier() {
5050
}
5151
});
5252
}
53-
54-
pub fn async_lock_semaphore() {
55-
let s = Arc::new(Semaphore::new(2));
56-
57-
let _g1 = s.try_acquire_arc().unwrap();
58-
let g2 = s.try_acquire_arc().unwrap();
59-
60-
assert!(s.try_acquire_arc().is_none());
61-
drop(g2);
62-
assert!(s.try_acquire_arc().is_some());
63-
}

special/src/sema_examples.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use futures::pin_mut;
2+
use futures::poll;
3+
use std::sync::Arc;
4+
5+
pub fn tokio_semaphore_example() {
6+
let rt = tokio::runtime::Runtime::new().unwrap();
7+
8+
rt.block_on(async {
9+
let semaphore = Arc::new(tokio::sync::Semaphore::new(3));
10+
let mut join_handles = Vec::new();
11+
12+
for _ in 0..5 {
13+
let permit = semaphore.clone().acquire_owned().await.unwrap();
14+
join_handles.push(tokio::spawn(async move {
15+
// perform task...
16+
// explicitly own `permit` in the task
17+
drop(permit);
18+
}));
19+
}
20+
21+
for handle in join_handles {
22+
handle.await.unwrap();
23+
}
24+
});
25+
}
26+
27+
pub fn async_weighted_semaphore_example() {
28+
smol::block_on(async {
29+
let sem = async_weighted_semaphore::Semaphore::new(1);
30+
let a = sem.acquire(2);
31+
let b = sem.acquire(1);
32+
pin_mut!(a);
33+
pin_mut!(b);
34+
assert!(poll!(&mut a).is_pending());
35+
assert!(poll!(&mut b).is_pending());
36+
37+
sem.release(1);
38+
assert!(poll!(&mut a).is_ready());
39+
assert!(poll!(&mut b).is_ready());
40+
});
41+
}
42+
43+
pub fn async_lock_semaphore() {
44+
let s = Arc::new(async_lock::Semaphore::new(2));
45+
46+
let _g1 = s.try_acquire_arc().unwrap();
47+
let g2 = s.try_acquire_arc().unwrap();
48+
49+
assert!(s.try_acquire_arc().is_none());
50+
drop(g2);
51+
assert!(s.try_acquire_arc().is_some());
52+
}

special/src/singleflight_example.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use futures::future::join_all;
2+
use singleflight_async::SingleFlight;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use async_singleflight::Group;
7+
8+
pub fn singleflight_example() {
9+
smol::block_on(async {
10+
let group = SingleFlight::new();
11+
let mut futures = Vec::new();
12+
for _ in 0..10 {
13+
futures.push(group.work("key", || async {
14+
println!("will sleep to simulate async task");
15+
smol::Timer::after(std::time::Duration::from_millis(100)).await;
16+
println!("real task done");
17+
"my-result"
18+
}));
19+
}
20+
21+
for fut in futures.into_iter() {
22+
assert_eq!(fut.await, "my-result");
23+
println!("task finished");
24+
}
25+
});
26+
}
27+
28+
const RES: usize = 7;
29+
30+
async fn expensive_fn() -> Result<usize, ()> {
31+
smol::Timer::after(std::time::Duration::from_millis(100)).await;
32+
Ok(RES)
33+
}
34+
35+
pub fn async_singleflight_example() {
36+
smol::block_on(async {
37+
let g = Arc::new(Group::<_, ()>::new());
38+
let mut handlers = Vec::new();
39+
for _ in 0..10 {
40+
let g = g.clone();
41+
handlers.push(smol::spawn(async move {
42+
let res = g.work("key", expensive_fn()).await.0;
43+
let r = res.unwrap();
44+
println!("{}", r);
45+
}));
46+
}
47+
48+
join_all(handlers).await;
49+
});
50+
}

0 commit comments

Comments
 (0)