From 77b072adc2f4912e06db175e9c08320491193a5b Mon Sep 17 00:00:00 2001 From: Tei Roberts Date: Sun, 29 Oct 2023 01:19:05 +0200 Subject: [PATCH] feat: allow access to storage in event subscription --- asteroids/src/main.rs | 2 +- src/archetype/guard.rs | 22 +--- src/archetype/mod.rs | 89 +++++++------- src/events.rs | 236 +++++++++++++++++++++++--------------- src/lib.rs | 2 + src/metadata/relation.rs | 2 +- src/sink.rs | 44 +++++++ tests/change_detection.rs | 2 +- tests/subscribing.rs | 93 ++++++++++++++- 9 files changed, 331 insertions(+), 161 deletions(-) create mode 100644 src/sink.rs diff --git a/asteroids/src/main.rs b/asteroids/src/main.rs index 9b458b8..1c27174 100644 --- a/asteroids/src/main.rs +++ b/asteroids/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<()> { world.subscribe( player_dead_tx .filter_components([player().key()]) - .filter(|v| v.kind == EventKind::Removed), + .filter(|v, _| v == EventKind::Removed), ); // Setup everything required for the game logic and physics diff --git a/src/archetype/guard.rs b/src/archetype/guard.rs index 932fc0f..6b05902 100644 --- a/src/archetype/guard.rs +++ b/src/archetype/guard.rs @@ -114,13 +114,11 @@ impl<'a, T: Debug + ?Sized> Debug for CellGuard<'a, T> { /// A mutable reference to an entity's component with deferred change tracking. /// -/// A modification invent is only generated *if* if this is mutably dereferenced. +/// A modification invent is only generated *iff* this is mutably dereferenced. pub struct RefMut<'a, T> { guard: CellMutGuard<'a, T>, - id: Entity, slot: Slot, - modified: bool, tick: u32, } @@ -138,7 +136,6 @@ impl<'a, T: ComponentValue> RefMut<'a, T> { guard, id, slot, - modified: false, tick, }) } @@ -162,19 +159,10 @@ impl<'a, T> Deref for RefMut<'a, T> { impl<'a, T> DerefMut for RefMut<'a, T> { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { - self.modified = true; - self.guard.get_mut() - } -} + self.guard + .data + .set_modified(&[self.id], Slice::single(self.slot), self.tick); -impl<'a, T> Drop for RefMut<'a, T> { - #[inline] - fn drop(&mut self) { - if self.modified { - // SAFETY: `value` is not accessed beyond this point - self.guard - .data - .set_modified(&[self.id], Slice::single(self.slot), self.tick) - } + self.guard.get_mut() } } diff --git a/src/archetype/mod.rs b/src/archetype/mod.rs index ab325ec..8c6af9f 100644 --- a/src/archetype/mod.rs +++ b/src/archetype/mod.rs @@ -10,7 +10,7 @@ use itertools::Itertools; use crate::{ component::{ComponentDesc, ComponentKey, ComponentValue}, - events::{EventData, EventKind, EventSubscriber}, + events::{EventData, EventSubscriber}, writer::ComponentUpdater, Component, Entity, }; @@ -97,30 +97,48 @@ pub(crate) struct CellData { impl CellData { /// Sets the specified entities and slots as modified and invokes subscribers /// **Note**: `ids` must be the slice of entities pointed to by `slice` - pub(crate) fn set_modified(&mut self, ids: &[Entity], slice: Slice, change_tick: u32) { - debug_assert_eq!(ids.len(), slice.len()); - let component = self.key; - self.on_event(EventData { + pub(crate) fn set_modified(&mut self, ids: &[Entity], slots: Slice, change_tick: u32) { + debug_assert_eq!(ids.len(), slots.len()); + let event = EventData { ids, - key: component, - kind: EventKind::Modified, - }); + slots, + key: self.key, + }; + + for handler in self.subscribers.iter() { + handler.on_modified(&event) + } self.changes - .set_modified_if_tracking(Change::new(slice, change_tick)); + .set_modified_if_tracking(Change::new(slots, change_tick)); } /// Sets the specified entities and slots as modified and invokes subscribers /// **Note**: `ids` must be the slice of entities pointed to by `slice` - pub(crate) fn set_added(&mut self, ids: &[Entity], slice: Slice, change_tick: u32) { - let component = self.key; - self.on_event(EventData { + pub(crate) fn set_added(&mut self, ids: &[Entity], slots: Slice, change_tick: u32) { + let event = EventData { ids, - key: component, - kind: EventKind::Added, - }); + slots, + key: self.key, + }; - self.changes.set_added(Change::new(slice, change_tick)); + for handler in self.subscribers.iter() { + handler.on_added(&self.storage, &event); + } + + self.changes.set_added(Change::new(slots, change_tick)); + } + + pub(crate) fn set_removed(&mut self, ids: &[Entity], slots: Slice) { + let event = EventData { + ids, + slots, + key: self.key, + }; + + for handler in self.subscribers.iter() { + handler.on_removed(&self.storage, &event); + } } } @@ -280,15 +298,6 @@ pub struct Archetype { unsafe impl Send for Cell {} unsafe impl Sync for Cell {} -impl CellData { - #[inline] - fn on_event(&self, event: EventData) { - for handler in self.subscribers.iter() { - handler.on_event(&event) - } - } -} - impl Archetype { pub(crate) fn empty() -> Self { Self { @@ -660,11 +669,7 @@ impl Archetype { cell.move_to(slot, dst_cell, dst_slot); } else { // Notify the subscribers that the component was removed - data.on_event(EventData { - ids: &[id], - key, - kind: EventKind::Removed, - }); + data.set_removed(&[id], Slice::single(slot)); cell.take(slot, &mut on_drop); } @@ -697,11 +702,7 @@ impl Archetype { for cell in self.cells.values_mut() { let data = cell.data.get_mut(); // data.on_event(&self.entities, Slice::single(slot), EventKind::Removed); - data.on_event(EventData { - ids: &[id], - key: data.key, - kind: EventKind::Removed, - }); + data.set_removed(&[id], Slice::single(slot)); cell.take(slot, &mut on_move) } @@ -769,11 +770,7 @@ impl Archetype { // unsafe { dst.storage.get_mut().append(storage) } } else { // Notify the subscribers that the component was removed - data.on_event(EventData { - ids: &entities[slots.as_range()], - key: data.key, - kind: EventKind::Removed, - }); + data.set_removed(&entities[slots.as_range()], slots); cell.clear(); } @@ -806,11 +803,7 @@ impl Archetype { let data = cell.data.get_mut(); // Notify the subscribers that the component was removed // data.on_event(&self.entities, slots, EventKind::Removed); - data.on_event(EventData { - ids: &self.entities[slots.as_range()], - key: data.key, - kind: EventKind::Removed, - }); + data.set_removed(&self.entities[slots.as_range()], slots); cell.clear() } @@ -859,11 +852,7 @@ impl Archetype { let slots = self.slots(); for cell in self.cells.values_mut() { let data = cell.data.get_mut(); - data.on_event(EventData { - ids: &self.entities[slots.as_range()], - key: data.key, - kind: EventKind::Removed, - }) + data.set_removed(&self.entities[slots.as_range()], slots) } ArchetypeDrain { diff --git a/src/events.rs b/src/events.rs index 452dedf..d102bbd 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,10 +1,12 @@ use alloc::vec::Vec; +use itertools::Itertools; use crate::{ - archetype::Archetype, + archetype::{Archetype, Slice, Storage}, component::{ComponentDesc, ComponentKey, ComponentValue}, filter::StaticFilter, - Entity, + sink::Sink, + Component, Entity, }; #[derive(Debug, Clone, PartialEq, Eq)] @@ -33,17 +35,33 @@ pub enum EventKind { pub struct EventData<'a> { /// The affected entities pub ids: &'a [Entity], + /// The affected slots + pub slots: Slice, /// The affected component pub key: ComponentKey, - /// The kind of event - pub kind: EventKind, } /// Allows subscribing to events *inside* the ECS, such as components being added, removed, or /// modified. +/// +/// Most implementations are through the [`Sink`] implementation, which sends a static event for +/// each entity affected by the event. pub trait EventSubscriber: ComponentValue { /// Handle an incoming event - fn on_event(&self, event: &EventData); + fn on_added(&self, storage: &Storage, event: &EventData); + /// Handle an incoming event + /// + /// **Note**: Component storage is inaccessible during this call as it may be called *during* + /// itereation or while a query borrow is alive. + /// + /// Prefer to use this for cache validation and alike, as it *will* be called for intermediate + /// events. + fn on_modified(&self, event: &EventData); + /// Handle an incoming event + fn on_removed(&self, storage: &Storage, event: &EventData); + + /// Returns true if the subscriber is still connected + fn is_connected(&self) -> bool; /// Returns true if the subscriber is interested in this archetype #[inline] @@ -57,14 +75,11 @@ pub trait EventSubscriber: ComponentValue { true } - /// Returns true if the subscriber is still connected - fn is_connected(&self) -> bool; - /// Filter each event before it is generated through a custom function fn filter(self, func: F) -> FilterFunc where Self: Sized, - F: Fn(&EventData) -> bool, + F: Fn(EventKind, &EventData) -> bool, { FilterFunc { subscriber: self, @@ -99,49 +114,112 @@ pub trait EventSubscriber: ComponentValue { } #[cfg(feature = "flume")] -impl EventSubscriber for flume::Sender { - fn on_event(&self, event: &EventData) { +impl EventSubscriber for S +where + S: 'static + Send + Sync + Sink, +{ + fn on_added(&self, _: &Storage, event: &EventData) { for &id in event.ids { - let _ = self.send(Event { + self.send(Event { id, key: event.key, - kind: event.kind, + kind: EventKind::Added, }); } } - fn is_connected(&self) -> bool { - !self.is_disconnected() + fn on_modified(&self, event: &EventData) { + for &id in event.ids { + self.send(Event { + id, + key: event.key, + kind: EventKind::Modified, + }); + } } -} -#[cfg(feature = "tokio")] -impl EventSubscriber for tokio::sync::mpsc::UnboundedSender { - fn on_event(&self, event: &EventData) { + fn on_removed(&self, _: &Storage, event: &EventData) { for &id in event.ids { - let _ = self.send(Event { + self.send(Event { id, key: event.key, - kind: event.kind, + kind: EventKind::Removed, }); } } fn is_connected(&self) -> bool { - !self.is_closed() + >::is_connected(self) + } +} + +/// Receive the component value of an event +/// +/// This is a convenience wrapper around [`EventSubscriber`] that sends the component value along +/// +/// **Note**: This only tracks addition and removal of components, not modification. This is due to +/// a limitation with references lifetimes during iteration, as the values can't be accessed by the +/// subscriber simultaneously. +pub struct WithValue { + component: Component, + sink: S, +} + +impl WithValue { + /// Create a new `WithValue` subscriber + pub fn new(component: Component, sink: S) -> Self { + Self { component, sink } } } -#[cfg(feature = "tokio")] -impl EventSubscriber for alloc::sync::Weak { - fn on_event(&self, _: &EventData) { - if let Some(notify) = self.upgrade() { - notify.notify_one() +#[cfg(feature = "flume")] +impl> EventSubscriber + for WithValue +{ + fn on_added(&self, storage: &Storage, event: &EventData) { + let values = storage.downcast_ref::(); + for (&id, slot) in event.ids.iter().zip_eq(event.slots.as_range()) { + let value = values[slot].clone(); + + self.sink.send(( + Event { + id, + key: event.key, + kind: EventKind::Added, + }, + value, + )); + } + } + + fn on_modified(&self, _: &EventData) {} + + fn on_removed(&self, storage: &Storage, event: &EventData) { + let values = storage.downcast_ref::(); + for (&id, slot) in event.ids.iter().zip_eq(event.slots.as_range()) { + let value = values[slot].clone(); + + self.sink.send(( + Event { + id, + key: event.key, + kind: EventKind::Removed, + }, + value, + )); } } fn is_connected(&self) -> bool { - self.strong_count() > 0 + self.sink.is_connected() + } + + fn matches_component(&self, desc: ComponentDesc) -> bool { + self.component.desc() == desc + } + + fn matches_arch(&self, arch: &Archetype) -> bool { + arch.has(self.component.key()) } } @@ -156,9 +234,21 @@ where S: EventSubscriber, F: ComponentValue + StaticFilter, { + fn on_added(&self, storage: &Storage, event: &EventData) { + self.subscriber.on_added(storage, event) + } + + fn on_modified(&self, event: &EventData) { + self.subscriber.on_modified(event); + } + + fn on_removed(&self, storage: &Storage, event: &EventData) { + self.subscriber.on_removed(storage, event) + } + #[inline] - fn on_event(&self, event: &EventData) { - self.subscriber.on_event(event); + fn is_connected(&self) -> bool { + self.subscriber.is_connected() } #[inline] @@ -170,11 +260,6 @@ where fn matches_component(&self, desc: ComponentDesc) -> bool { self.subscriber.matches_component(desc) } - - #[inline] - fn is_connected(&self) -> bool { - self.subscriber.is_connected() - } } /// Filter the archetypes for which the subscriber will receive events @@ -186,12 +271,23 @@ pub struct FilterFunc { impl EventSubscriber for FilterFunc where S: EventSubscriber, - F: ComponentValue + Fn(&EventData) -> bool, + F: ComponentValue + Fn(EventKind, &EventData) -> bool, { - #[inline] - fn on_event(&self, event: &EventData) { - if (self.filter)(event) { - self.subscriber.on_event(event); + fn on_added(&self, storage: &Storage, event: &EventData) { + if (self.filter)(EventKind::Added, event) { + self.subscriber.on_added(storage, event) + } + } + + fn on_modified(&self, event: &EventData) { + if (self.filter)(EventKind::Modified, event) { + self.subscriber.on_modified(event) + } + } + + fn on_removed(&self, storage: &Storage, event: &EventData) { + if (self.filter)(EventKind::Removed, event) { + self.subscriber.on_removed(storage, event) } } @@ -221,9 +317,16 @@ impl EventSubscriber for FilterComponents where S: EventSubscriber, { - #[inline] - fn on_event(&self, event: &EventData) { - self.subscriber.on_event(event) + fn on_added(&self, storage: &Storage, event: &EventData) { + self.subscriber.on_added(storage, event) + } + + fn on_modified(&self, event: &EventData) { + self.subscriber.on_modified(event) + } + + fn on_removed(&self, storage: &Storage, event: &EventData) { + self.subscriber.on_removed(storage, event) } #[inline] @@ -241,50 +344,3 @@ where self.subscriber.is_connected() } } - -// #[cfg(feature = "flume")] -// TODO weak sender -// impl EventHandler for flume::WeakSender { -// fn on_event(&self, event: T) -> bool { -// self.send(event).is_ok() -// } -// } - -// #[cfg(feature = "tokio")] -// impl EventHandler for tokio::sync::mpsc::UnboundedSender { -// fn on_event(&self, event: BufferedEvent) { -// for id in event.arch.entities()[event.slots.as_range()] { -// self.send(Event { -// id, -// key: event.key, -// kind: event.kind, -// }) -// } -// } -// } - -// #[cfg(feature = "tokio")] -// impl EventHandler for tokio::sync::mpsc::Sender { -// fn on_event(&self, event: T) -> bool { -// todo!() -// } -// } - -// #[cfg(feature = "tokio")] -// impl EventHandler for tokio::sync::broadcast::Sender { -// fn on_event(&self, event: T) -> bool { -// todo!() -// } -// } - -// #[cfg(feature = "tokio")] -// impl EventHandler for alloc::sync::Weak { -// fn on_event(&self, _: BufferedEvent) -> bool { -// if let Some(notify) = self.upgrade() { -// notify.notify_one(); -// true -// } else { -// false -// } -// } -// } diff --git a/src/lib.rs b/src/lib.rs index 9e3d825..ad3fc2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -245,6 +245,8 @@ pub mod schedule; /// entities therein pub mod serialize; +/// Provides a sink trait for sending events +pub mod sink; /// Provides tuple utilities like `cloned` mod util; /// vtable implementation for dynamic dispatching diff --git a/src/metadata/relation.rs b/src/metadata/relation.rs index e5262ed..fc1e58d 100644 --- a/src/metadata/relation.rs +++ b/src/metadata/relation.rs @@ -65,7 +65,7 @@ mod test { let (tx, rx) = flume::unbounded(); world.subscribe( tx.filter_arch(a.with_relation()) - .filter(|v| v.key.id == a.id()), + .filter(|_, v| v.key.id == a.id()), ); type Expected<'a> = &'a [(Entity, Vec<(Entity, &'a Arc<()>)>)]; diff --git a/src/sink.rs b/src/sink.rs new file mode 100644 index 0000000..9fad9b0 --- /dev/null +++ b/src/sink.rs @@ -0,0 +1,44 @@ +/// Trait for sending or handling events. +/// +/// Used as the backbone for a subscriber. +pub trait Sink { + /// Send an event + fn send(&self, event: T); + /// Returns true if the sink is still connected + fn is_connected(&self) -> bool; +} + +#[cfg(feature = "flume")] +impl Sink for flume::Sender { + fn send(&self, event: T) { + let _ = self.send(event); + } + + fn is_connected(&self) -> bool { + !self.is_disconnected() + } +} + +#[cfg(feature = "tokio")] +impl Sink for tokio::sync::mpsc::UnboundedSender { + fn send(&self, event: T) { + let _ = self.send(event); + } + + fn is_connected(&self) -> bool { + !self.is_closed() + } +} + +#[cfg(feature = "tokio")] +impl Sink for alloc::sync::Weak { + fn send(&self, _: T) { + if let Some(notify) = self.upgrade() { + notify.notify_one() + } + } + + fn is_connected(&self) -> bool { + self.strong_count() > 0 + } +} diff --git a/tests/change_detection.rs b/tests/change_detection.rs index 13a6dde..3cfd196 100644 --- a/tests/change_detection.rs +++ b/tests/change_detection.rs @@ -41,7 +41,7 @@ fn change_detection() { world.subscribe( removed_tx .filter_components([rotation().key()]) - .filter(|v| v.kind == EventKind::Removed), + .filter(|kind, _| kind == EventKind::Removed), ); let mut rng = StdRng::seed_from_u64(83); diff --git a/tests/subscribing.rs b/tests/subscribing.rs index 909b7f4..189b68e 100644 --- a/tests/subscribing.rs +++ b/tests/subscribing.rs @@ -18,7 +18,7 @@ fn subscribing() { let mut world = World::new(); - let (tx, rx) = flume::unbounded::(); + let (tx, rx) = flume::unbounded(); world.subscribe(tx.filter_components([a().key()])); let id = Entity::builder() @@ -101,6 +101,97 @@ fn subscribing() { world.set(id2, b(), "Bar".to_string()).unwrap(); } +#[test] +#[cfg(feature = "flume")] +fn subscribing_with_value() { + use flax::{ + events::{Event, EventKind, WithValue}, + Entity, Query, World, + }; + use itertools::Itertools; + use pretty_assertions::assert_eq; + + let mut world = World::new(); + + let (tx, rx) = flume::unbounded::<(Event, i32)>(); + + world.subscribe(WithValue::new(a(), tx)); + + let id = Entity::builder() + .set(a(), 5) + .set(b(), "Foo".to_string()) + .spawn(&mut world); + + assert_eq!( + rx.drain().collect_vec(), + [( + Event { + id, + key: a().key(), + kind: flax::events::EventKind::Added, + }, + 5 + )] + ); + + let id2 = Entity::builder().set(a(), 7).spawn(&mut world); + world.set(id2, a(), 3).unwrap(); + + world.remove(id, a()).unwrap(); + + assert_eq!( + rx.drain().collect_vec(), + [ + ( + Event { + id: id2, + kind: EventKind::Added, + key: a().key(), + }, + 7 + ), + ( + Event { + id, + kind: EventKind::Removed, + key: a().key(), + }, + 5 + ) + ] + ); + + *world.get_mut(id2, a()).unwrap() = 1; + + world.set(id2, a(), 2).unwrap(); + + Query::new(a().as_mut()) + .borrow(&world) + .iter() + .for_each(|v| *v *= -1); + + Query::new(b().as_mut()) + .borrow(&world) + .iter() + .for_each(|v| v.push('!')); + + assert_eq!(world.remove(id2, a()).unwrap(), -2); + + assert_eq!( + rx.drain().collect_vec(), + [( + Event { + id: id2, + kind: EventKind::Removed, + key: a().key() + }, + -2 + )] + ); + + world.set(id2, b(), "Bar".to_string()).unwrap(); +} + #[tokio::test] #[cfg(feature = "tokio")] async fn tokio_subscribe() {