From a45aa8f6dd5c6e05938a3d0a23e8d404855f5ee3 Mon Sep 17 00:00:00 2001
From: Frando
Date: Tue, 27 May 2025 21:39:39 +0200
Subject: [PATCH 1/4] refactor: merge Join and Neighbor, add NeighborReply

---
 src/proto/hyparview.rs | 186 +++++++++++++++++++++++------------------
 1 file changed, 103 insertions(+), 83 deletions(-)

diff --git a/src/proto/hyparview.rs b/src/proto/hyparview.rs
index 0698feb7..09b6ca16 100644
--- a/src/proto/hyparview.rs
+++ b/src/proto/hyparview.rs
@@ -67,8 +67,12 @@ pub enum Timer<PI> {
 /// Messages that we can send and receive from peers within the topic.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub enum Message<PI> {
-    /// Sent to a peer if you want to join the swarm
-    Join(Option<PeerData>),
+    /// Request to add the sender to the recipient's active view. If the [`NeighborRequest::reason`]
+    /// resolves to [`Priority::High`], the request cannot be denied.
+    NeighborRequest(NeighborRequest),
+    /// Reply to a [`NeighborRequest`] if the neighbor request is accepted.
+    /// If it's declined, a [`Message::Disconnect`] is sent instead.
+    NeighborReply(NeighborReply),
     /// When receiving Join, ForwardJoin is forwarded to the peer's ActiveView to introduce the
     /// new member.
     ForwardJoin(ForwardJoin<PI>),
@@ -78,9 +82,6 @@ pub enum Message<PI> {
     /// Peers reply to [`Message::Shuffle`] requests with a random peers from their active and
     /// passive views.
     ShuffleReply(ShuffleReply<PI>),
-    /// Request to add sender to an active view of recipient. If [`Neighbor::priority`] is
-    /// [`Priority::High`], the request cannot be denied.
-    Neighbor(Neighbor),
     /// Request to disconnect from a peer.
     /// If [`Disconnect::alive`] is true, the other peer is not shutting down, so it should be
     /// added to the passive set.
@@ -138,7 +139,7 @@ pub struct ShuffleReply<PI> {
 /// The priority of a `Join` message
 ///
 /// This is `High` if the sender does not have any active peers, and `Low` otherwise.
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
 pub enum Priority {
     /// High priority join that may not be denied.
     ///
@@ -151,13 +152,38 @@
 /// A neighbor message is sent after adding a peer to our active view to inform them that we are
 /// now neighbors.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-pub struct Neighbor {
-    /// The priority of the `Join` or `ForwardJoin` message that triggered this neighbor request.
-    priority: Priority,
-    /// The user data of the peer sending this message.
+pub struct NeighborRequest {
+    reason: NeighborReason,
+    /// The user data of the peer sending this message.
     data: Option<PeerData>,
 }
 
+/// Reason why we want to become someone's neighbor.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub enum NeighborReason {
+    /// We are joining the swarm.
+    Join,
+    /// We received a forward join from the peer and react on it.
+    ReplyToForwardJoin { priority: Priority },
+    /// We are activating a passive peer.
+    Activate { priority: Priority },
+}
+
+impl NeighborReason {
+    fn priority(&self) -> Priority {
+        match self {
+            Self::Join => Priority::High,
+            Self::ReplyToForwardJoin { priority } => *priority,
+            Self::Activate { priority } => *priority,
+        }
+    }
+}
+
+/// Reply to a [`NeighborRequest`], sent to inform the peer that we accepted their
+/// request and added them to our active view.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct NeighborReply {}
+
 /// Message sent when leaving the swarm or closing down to inform peers about us being gone.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct Disconnect {
@@ -304,11 +330,11 @@ where
             self.stats.total_connections += 1;
         }
         match message {
-            Message::Join(data) => self.on_join(from, data, io),
             Message::ForwardJoin(details) => self.on_forward_join(from, details, io),
             Message::Shuffle(details) => self.on_shuffle(from, details, io),
             Message::ShuffleReply(details) => self.on_shuffle_reply(details, io),
-            Message::Neighbor(details) => self.on_neighbor(from, details, io),
+            Message::NeighborRequest(details) => self.on_neighbor_request(from, details, io),
+            Message::NeighborReply(_) => self.on_neighbor_reply(from, io),
             Message::Disconnect(details) => self.on_disconnect(from, details, io),
         }
 
@@ -320,10 +346,7 @@ where
     }
 
     fn handle_join(&mut self, peer: PI, io: &mut impl IO) {
-        io.push(OutEvent::SendMessage(
-            peer,
-            Message::Join(self.me_data.clone()),
-        ));
+        self.send_neighbor_request(peer, NeighborReason::Join, io);
     }
 
     /// We received a disconnect message.
@@ -364,11 +387,8 @@ where
         // Before disconnecting, send a `ShuffleReply` with some of our nodes to
         // prevent the other node from running out of connections. This is especially
         // relevant if the other node just joined the swarm.
-        self.send_shuffle_reply(
-            peer,
-            self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count,
-            io,
-        );
+        let len = self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count;
+        self.send_shuffle_reply(peer, len, io);
         let message = Message::Disconnect(Disconnect {
             alive,
             _respond: false,
@@ -377,45 +397,27 @@ where
         });
         io.push(OutEvent::SendMessage(peer, message));
         io.push(OutEvent::DisconnectPeer(peer));
     }
 
-    fn on_join(&mut self, peer: PI, data: Option<PeerData>, io: &mut impl IO) {
-        // "A node that receives a join request will start by adding the new
-        // node to its active view, even if it has to drop a random node from it. (6)"
-        self.add_active(peer, data.clone(), Priority::High, true, io);
-
-        // "The contact node c will then send to all other nodes in its active view a ForwardJoin
-        // request containing the new node identifier. Associated to the join procedure,
-        // there are two configuration parameters, named Active Random Walk Length (ARWL),
-        // that specifies the maximum number of hops a ForwardJoin request is propagated,
-        // and Passive Random Walk Length (PRWL), that specifies at which point in the walk the node
-        // is inserted in a passive view. To use these parameters, the ForwardJoin request carries
-        // a “time to live” field that is initially set to ARWL and decreased at every hop. (7)"
-        let ttl = self.config.active_random_walk_length;
-        let peer_info = PeerInfo { id: peer, data };
-        for node in self.active_view.iter_without(&peer) {
-            let message = Message::ForwardJoin(ForwardJoin {
-                peer: peer_info.clone(),
-                ttl,
-            });
-            io.push(OutEvent::SendMessage(*node, message));
-        }
-    }
-
     fn on_forward_join(&mut self, sender: PI, message: ForwardJoin<PI>, io: &mut impl IO) {
         let peer_id = message.peer.id;
         // If the peer is already in our active view, we renew our neighbor relationship.
-        if self.active_view.contains(&peer_id) {
-            self.insert_peer_info(message.peer, io);
-            self.send_neighbor(peer_id, Priority::High, io);
-        }
         // "i) If the time to live is equal to zero or if the number of nodes in p’s active view is equal to one,
         // it will add the new node to its active view (7)"
-        else if message.ttl.expired() || self.active_view.len() <= 1 {
-            self.insert_peer_info(message.peer, io);
+        if self.active_view.contains(&peer_id)
+            || message.ttl.expired()
+            || self.active_view.len() <= 1
+        {
             // Modification from paper: Instead of adding the peer directly to our active view,
             // we only send the Neighbor message. We will add the peer to our active view once we receive a
             // reply from our neighbor.
             // This prevents us adding unreachable peers to our active view.
-            self.send_neighbor(peer_id, Priority::High, io);
+            self.insert_peer_info(message.peer, io);
+            self.send_neighbor_request(
+                peer_id,
+                NeighborReason::ReplyToForwardJoin {
+                    priority: Priority::High,
+                },
+                io,
+            );
         } else {
             // "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
             if message.ttl == self.config.passive_random_walk_length {
@@ -447,15 +449,46 @@ where
         }
     }
 
-    fn on_neighbor(&mut self, from: PI, details: Neighbor, io: &mut impl IO) {
-        let is_reply = self.pending_neighbor_requests.remove(&from);
-        let do_reply = !is_reply;
+    fn on_neighbor_request(&mut self, from: PI, details: NeighborRequest, io: &mut impl IO) {
+        let NeighborRequest { reason, data } = details;
         // "A node q that receives a high priority neighbor request will always accept the request, even
         // if it has to drop a random member from its active view (again, the member that is dropped will
         // receive a Disconnect notification). If a node q receives a low priority Neighbor request, it will
        // only accept the request if it has a free slot in its active view, otherwise it will refuse the request."
-        if !self.add_active(from, details.data, details.priority, do_reply, io) {
+        self.insert_peer_info((from, data.clone()).into(), io);
+        if !self.add_active(from, reason.priority(), io) {
             self.send_disconnect(from, true, io);
+        } else {
+            self.send_neighbor_reply(from, io);
+        }
+
+        if let NeighborReason::Join = reason {
+            // "The contact node c will then send to all other nodes in its active view a ForwardJoin
+            // request containing the new node identifier. Associated to the join procedure,
+            // there are two configuration parameters, named Active Random Walk Length (ARWL),
+            // that specifies the maximum number of hops a ForwardJoin request is propagated,
+            // and Passive Random Walk Length (PRWL), that specifies at which point in the walk the node
+            // is inserted in a passive view. To use these parameters, the ForwardJoin request carries
+            // a “time to live” field that is initially set to ARWL and decreased at every hop. (7)"
+            let ttl = self.config.active_random_walk_length;
+            let peer_info = PeerInfo { id: from, data };
+            for node in self.active_view.iter_without(&from) {
+                let message = Message::ForwardJoin(ForwardJoin {
+                    peer: peer_info.clone(),
+                    ttl,
+                });
+                io.push(OutEvent::SendMessage(*node, message));
+            }
+        }
+    }
+
+    fn on_neighbor_reply(&mut self, from: PI, io: &mut impl IO) {
+        if self.pending_neighbor_requests.remove(&from) {
+            if !self.add_active(from, Priority::High, io) {
+                self.send_disconnect(from, true, io);
+            }
+        } else {
+            debug!(?from, "Received unsolicited neighbor reply")
+        }
     }
 
@@ -619,7 +652,7 @@ where
             true => Priority::High,
             false => Priority::Low,
         };
-        self.send_neighbor(node, priority, io);
+        self.send_neighbor_request(node, NeighborReason::Activate { priority }, io);
         // schedule a timer that checks if the node replied with a neighbor message,
         // otherwise try again with another passive node.
         io.push(OutEvent::ScheduleTimer(
@@ -692,22 +725,13 @@ where
     /// If the active view is currently full, a random peer will be removed first.
-    /// Sends a Neighbor message to the peer. If high_priority is true, the peer
-    /// may not deny the Neighbor request.
-    fn add_active(
-        &mut self,
-        peer: PI,
-        data: Option<PeerData>,
-        priority: Priority,
-        reply: bool,
-        io: &mut impl IO,
-    ) -> bool {
+    /// If `priority` is [`Priority::High`], the peer may not be denied and a random
+    /// active peer is evicted if the active view is full.
+    ///
+    /// Returns `true` if the peer is now in the active view.
+    fn add_active(&mut self, peer: PI, priority: Priority, io: &mut impl IO) -> bool {
         if peer == self.me {
             return false;
         }
-        self.insert_peer_info((peer, data).into(), io);
         if self.active_view.contains(&peer) {
-            if reply {
-                self.send_neighbor(peer, priority, io);
-            }
             return true;
         }
         match (priority, self.active_is_full()) {
@@ -715,43 +739,39 @@ where
                 if is_full {
                     self.free_random_slot_in_active_view(io);
                 }
-                self.add_active_unchecked(peer, Priority::High, reply, io);
+                self.add_active_unchecked(peer, io);
                 true
             }
             (Priority::Low, false) => {
-                self.add_active_unchecked(peer, Priority::Low, reply, io);
+                self.add_active_unchecked(peer, io);
                 true
             }
             (Priority::Low, true) => false,
         }
     }
 
-    fn add_active_unchecked(
-        &mut self,
-        peer: PI,
-        priority: Priority,
-        reply: bool,
-        io: &mut impl IO,
-    ) {
+    fn add_active_unchecked(&mut self, peer: PI, io: &mut impl IO) {
         self.passive_view.remove(&peer);
         if self.active_view.insert(peer) {
             debug!(other = ?peer, "add to active view");
             io.push(OutEvent::EmitEvent(Event::NeighborUp(peer)));
-            if reply {
-                self.send_neighbor(peer, priority, io);
-            }
         }
     }
 
-    fn send_neighbor(&mut self, peer: PI, priority: Priority, io: &mut impl IO) {
+    fn send_neighbor_request(&mut self, peer: PI, reason: NeighborReason, io: &mut impl IO) {
         if self.pending_neighbor_requests.insert(peer) {
-            let message = Message::Neighbor(Neighbor {
-                priority,
+            let message = Message::NeighborRequest(NeighborRequest {
+                reason,
                 data: self.me_data.clone(),
             });
             io.push(OutEvent::SendMessage(peer, message));
         }
     }
+
+    fn send_neighbor_reply(&mut self, peer: PI, io: &mut impl IO) {
+        let message = Message::NeighborReply(NeighborReply {});
+        io.push(OutEvent::SendMessage(peer, message));
+    }
 }
 
 #[derive(Debug)]
From 5e0b5b3eb454e398a1db0e83c80a1aceb9164411 Mon Sep 17 00:00:00 2001
From: Frando
Date: Tue, 27 May 2025 21:44:02 +0200
Subject: [PATCH 2/4] refactor: remove obsolete _respond field from Disconnect

---
 src/proto/hyparview.rs | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/proto/hyparview.rs b/src/proto/hyparview.rs
index 09b6ca16..952850e4 100644
--- a/src/proto/hyparview.rs
+++ b/src/proto/hyparview.rs
@@ -190,8 +190,6 @@ pub struct Disconnect {
     /// Whether we are actually shutting down or closing the connection only because our limits are
     /// reached.
     alive: bool,
-    /// Obsolete field (kept in the struct to maintain wire compatibility).
-    _respond: bool,
 }
 
 /// Configuration for the swarm membership layer
@@ -389,10 +387,7 @@ where
         // relevant if the other node just joined the swarm.
         let len = self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count;
         self.send_shuffle_reply(peer, len, io);
-        let message = Message::Disconnect(Disconnect {
-            alive,
-            _respond: false,
-        });
+        let message = Message::Disconnect(Disconnect { alive });
         io.push(OutEvent::SendMessage(peer, message));
         io.push(OutEvent::DisconnectPeer(peer));
     }
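Dropping `_respond` changes the encoded byte layout of `Disconnect`, so old and new peers cannot interoperate on this message unless the swarm upgrades together. A minimal sketch of that effect, assuming a postcard-style compact encoding (which codec the crate actually uses is an assumption here, and the struct names are illustrative):

    // Requires the `serde` (with derive) and `postcard` (with alloc) crates.
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct DisconnectV1 {
        alive: bool,
        _respond: bool,
    }

    #[derive(Serialize, Deserialize)]
    struct DisconnectV2 {
        alive: bool,
    }

    fn main() {
        let v1 = postcard::to_allocvec(&DisconnectV1 { alive: true, _respond: false }).unwrap();
        let v2 = postcard::to_allocvec(&DisconnectV2 { alive: true }).unwrap();
        // The encoding is one byte shorter once the `_respond` flag is gone,
        // so the two versions are not wire compatible.
        assert_eq!(v1.len(), v2.len() + 1);
        assert_ne!(v1, v2);
    }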
From de765fae0afe4162962833a60ce46bcf55b2efb5 Mon Sep 17 00:00:00 2001
From: Frando
Date: Tue, 27 May 2025 22:28:08 +0200
Subject: [PATCH 3/4] refactor: add JoinFailed event

---
 src/net/handles.rs     |  3 +++
 src/proto/hyparview.rs | 61 +++++++++++++++++++++++-------------------
 src/proto/topic.rs     |  3 +++
 3 files changed, 40 insertions(+), 27 deletions(-)

diff --git a/src/net/handles.rs b/src/net/handles.rs
index 1feaeee6..18720054 100644
--- a/src/net/handles.rs
+++ b/src/net/handles.rs
@@ -354,6 +354,8 @@ pub enum GossipEvent {
     NeighborUp(NodeId),
     /// We dropped direct neighbor in the swarm membership layer for this topic.
     NeighborDown(NodeId),
+    /// We failed to connect to a peer that we wanted to join.
+    JoinFailed(NodeId),
     /// We received a gossip message for this topic.
     Received(Message),
 }
@@ -363,6 +365,7 @@ impl From<crate::proto::Event<NodeId>> for GossipEvent {
         match event {
             crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id),
             crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id),
+            crate::proto::Event::JoinFailed(node_id) => Self::JoinFailed(node_id),
             crate::proto::Event::Received(message) => Self::Received(Message {
                 content: message.content,
                 scope: message.scope,
diff --git a/src/proto/hyparview.rs b/src/proto/hyparview.rs
index 952850e4..e9b7e556 100644
--- a/src/proto/hyparview.rs
+++ b/src/proto/hyparview.rs
@@ -55,6 +55,8 @@ pub enum Event<PI> {
     NeighborUp(PI),
     /// A peer was removed from our set of active connections.
     NeighborDown(PI),
+    /// Joining a peer failed.
+    JoinFailed(PI),
 }
 
 /// Kinds of timers HyParView needs to schedule.
@@ -67,8 +69,7 @@ pub enum Timer<PI> {
 /// Messages that we can send and receive from peers within the topic.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub enum Message<PI> {
-    /// Request to add the sender to the recipient's active view. If the [`NeighborRequest::reason`]
-    /// resolves to [`Priority::High`], the request cannot be denied.
+    /// Request to add the sender to the recipient's active view.
     NeighborRequest(NeighborRequest),
     /// Reply to a [`NeighborRequest`] if the neighbor request is accepted.
     /// If it's declined, a [`Message::Disconnect`] is sent instead.
@@ -159,7 +160,7 @@ pub struct NeighborRequest {
 }
 
 /// Reason why we want to become someone's neighbor.
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
 pub enum NeighborReason {
     /// We are joining the swarm.
     Join,
@@ -269,7 +270,7 @@ pub struct State<PI, RG> {
     /// Statistics
     pub(crate) stats: Stats,
     /// The set of neighbor requests we sent out but did not yet receive a reply for
-    pending_neighbor_requests: HashSet<PI>,
+    pending_neighbor_requests: HashMap<PI, NeighborReason>,
     /// The opaque user peer data we received for other peers
     peer_data: HashMap<PI, PeerData>,
     /// List of peers that are disconnecting, but which we want to keep in the passive set once the connection closes
@@ -302,7 +303,7 @@ where
             InEvent::RecvMessage(from, message) => self.handle_message(from, message, io),
             InEvent::TimerExpired(timer) => match timer {
                 Timer::DoShuffle => self.handle_shuffle_timer(io),
-                Timer::PendingNeighborRequest(peer) => self.handle_pending_neighbor_timer(peer, io),
+                Timer::PendingNeighborRequest(peer) => self.neighbor_request_failed(peer, io),
             },
             InEvent::PeerDisconnected(peer) => self.handle_connection_closed(peer, io),
             InEvent::RequestJoin(peer) => self.handle_join(peer, io),
@@ -347,9 +348,18 @@ where
         self.send_neighbor_request(peer, NeighborReason::Join, io);
     }
 
+    fn neighbor_request_failed(&mut self, peer: PI, io: &mut impl IO) {
+        if let Some(reason) = self.pending_neighbor_requests.remove(&peer) {
+            self.refill_active_from_passive(Some(&peer), io);
+            if let NeighborReason::Join = reason {
+                io.push(OutEvent::EmitEvent(Event::JoinFailed(peer)));
+            }
+        }
+    }
+
     /// We received a disconnect message.
     fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO) {
-        self.pending_neighbor_requests.remove(&peer);
+        self.neighbor_request_failed(peer, io);
         if self.active_view.contains(&peer) {
             self.remove_active(
                 &peer,
@@ -365,7 +375,7 @@ where
 
     /// A connection was closed by the peer.
     fn handle_connection_closed(&mut self, peer: PI, io: &mut impl IO) {
-        self.pending_neighbor_requests.remove(&peer);
+        self.neighbor_request_failed(peer, io);
         if self.active_view.contains(&peer) {
             self.remove_active(&peer, RemovalReason::ConnectionClosed, io);
         } else if !self.alive_disconnect_peers.remove(&peer) {
@@ -423,7 +433,7 @@ where
         // in p’s active view, p will forward the request to a random node in its active view
         // (different from the one from which the request was received)."
         if !self.active_view.contains(&peer_id)
-            && !self.pending_neighbor_requests.contains(&peer_id)
+            && !self.pending_neighbor_requests.contains_key(&peer_id)
         {
             match self
                 .active_view
@@ -478,7 +488,7 @@ where
     }
 
     fn on_neighbor_reply(&mut self, from: PI, io: &mut impl IO) {
-        if self.pending_neighbor_requests.remove(&from) {
+        if self.pending_neighbor_requests.remove(&from).is_some() {
             if !self.add_active(from, Priority::High, io) {
                 self.send_disconnect(from, true, io);
             }
@@ -552,7 +562,7 @@ where
         for node in message.nodes {
             self.add_passive(node.id, node.data, io);
         }
-        self.refill_active_from_passive(&[], io);
+        self.refill_active_from_passive(None, io);
     }
 
     fn handle_shuffle_timer(&mut self, io: &mut impl IO) {
@@ -615,15 +625,15 @@ where
     /// Remove a peer from the active view.
     ///
-    /// If `reason` is [`RemovalReason::Random`], a [`Disconnect`] message will be sent to the peer.
+    /// If `reason` is [`RemovalReason::EvictRandom`], a [`Disconnect`] message will be sent to the peer.
     fn remove_active(&mut self, peer: &PI, reason: RemovalReason, io: &mut impl IO) {
         if let Some(idx) = self.active_view.get_index_of(peer) {
             let removed_peer = self.remove_active_by_index(idx, reason, io).unwrap();
-            self.refill_active_from_passive(&[&removed_peer], io);
+            self.refill_active_from_passive(Some(&removed_peer), io);
         }
     }
 
-    fn refill_active_from_passive(&mut self, skip_peers: &[&PI], io: &mut impl IO) {
+    fn refill_active_from_passive(&mut self, skip_peer: Option<&PI>, io: &mut impl IO) {
         if self.active_view.len() + self.pending_neighbor_requests.len()
             >= self.config.active_view_capacity
         {
@@ -635,8 +645,8 @@ where
         // node q is considered failed and removed from p’s passive view; another node q′ is selected
         // at random and a new attempt is made. The procedure is repeated until a connection is established
         // with success." (p7)
-        let mut skip_peers = skip_peers.to_vec();
-        skip_peers.extend(self.pending_neighbor_requests.iter());
+        let mut skip_peers: Vec<_> = skip_peer.into_iter().collect();
+        skip_peers.extend(self.pending_neighbor_requests.keys());
 
         if let Some(node) = self
             .passive_view
@@ -657,13 +667,6 @@ where
         };
     }
 
-    fn handle_pending_neighbor_timer(&mut self, peer: PI, io: &mut impl IO) {
-        if self.pending_neighbor_requests.remove(&peer) {
-            self.passive_view.remove(&peer);
-            self.refill_active_from_passive(&[], io);
-        }
-    }
-
     fn remove_active_by_index(
         &mut self,
         peer_index: usize,
@@ -675,7 +678,7 @@ where
 
         match reason {
             // send a disconnect message, then close connection.
-            RemovalReason::Random => self.send_disconnect(peer, true, io),
+            RemovalReason::EvictRandom => self.send_disconnect(peer, true, io),
             // close connection without sending anything further.
             RemovalReason::DisconnectReceived { is_alive: _ } => {
                 io.push(OutEvent::DisconnectPeer(peer))
@@ -690,7 +693,7 @@ where
             // keep alive if other peer said to be still alive.
             RemovalReason::DisconnectReceived { is_alive } => is_alive,
             // keep alive (only we are removing for now)
-            RemovalReason::Random => true,
+            RemovalReason::EvictRandom => true,
         };
 
         if keep_as_passive {
@@ -711,7 +714,7 @@ where
     /// Remove a random peer from the active view.
     fn free_random_slot_in_active_view(&mut self, io: &mut impl IO) {
         if let Some(index) = self.active_view.pick_random_index(&mut self.rng) {
-            self.remove_active_by_index(index, RemovalReason::Random, io);
+            self.remove_active_by_index(index, RemovalReason::EvictRandom, io);
         }
     }
 
@@ -754,7 +757,11 @@ where
     }
 
     fn send_neighbor_request(&mut self, peer: PI, reason: NeighborReason, io: &mut impl IO) {
-        if self.pending_neighbor_requests.insert(peer) {
+        if self
+            .pending_neighbor_requests
+            .insert(peer, reason)
+            .is_none()
+        {
             let message = Message::NeighborRequest(NeighborRequest {
                 reason,
                 data: self.me_data.clone(),
@@ -776,5 +783,5 @@ enum RemovalReason {
     /// A peer is removed because we received a disconnect message.
     DisconnectReceived { is_alive: bool },
     /// A peer is removed after random selection to make room for a newly joined peer.
-    Random,
+    EvictRandom,
 }
diff --git a/src/proto/topic.rs b/src/proto/topic.rs
index ac646607..f148aecf 100644
--- a/src/proto/topic.rs
+++ b/src/proto/topic.rs
@@ -117,6 +117,8 @@ pub enum Event<PI> {
     NeighborUp(PI),
     /// We dropped direct neighbor in the swarm membership layer for this topic
     NeighborDown(PI),
+    /// We failed to dial a peer that we wanted to neighbor with.
+    JoinFailed(PI),
     /// A gossip message was received for this topic
     Received(GossipEvent),
 }
@@ -126,6 +128,7 @@ impl<PI> From<hyparview::Event<PI>> for Event<PI> {
         match value {
             hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
             hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
+            hyparview::Event::JoinFailed(peer) => Self::JoinFailed(peer),
         }
     }
 }
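Because pending requests are now keyed by their reason, a neighbor request that times out or is answered with a disconnect only surfaces to the application when it was a join attempt; failed activations are retried internally against another passive peer. A self-contained sketch of that dispatch, using u64 as a stand-in peer id (names mirror the patch, not the crate's API; the third reason variant is omitted for brevity):

    use std::collections::HashMap;

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum Reason {
        Join,
        Activate,
    }

    #[derive(Debug, PartialEq)]
    enum Event {
        JoinFailed(u64),
    }

    // Mirrors `neighbor_request_failed`: only a failed join becomes an event.
    fn on_timeout(pending: &mut HashMap<u64, Reason>, peer: u64) -> Option<Event> {
        match pending.remove(&peer) {
            Some(Reason::Join) => Some(Event::JoinFailed(peer)),
            // Non-join reasons (and unknown peers) fail silently; the caller
            // refills the active view from the passive view instead.
            _ => None,
        }
    }

    fn main() {
        let mut pending = HashMap::new();
        pending.insert(1, Reason::Join);
        pending.insert(2, Reason::Activate);
        assert_eq!(on_timeout(&mut pending, 1), Some(Event::JoinFailed(1)));
        assert_eq!(on_timeout(&mut pending, 2), None);
        // A timer firing for a peer with no pending request is a no-op.
        assert_eq!(on_timeout(&mut pending, 3), None);
    }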
From 7608bbc4ab09cf2661b1a79b1181db5ea6c4e0de Mon Sep 17 00:00:00 2001
From: Frando
Date: Mon, 9 Jun 2025 13:29:34 +0200
Subject: [PATCH 4/4] refactor: add nodes field to Disconnect instead of
 sending ShuffleReply at disconnect

---
 src/proto/hyparview.rs | 51 +++++++++++++++++++++++++++---------------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/src/proto/hyparview.rs b/src/proto/hyparview.rs
index e9b7e556..c45ca71c 100644
--- a/src/proto/hyparview.rs
+++ b/src/proto/hyparview.rs
@@ -86,7 +86,7 @@ pub enum Message<PI> {
     /// Request to disconnect from a peer.
     /// If [`Disconnect::alive`] is true, the other peer is not shutting down, so it should be
     /// added to the passive set.
-    Disconnect(Disconnect),
+    Disconnect(Disconnect<PI>),
 }
 
 /// The time-to-live for this message.
@@ -187,10 +187,12 @@
 pub struct NeighborReply {}
 
 /// Message sent when leaving the swarm or closing down to inform peers about us being gone.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-pub struct Disconnect {
+pub struct Disconnect<PI> {
     /// Whether we are actually shutting down or closing the connection only because our limits are
     /// reached.
     alive: bool,
+    /// A list of nodes from the sender's active and passive views
+    nodes: Vec<PeerInfo<PI>>,
 }
 
@@ -358,8 +360,9 @@ where
     }
 
     /// We received a disconnect message.
-    fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO) {
+    fn on_disconnect(&mut self, peer: PI, details: Disconnect<PI>, io: &mut impl IO) {
         self.neighbor_request_failed(peer, io);
+
         if self.active_view.contains(&peer) {
             self.remove_active(
                 &peer,
@@ -371,6 +374,7 @@ where
         } else if details.alive && self.passive_view.contains(&peer) {
             self.alive_disconnect_peers.insert(peer);
         }
+        self.on_shuffle_nodes(details.nodes, Some(&peer), io);
     }
 
     /// A connection was closed by the peer.
@@ -395,9 +399,8 @@ where
-        // Before disconnecting, send a `ShuffleReply` with some of our nodes to
-        // prevent the other node from running out of connections. This is especially
-        // relevant if the other node just joined the swarm.
-        let len = self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count;
-        self.send_shuffle_reply(peer, len, io);
-        let message = Message::Disconnect(Disconnect { alive });
+        // Before disconnecting, attach some of our nodes to the `Disconnect` message to
+        // prevent the other node from running out of connections. This is especially
+        // relevant if the other node just joined the swarm.
+        let nodes = self.get_nodes_for_shuffle(None);
+        let message = Message::Disconnect(Disconnect { alive, nodes });
         io.push(OutEvent::SendMessage(peer, message));
         io.push(OutEvent::DisconnectPeer(peer));
     }
@@ -524,9 +527,7 @@ where
     fn on_shuffle(&mut self, from: PI, shuffle: Shuffle<PI>, io: &mut impl IO) {
         if shuffle.ttl.expired() || self.active_view.len() <= 1 {
             let len = shuffle.nodes.len();
-            for node in shuffle.nodes {
-                self.add_passive(node.id, node.data, io);
-            }
+            self.on_shuffle_nodes(shuffle.nodes, None, io);
             self.send_shuffle_reply(shuffle.origin, len, io);
         } else if let Some(node) = self
             .active_view
@@ -541,7 +542,28 @@ where
         }
     }
 
+    fn on_shuffle_nodes(
+        &mut self,
+        nodes: Vec<PeerInfo<PI>>,
+        skip_peer: Option<&PI>,
+        io: &mut impl IO,
+    ) {
+        for node in nodes {
+            self.add_passive(node.id, node.data, io);
+        }
+        self.refill_active_from_passive(skip_peer, io);
+    }
+
     fn send_shuffle_reply(&mut self, to: PI, len: usize, io: &mut impl IO) {
+        let nodes = self.get_nodes_for_shuffle(Some(len));
+        let message = Message::ShuffleReply(ShuffleReply { nodes });
+        io.push(OutEvent::SendMessage(to, message));
+    }
+
+    fn get_nodes_for_shuffle(&mut self, len: Option<usize>) -> Vec<PeerInfo<PI>> {
+        let len = len.unwrap_or_else(|| {
+            self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count
+        });
         let mut nodes = self.passive_view.shuffled_and_capped(len, &mut self.rng);
         // If we don't have enough passive nodes for the expected length, we fill with
         // active nodes.
@@ -551,18 +573,11 @@ where
                     .shuffled_and_capped(len - nodes.len(), &mut self.rng),
             );
         }
-        let nodes = nodes.into_iter().map(|id| self.peer_info(&id));
-        let message = Message::ShuffleReply(ShuffleReply {
-            nodes: nodes.collect(),
-        });
-        io.push(OutEvent::SendMessage(to, message));
+        nodes.into_iter().map(|id| self.peer_info(&id)).collect()
     }
 
     fn on_shuffle_reply(&mut self, message: ShuffleReply<PI>, io: &mut impl IO) {
-        for node in message.nodes {
-            self.add_passive(node.id, node.data, io);
-        }
-        self.refill_active_from_passive(None, io);
+        self.on_shuffle_nodes(message.nodes, None, io);
     }
 
     fn handle_shuffle_timer(&mut self, io: &mut impl IO) {
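With this change the disconnect path and the shuffle path share one selection helper: take up to `len` nodes from the passive view and, if that comes up short, top up from the active view. A self-contained sketch of the fill rule, with plain slices in deterministic order standing in for the crate's shuffled views (not the crate's API):

    fn nodes_for_shuffle(passive: &[u64], active: &[u64], len: usize) -> Vec<u64> {
        // Prefer passive-view nodes, capped at the requested length.
        let mut nodes: Vec<u64> = passive.iter().copied().take(len).collect();
        if nodes.len() < len {
            // Not enough passive nodes: fill the remainder from the active view.
            let missing = len - nodes.len();
            nodes.extend(active.iter().copied().take(missing));
        }
        nodes
    }

    fn main() {
        // Two passive nodes available, so two active nodes fill the gap.
        assert_eq!(nodes_for_shuffle(&[1, 2], &[10, 11, 12], 4), vec![1, 2, 10, 11]);
        // Enough passive nodes: the active view is not touched.
        assert_eq!(nodes_for_shuffle(&[1, 2, 3, 4, 5], &[10], 4), vec![1, 2, 3, 4]);
    }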