diff --git a/ts_ffi/examples/lookup_peer.c b/ts_ffi/examples/lookup_peer.c index f0416058..3b2e0e0c 100644 --- a/ts_ffi/examples/lookup_peer.c +++ b/ts_ffi/examples/lookup_peer.c @@ -33,13 +33,10 @@ int main(int argc, char** argv) { ); assert(dev); - // Wait for device to receive its IP: first netmap has been received. ts_in_addr_t addrv4; ts_in6_addr_t addrv6; char addr_str[INET6_ADDRSTRLEN]; - assert(!ts_ipv4_addr(dev, &addrv4)); - if (ts_peer_ipv4_addr(dev, argv[3], &addrv4) <= 0) { return EXIT_FAILURE; } diff --git a/ts_runtime/src/peer_tracker.rs b/ts_runtime/src/peer_tracker.rs index 20432c8f..0b1afd48 100644 --- a/ts_runtime/src/peer_tracker.rs +++ b/ts_runtime/src/peer_tracker.rs @@ -8,6 +8,7 @@ use std::{ use kameo::{ actor::ActorRef, message::{Context, Message}, + reply::ReplySender, }; use ts_control::{Node, NodeId}; use ts_keys::NodePublicKey; @@ -18,9 +19,17 @@ use crate::{Error, env::Env}; pub struct PeerTracker { peers: HashMap, id_to_nodekey: HashMap, + seen_state_update: bool, + pending_requests: Vec, env: Env, } +impl PeerTracker { + fn peer_by_name_opt(&self, name: &str) -> Option<&Node> { + self.peers.values().find(|&peer| peer.matches_name(name)) + } +} + impl kameo::Actor for PeerTracker { type Args = Env; type Error = Error; @@ -31,27 +40,52 @@ impl kameo::Actor for PeerTracker { Ok(Self { peers: Default::default(), id_to_nodekey: Default::default(), + pending_requests: Default::default(), + seen_state_update: false, env, }) } } +enum Pending { + PeerByName(PeerByName, ReplySender>), +} + // For messages with arguments, a struct is generated with the args as fields. They aren't // documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where // docs are turned off everywhere. #[allow(missing_docs)] mod msg_impl { + use kameo::prelude::DelegatedReply; + use super::*; #[kameo::messages] impl PeerTracker { /// Lookup a peer by name. - #[message] - pub fn peer_by_name(&self, name: String) -> Option { - self.peers - .values() - .find(|&peer| peer.matches_name(&name)) - .cloned() + /// + /// Waits until we've received at least one peer update from control. + #[message(ctx)] + pub async fn peer_by_name( + &mut self, + ctx: &mut Context>>, + name: String, + ) -> DelegatedReply> { + let (deleg, sender) = ctx.reply_sender(); + let Some(sender) = sender else { return deleg }; + + if !self.seen_state_update { + tracing::debug!(query = name, "no peer state seen yet, queueing request"); + + self.pending_requests + .push(Pending::PeerByName(PeerByName { name }, sender)); + + return deleg; + } + + sender.send(self.peer_by_name_opt(&name).cloned()); + + deleg } } } @@ -130,6 +164,25 @@ impl Message> for PeerTracker { "new peer state" ); + if !self.seen_state_update { + self.seen_state_update = true; + + if !self.pending_requests.is_empty() { + tracing::debug!( + n_pending = self.pending_requests.len(), + "state update received, servicing pending requests" + ); + } + + for req in core::mem::take(&mut self.pending_requests) { + match req { + Pending::PeerByName(PeerByName { name }, reply) => { + reply.send(self.peer_by_name_opt(&name).cloned()); + } + } + } + } + if let Err(e) = self .env .publish(PeerState {