Skip to content

Commit 4643ffb

Browse files
committed
Introduce supported protocol ids and vers
1 parent 0e8dc27 commit 4643ffb

File tree

9 files changed

+407
-241
lines changed

9 files changed

+407
-241
lines changed

spectrum-network/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub mod network_controller;
22
pub mod peer_conn_handler;
33
pub mod peer_manager;
44
pub mod protocol;
5+
pub mod protocol_api;
56
pub mod protocol_handler;
67
pub mod protocol_upgrade;
78
pub mod types;
8-
pub mod protocol_api;

spectrum-network/src/network_controller.rs

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::peer_conn_handler::{
55
use crate::peer_manager::{PeerEvents, PeerManagerOut, Peers};
66
use crate::protocol::ProtocolConfig;
77
use crate::protocol_api::ProtocolEvents;
8+
use crate::protocol_upgrade::supported_protocol_vers::SupportedProtocolIdMap;
89
use crate::types::{ProtocolId, ProtocolVer};
910

1011
use libp2p::core::connection::ConnectionId;
@@ -215,7 +216,7 @@ impl<TPeers, TPeerManager, THandler> NetworkEvents for NetworkController<TPeers,
215216
pub struct NetworkController<TPeers, TPeerManager, THandler> {
216217
conn_handler_conf: PeerConnHandlerConf,
217218
/// All supported protocols and their handlers
218-
supported_protocols: HashMap<ProtocolId, (ProtocolConfig, THandler)>,
219+
supported_protocols: SupportedProtocolIdMap<(ProtocolConfig, THandler)>,
219220
/// PeerManager API
220221
peers: TPeers,
221222
/// PeerManager stream itself
@@ -240,7 +241,7 @@ where
240241
) -> Self {
241242
Self {
242243
conn_handler_conf,
243-
supported_protocols,
244+
supported_protocols: supported_protocols.into(),
244245
peers,
245246
peer_manager,
246247
enabled_peers: HashMap::new(),
@@ -254,8 +255,7 @@ where
254255
self.conn_handler_conf.clone(),
255256
self.supported_protocols
256257
.iter()
257-
.clone()
258-
.map(|(prot_id, (conf, _))| (*prot_id, conf.clone()))
258+
.map(|(prot_id, (conf, _))| (prot_id, conf.clone()))
259259
.collect::<Vec<_>>(),
260260
)
261261
}
@@ -361,7 +361,7 @@ where
361361
{
362362
let protocol_id = protocol_tag.protocol_id();
363363
let protocol_ver = protocol_tag.protocol_ver();
364-
match enabled_protocols.entry(protocol_id) {
364+
match enabled_protocols.entry(protocol_id.get_inner()) {
365365
Entry::Occupied(mut entry) => {
366366
trace!(
367367
"Current state of protocol {:?} is {:?}",
@@ -371,12 +371,12 @@ where
371371
if let (EnabledProtocol::PendingEnable, handler) = entry.get() {
372372
handler.protocol_enabled(
373373
peer_id,
374-
protocol_ver,
374+
protocol_ver.get_inner(),
375375
out_channel.clone(),
376376
handshake,
377377
);
378378
let enabled_protocol = EnabledProtocol::Enabled {
379-
ver: protocol_ver,
379+
ver: protocol_ver.get_inner(),
380380
sink: out_channel,
381381
};
382382
entry.insert((enabled_protocol, handler.clone()));
@@ -398,42 +398,51 @@ where
398398
}) = self.enabled_peers.get_mut(&peer_id)
399399
{
400400
let protocol_id = protocol_tag.protocol_id();
401-
if let Some((_, prot_handler)) = self.supported_protocols.get(&protocol_id) {
402-
match enabled_protocols.entry(protocol_id) {
403-
Entry::Vacant(entry) => {
404-
entry.insert((EnabledProtocol::PendingApprove, prot_handler.clone()));
405-
prot_handler.protocol_requested(
401+
let (_, prot_handler) = self.supported_protocols.get_supported(protocol_id);
402+
match enabled_protocols.entry(protocol_id.get_inner()) {
403+
Entry::Vacant(entry) => {
404+
entry.insert((EnabledProtocol::PendingApprove, prot_handler.clone()));
405+
prot_handler.protocol_requested(
406+
peer_id,
407+
protocol_tag.protocol_ver().get_inner(),
408+
handshake,
409+
);
410+
}
411+
Entry::Occupied(_) => {
412+
warn!(
413+
"Peer {:?} opened already enabled protocol {:?}",
414+
peer_id, protocol_id
415+
);
416+
self.pending_actions
417+
.push_back(NetworkBehaviourAction::NotifyHandler {
406418
peer_id,
407-
protocol_tag.protocol_ver(),
408-
handshake,
409-
);
410-
}
411-
Entry::Occupied(_) => {
412-
warn!(
413-
"Peer {:?} opened already enabled protocol {:?}",
414-
peer_id, protocol_id
415-
);
416-
self.pending_actions
417-
.push_back(NetworkBehaviourAction::NotifyHandler {
418-
peer_id,
419-
handler: NotifyHandler::One(connection),
420-
event: ConnHandlerIn::Close(protocol_id),
421-
})
422-
}
419+
handler: NotifyHandler::One(connection),
420+
event: ConnHandlerIn::Close(protocol_id),
421+
})
423422
}
424-
} else {
425-
self.pending_actions
426-
.push_back(NetworkBehaviourAction::NotifyHandler {
423+
}
424+
}
425+
}
426+
ConnHandlerOut::ClosedByPeer(protocol_id) | ConnHandlerOut::RefusedToOpen(protocol_id) => {
427+
if let Some(ConnectedPeer::Connected {
428+
enabled_protocols, ..
429+
}) = self.enabled_peers.get_mut(&peer_id)
430+
{
431+
match enabled_protocols.entry(protocol_id.get_inner()) {
432+
Entry::Occupied(entry) => {
433+
trace!(
434+
"Peer {:?} closed the substream for protocol {:?}",
427435
peer_id,
428-
handler: NotifyHandler::One(connection),
429-
event: ConnHandlerIn::Close(protocol_id),
430-
})
436+
protocol_id
437+
);
438+
entry.remove();
439+
}
440+
Entry::Vacant(_) => {}
431441
}
432442
}
433443
}
434-
ConnHandlerOut::ClosedByPeer(protocol_id)
435-
| ConnHandlerOut::RefusedToOpen(protocol_id)
436-
| ConnHandlerOut::Closed(protocol_id) => {
444+
445+
ConnHandlerOut::Closed(protocol_id) => {
437446
if let Some(ConnectedPeer::Connected {
438447
enabled_protocols, ..
439448
}) = self.enabled_peers.get_mut(&peer_id)
@@ -564,7 +573,7 @@ where
564573
ConnectedPeer::Connected {
565574
enabled_protocols, ..
566575
} => {
567-
if let Some((_, prot_handler)) = self.supported_protocols.get(&protocol) {
576+
if let Some((_, prot_handler)) = self.supported_protocols.get(protocol) {
568577
match enabled_protocols.entry(protocol) {
569578
Entry::Occupied(_) => warn!(
570579
"PM requested already enabled protocol {:?} with peer {:?}",
@@ -617,7 +626,7 @@ where
617626
enabled_protocols,
618627
}) = self.enabled_peers.get_mut(&peer_id)
619628
{
620-
let (_, prot_handler) = self.supported_protocols.get(&protocol_id).unwrap();
629+
let (_, prot_handler) = self.supported_protocols.get(protocol_id).unwrap();
621630
match enabled_protocols.entry(protocol_id) {
622631
Entry::Occupied(protocol_entry) => match protocol_entry.remove_entry().1 {
623632
// Protocol handler approves either outbound or inbound protocol request.

0 commit comments

Comments
 (0)