Skip to content

Commit c455958

Browse files
committed
fix a bug where racing asyncs might cause resource contention
1 parent df6e49f commit c455958

File tree

7 files changed

+151
-30
lines changed

7 files changed

+151
-30
lines changed

.github/dependabot.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
version: 2
2+
updates:
3+
- package-ecosystem: "github-actions"
4+
directory: "/"
5+
# Check for updates every Monday
6+
schedule:
7+
interval: "weekly"
8+
open-pull-requests-limit: 10

crates/apecs/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ description = "An asyncronous and parallel entity-component system"
1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1414

1515
[features]
16-
default = ["derive"]
16+
default = ["derive", "panic_on_async_holdup"]
17+
panic_on_async_holdup = []
1718
derive = []
1819

1920
[lib]
@@ -40,4 +41,5 @@ smallvec = "^1.9"
4041
[dev-dependencies]
4142
async-timer = "^0.7"
4243
env_logger = "^0.9"
44+
futures-lite = "1.12.0"
4345
wasm-bindgen-test = "^0.3"

crates/apecs/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#![allow(clippy::type_complexity)]
1010

1111
use ::anyhow::Context;
12-
use chan::oneshot;
12+
use chan::spsc;
1313
use internal::{FetchReadyResource, LoanManager, Resource, ResourceId};
1414
use rayon::iter::IntoParallelIterator;
1515
use std::{
@@ -336,14 +336,15 @@ impl<T> Gen<T> for NoDefault {
336336
/// fetch will err if the resource doesn't already exist.
337337
///
338338
/// After a successful fetch, the resource will be automatically sent back to
339-
/// the world on drop. For this reason, when writing async systems you must take
340-
/// care to drop the fetched resources before your code crosses an await point.
339+
/// the world on drop. To make sure that your async functions don't hold fetched
340+
/// resources over await points, [`Facade`] uses [`visit`](Facade::visit) which
341+
/// fetches inside a synchronous closure.
341342
///
342343
/// `Write` has two type parameters:
343344
/// * `T` - The type of the resource.
344345
/// * `G` - The method by which the resource can be generated if it doesn't
345-
/// exist. By default this is [`SomeDefault`], which denotes creating the
346-
/// resource using its default instance. Another option is [`NoDefault`] which
346+
/// already exist. By default this is [`SomeDefault`], which denotes creating the
347+
/// resource using its default implementation. Another option is [`NoDefault`] which
347348
/// fails to generate the resource.
348349
///
349350
/// ```rust
@@ -449,8 +450,7 @@ impl<T: IsResource, G: Gen<T>> Write<T, G> {
449450
/// fetch will err if the resource doesn't already exist.
450451
///
451452
/// After a successful fetch, the resource will be automatically sent back to
452-
/// the world on drop. For this reason, when writing async systems you must take
453-
/// care to drop the fetched resources before your code crosses an await point.
453+
/// the world on drop.
454454
///
455455
/// `Read` has two type parameters:
456456
/// * `T` - The type of the resource.
@@ -523,7 +523,7 @@ impl<T: IsResource, G: Gen<T>> Read<T, G> {
523523
pub(crate) struct Request {
524524
pub borrows: Vec<internal::Borrow>,
525525
pub construct: fn(&mut LoanManager<'_>) -> anyhow::Result<Resource>,
526-
pub deploy_tx: oneshot::Sender<Resource>,
526+
pub deploy_tx: spsc::Sender<Resource>,
527527
}
528528

529529
impl std::fmt::Debug for Request {

crates/apecs/src/resource_manager.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ impl ResourceManager {
169169
/// and unwrapping shared references.
170170
///
171171
/// Returns `Ok(true)` if resources are still on loan.
172+
///
173+
/// Errs if a system returns a resource that was not loaned. This should
174+
/// never happen in practice.
172175
pub fn try_unify_resources(&mut self, label: &str) -> anyhow::Result<bool> {
173176
log::trace!("try unify resources {}", label);
174177
while let Ok((rez_id, resource)) = self.exclusive_return_chan.1.try_recv() {
@@ -192,12 +195,10 @@ impl ResourceManager {
192195
"duplicate resources"
193196
),
194197
Err(arc_rez) => {
195-
log::warn!(
198+
log::error!(
196199
"could not retreive borrowed resource {:?}, it is still borrowed by \
197-
{} - for better performance, try not to hold loaned resources over \
198-
an await point",
199-
id.name,
200-
label,
200+
'{}' - do not to hold loaned resources over an await point",
201+
id.name, label,
201202
);
202203
let _ = self.loaned_refs.insert(id, arc_rez);
203204
}

crates/apecs/src/system.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -368,29 +368,39 @@ impl<'a> IsBatch for AsyncBatch<'a> {
368368
resource_manager: &mut ResourceManager,
369369
) -> anyhow::Result<()> {
370370
let mut loan_mngr = LoanManager(resource_manager);
371-
let mut systems = self.take_systems();
371+
let systems = self.take_systems();
372372
let mut data = VecDeque::new();
373373
for sys in systems.iter() {
374374
data.push_back(sys.prep(&mut loan_mngr)?);
375375
}
376376
drop(loan_mngr);
377377

378-
systems.retain_mut(|system| {
378+
for system in systems.into_iter() {
379379
let data: Resource = data.pop_front().unwrap();
380380
if system.0.resource_request_rx.is_closed() {
381381
log::trace!(
382382
"system '{}' closed after requesting resources",
383383
system.name()
384384
);
385-
return false;
385+
continue;
386386
}
387387
// send the resources off, if need be
388-
log::trace!("sending resources to async '{}'", system.name());
389-
let _ = system.run(data).unwrap();
390-
true
391-
});
392-
393-
self.set_systems(systems);
388+
if !system.1.deploy_tx.is_closed() {
389+
log::trace!(
390+
"sending resource '{}' to async '{}'",
391+
data.type_name().unwrap_or("unknown"),
392+
system.name()
393+
);
394+
// UNWRAP: safe because we checked above that the channel is still open
395+
system.1.deploy_tx.try_send(data).unwrap();
396+
} else {
397+
log::trace!(
398+
"cancelling send of resource '{}' to async '{}'",
399+
data.type_name().unwrap_or("unknown"),
400+
system.name()
401+
);
402+
}
403+
}
394404

395405
fn tick(executor: &async_executor::Executor<'static>) {
396406
while executor.try_tick() {
@@ -406,9 +416,10 @@ impl<'a> IsBatch for AsyncBatch<'a> {
406416
} else {
407417
tick(extra);
408418
}
419+
409420
let resources_still_loaned = resource_manager.try_unify_resources("async batch")?;
410421
if resources_still_loaned {
411-
log::warn!(
422+
panic!(
412423
"an async system is holding onto resources over an await point! systems:{:#?}",
413424
self.systems()
414425
.iter()

crates/apecs/src/world.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ impl Facade {
5151
// request the resources from the world
5252
// these requests are gathered in World::tick_async into an AsyncSchedule
5353
// and then run with the executor and resource manager
54-
let (deploy_tx, deploy_rx) = oneshot::oneshot();
54+
let (deploy_tx, deploy_rx) = spsc::bounded(1);
55+
// UNWRAP: safe because the request channel is unbounded
5556
self.resource_request_tx
5657
.try_send(Request {
5758
borrows,
@@ -65,13 +66,11 @@ impl Facade {
6566
},
6667
deploy_tx,
6768
})
68-
.context("could not send request for resources")?;
69-
log::trace!("'{}' requested", std::any::type_name::<D>());
70-
69+
.unwrap();
7170
let rez: Resource = deploy_rx
71+
.recv()
7272
.await
73-
.map_err(|_| anyhow::anyhow!("could not fetch resources"))?;
74-
log::trace!("'{}' received", std::any::type_name::<D>());
73+
.unwrap();
7574
let box_d: Box<D> = rez.downcast().map_err(|rez| {
7675
anyhow::anyhow!(
7776
"Facade could not downcast resource '{}' to '{}'",
@@ -1420,4 +1419,14 @@ mod test {
14201419
assert_eq!(9, deleted_strings[0].id());
14211420
}
14221421
}
1422+
1423+
#[test]
1424+
fn spsc_drop_sanity() {
1425+
// ensure the spsc channel delivers messages when the sender was dropped after
1426+
// sending and before receiving
1427+
let (tx, rx) = spsc::bounded::<()>(1);
1428+
tx.try_send(()).unwrap();
1429+
drop(tx);
1430+
assert!(rx.try_recv().is_ok());
1431+
}
14231432
}

crates/apecs/tests/regression.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//! Tests for bugs we've encountered.
2+
use std::sync::Arc;
3+
4+
use ::anyhow::Context;
5+
use apecs::{anyhow, chan::mpmc::Channel, ok, Facade, Read, ShouldContinue, World, Write};
6+
use futures_lite::StreamExt;
7+
8+
#[test]
9+
fn can_race_asyncs() {
10+
// ensures that resources are dropped after being sent to
11+
// an async that is calling Facade::visit, but then gets
12+
// cancelled
13+
14+
//let _ = env_logger::builder()
15+
// .is_test(true)
16+
// .filter_level(log::LevelFilter::Trace)
17+
// .try_init();
18+
19+
// each tick this increments a counter by 1
20+
// when the counter reaches 3 it fires an event
21+
fn ticker(
22+
(mut chan, mut tick): (Write<Channel<()>>, Write<usize>),
23+
) -> anyhow::Result<ShouldContinue> {
24+
*tick += 1;
25+
println!("ticked {}", *tick);
26+
if *tick == 3 {
27+
chan.try_send(()).unwrap();
28+
}
29+
ok()
30+
}
31+
32+
// loops acquiring a unit resource and reading it
33+
async fn loser(facade: &mut Facade) -> anyhow::Result<()> {
34+
loop {
35+
println!("loser awaiting Read<()>");
36+
facade
37+
.visit(|unit: Read<()>| {
38+
println!("loser got Read<()>");
39+
let () = *unit;
40+
Ok(())
41+
})
42+
.await?;
43+
}
44+
}
45+
46+
// races the losing async against awaiting an event from ticker
47+
// after ticker wins the race it should be able to access the unit
48+
// resource
49+
async fn system(mut facade: Facade) -> anyhow::Result<()> {
50+
let mut rx = facade
51+
.visit(|chan: Read<Channel<()>>| Ok(chan.new_receiver()))
52+
.await?;
53+
54+
{
55+
futures_lite::future::or(
56+
async {
57+
rx.next().await.context("impossible")?;
58+
anyhow::Ok(())
59+
},
60+
async {
61+
loser(&mut facade).await?;
62+
panic!("this was supposed to lose the race");
63+
},
64+
)
65+
.await?;
66+
}
67+
68+
println!("race is over");
69+
70+
facade
71+
.visit(|unit: Read<()>| {
72+
let () = *unit;
73+
Ok(())
74+
})
75+
.await?;
76+
77+
Ok(())
78+
}
79+
80+
let mut world = World::default();
81+
world
82+
.with_system("ticker", ticker)
83+
.unwrap()
84+
.with_async_system("system", system);
85+
86+
world.tick();
87+
world.tick();
88+
world.tick();
89+
world.tick();
90+
}

0 commit comments

Comments
 (0)