diff --git a/code/Cargo.lock b/code/Cargo.lock index 68110b87b..f08a813e9 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -261,6 +261,7 @@ dependencies = [ "serde", "serde_json", "toml", + "tracing", ] [[package]] diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs index 0a273c511..6cdfef79b 100644 --- a/code/crates/app/src/spawn.rs +++ b/code/crates/app/src/spawn.rs @@ -255,6 +255,8 @@ fn make_network_config(cfg: &ConsensusConfig, value_sync_cfg: &ValueSyncConfig) mesh_n_low: config.mesh_n_low(), mesh_outbound_min: config.mesh_outbound_min(), enable_peer_scoring: config.enable_peer_scoring(), + enable_explicit_peering: config.enable_explicit_peering(), + enable_flood_publish: config.enable_flood_publish(), }, config::PubSubProtocol::Broadcast => GossipSubConfig::default(), }, diff --git a/code/crates/config/Cargo.toml b/code/crates/config/Cargo.toml index bdff7a583..b572f70d1 100644 --- a/code/crates/config/Cargo.toml +++ b/code/crates/config/Cargo.toml @@ -23,6 +23,7 @@ config = { workspace = true } humantime-serde = { workspace = true } multiaddr = { workspace = true } serde = { workspace = true, features = ["derive"] } +tracing = { workspace = true, default-features = false } [dev-dependencies] serde_json = { workspace = true } diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index eaa7d8826..78278d357 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -297,12 +297,22 @@ pub struct GossipSubConfig { /// Enable peer scoring to prioritize nodes based on their type in mesh formation enable_peer_scoring: bool, + + /// Enable explicit peering for persistent peers. + /// When enabled, persistent peers are added as explicit peers in GossipSub, + /// meaning a node always sends and forwards messages to its explicit peers, + /// regardless of mesh membership. + enable_explicit_peering: bool, + + /// Enable flood publishing. + /// When enabled the publisher sends the messages to all known peers, not just mesh peers. + enable_flood_publish: bool, } impl Default for GossipSubConfig { fn default() -> Self { - // Peer scoring disabled by default - Self::new(6, 12, 4, 2, false) + // Peer scoring disabled and explicit peering disabled by default, flood_publish enabled by default + Self::new(6, 12, 4, 2, false, false, true) } } @@ -314,6 +324,8 @@ impl GossipSubConfig { mesh_n_low: usize, mesh_outbound_min: usize, enable_peer_scoring: bool, + enable_explicit_peering: bool, + enable_flood_publish: bool, ) -> Self { let mut result = Self { mesh_n, @@ -321,6 +333,8 @@ impl GossipSubConfig { mesh_n_low, mesh_outbound_min, enable_peer_scoring, + enable_explicit_peering, + enable_flood_publish, }; result.adjust(); @@ -349,6 +363,11 @@ impl GossipSubConfig { { self.mesh_outbound_min = max(1, min(self.mesh_n / 2, self.mesh_n_low - 1)); } + + // Both flood_publish and explicit_peering can be enabled together. + // flood_publish sends to all known peers on publish, explicit peering ensures + // a node always sends and forwards messages to its explicit peers, + // regardless of mesh membership. } pub fn mesh_n(&self) -> usize { @@ -370,10 +389,31 @@ impl GossipSubConfig { pub fn enable_peer_scoring(&self) -> bool { self.enable_peer_scoring } + + pub fn enable_explicit_peering(&self) -> bool { + self.enable_explicit_peering + } + + pub fn enable_flood_publish(&self) -> bool { + self.enable_flood_publish + } } mod gossipsub { use super::utils::bool_from_anything; + + fn default_enable_peer_scoring() -> bool { + false + } + + fn default_enable_explicit_peering() -> bool { + false + } + + fn default_enable_flood_publish() -> bool { + true + } + #[derive(serde::Deserialize)] pub struct RawConfig { #[serde(default)] @@ -384,8 +424,21 @@ mod gossipsub { mesh_n_low: usize, #[serde(default)] mesh_outbound_min: usize, - #[serde(default, deserialize_with = "bool_from_anything")] + #[serde( + default = "default_enable_peer_scoring", + deserialize_with = "bool_from_anything" + )] enable_peer_scoring: bool, + #[serde( + default = "default_enable_explicit_peering", + deserialize_with = "bool_from_anything" + )] + enable_explicit_peering: bool, + #[serde( + default = "default_enable_flood_publish", + deserialize_with = "bool_from_anything" + )] + enable_flood_publish: bool, } impl From for super::GossipSubConfig { @@ -396,6 +449,8 @@ mod gossipsub { raw.mesh_n_low, raw.mesh_outbound_min, raw.enable_peer_scoring, + raw.enable_explicit_peering, + raw.enable_flood_publish, ) } } diff --git a/code/crates/network/src/behaviour.rs b/code/crates/network/src/behaviour.rs index 9739c055f..0909b898b 100644 --- a/code/crates/network/src/behaviour.rs +++ b/code/crates/network/src/behaviour.rs @@ -181,6 +181,7 @@ fn gossipsub_config(config: GossipSubConfig, max_transmit_size: usize) -> gossip .mesh_n_low(config.mesh_n_low) .mesh_outbound_min(config.mesh_outbound_min) .mesh_n(config.mesh_n) + .flood_publish(config.enable_flood_publish) .message_id_fn(message_id) .build() .unwrap() diff --git a/code/crates/network/src/lib.rs b/code/crates/network/src/lib.rs index 979c6fb9e..a526f208d 100644 --- a/code/crates/network/src/lib.rs +++ b/code/crates/network/src/lib.rs @@ -102,6 +102,8 @@ pub struct GossipSubConfig { pub mesh_n_low: usize, pub mesh_outbound_min: usize, pub enable_peer_scoring: bool, + pub enable_explicit_peering: bool, + pub enable_flood_publish: bool, } impl Default for GossipSubConfig { @@ -113,6 +115,8 @@ impl Default for GossipSubConfig { mesh_n_low: 4, mesh_outbound_min: 2, enable_peer_scoring: false, + enable_explicit_peering: false, + enable_flood_publish: true, } } } @@ -611,6 +615,51 @@ fn set_peer_score(swarm: &mut swarm::Swarm, peer_id: libp2p::PeerId, } } +/// Add a persistent peer as an explicit peer in gossipsub (if explicit peering is enabled). +/// A node always sends and forwards messages to its explicit peers, regardless of mesh membership. +fn add_explicit_peer_to_gossipsub( + swarm: &mut swarm::Swarm, + state: &mut State, + peer_id: libp2p::PeerId, +) { + let Some(peer_info) = state.peer_info.get_mut(&peer_id) else { + return; + }; + + if peer_info.peer_type.is_persistent() { + if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() { + gossipsub.add_explicit_peer(&peer_id); + state + .metrics + .record_explicit_peer(&peer_id, &peer_info.moniker); + peer_info.is_explicit = true; + info!("Added persistent peer {peer_id} as explicit peer in gossipsub"); + } + } +} + +/// Remove a persistent peer from explicit peers in gossipsub and mark the metric stale. +fn remove_explicit_peer_from_gossipsub( + swarm: &mut swarm::Swarm, + state: &mut State, + peer_id: &libp2p::PeerId, +) { + let Some(peer_info) = state.peer_info.get_mut(peer_id) else { + return; + }; + + if peer_info.peer_type.is_persistent() { + if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() { + gossipsub.remove_explicit_peer(peer_id); + state + .metrics + .mark_explicit_peer_stale(peer_id, &peer_info.moniker); + peer_info.is_explicit = false; + info!("Removed persistent peer {peer_id} from explicit peers in gossipsub"); + } + } +} + async fn handle_swarm_event( event: SwarmEvent, config: &Config, @@ -690,6 +739,11 @@ async fn handle_swarm_event( .handle_closed_connection(swarm, peer_id, connection_id); if num_established == 0 { + // Remove explicit peer from gossipsub and mark metric stale when this peer was one + if config.gossipsub.enable_explicit_peering { + remove_explicit_peer_from_gossipsub(swarm, state, &peer_id); + } + if let Err(e) = tx_event .send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id))) .await @@ -732,6 +786,11 @@ async fn handle_swarm_event( let score = state.update_peer(peer_id, connection_id, &info); set_peer_score(swarm, peer_id, score); + // If enabled, add persistent peers as explicit peers for guaranteed delivery + if config.gossipsub.enable_explicit_peering { + add_explicit_peer_to_gossipsub(swarm, state, peer_id); + } + if !is_already_connected { if let Err(e) = tx_event .send(Event::PeerConnected(PeerId::from_libp2p(&peer_id))) diff --git a/code/crates/network/src/metrics.rs b/code/crates/network/src/metrics.rs index f5b8d8fac..cb00b1dc2 100644 --- a/code/crates/network/src/metrics.rs +++ b/code/crates/network/src/metrics.rs @@ -38,6 +38,13 @@ pub(crate) struct MeshMembershipLabels { topic: String, // "/consensus", "/liveness", "/proposal_parts" } +/// Labels for explicit peer metric +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub(crate) struct ExplicitPeerLabels { + peer_id: String, + peer_moniker: String, +} + impl PeerInfo { /// Convert to Prometheus metric labels (with slot number) pub(crate) fn to_labels(&self, peer_id: &PeerId, slot: usize) -> PeerInfoLabels { @@ -86,6 +93,8 @@ pub(crate) struct Metrics { discovered_peers: Family, /// Per-peer, per-topic mesh membership (1 = in mesh, 0 = not in mesh) peer_mesh_membership: Family, + /// Explicit peers in gossipsub (1 = active, i64::MIN = disconnected/stale) + explicit_peers: Family, /// PeerId to slot number mapping peer_slots: Slots, } @@ -104,6 +113,7 @@ impl Metrics { let local_node_info = Family::::default(); let peer_info = Family::::default(); let mesh_membership = Family::::default(); + let explicit_peers = Family::::default(); registry.register( "local_node_info", @@ -123,10 +133,17 @@ impl Metrics { mesh_membership.clone(), ); + registry.register( + "explicit_peers", + "Peers added as explicit peers in gossipsub (1 = active, i64::MIN = disconnected)", + explicit_peers.clone(), + ); + Self { local_node_info, discovered_peers: peer_info, peer_mesh_membership: mesh_membership, + explicit_peers, peer_slots: Slots::new(MAX_PEER_SLOTS), } } @@ -222,6 +239,24 @@ impl Metrics { } } + /// Record a peer as an explicit peer in gossipsub + pub(crate) fn record_explicit_peer(&self, peer_id: &PeerId, moniker: &str) { + let labels = ExplicitPeerLabels { + peer_id: peer_id.to_string(), + peer_moniker: moniker.to_string(), + }; + self.explicit_peers.get_or_create(&labels).set(1); + } + + /// Mark an explicit peer as stale (disconnected) + pub(crate) fn mark_explicit_peer_stale(&self, peer_id: &PeerId, moniker: &str) { + let labels = ExplicitPeerLabels { + peer_id: peer_id.to_string(), + peer_moniker: moniker.to_string(), + }; + self.explicit_peers.get_or_create(&labels).set(i64::MIN); + } + /// Record metrics for a new peer (assigns slot if needed). pub(crate) fn record_new_peer(&mut self, peer_id: &PeerId, peer_info: &PeerInfo) { let slot = if let Some(existing_slot) = self.peer_slots.get(peer_id) { diff --git a/code/crates/network/src/state.rs b/code/crates/network/src/state.rs index 719a8b444..57f29a4a0 100644 --- a/code/crates/network/src/state.rs +++ b/code/crates/network/src/state.rs @@ -90,11 +90,12 @@ pub struct PeerInfo { pub connection_direction: Option, // None if ephemeral (unknown) pub score: f64, pub topics: HashSet, // Set of topics peer is in mesh for (e.g., "/consensus", "/liveness") + pub is_explicit: bool, // Whether this peer is an explicit peer in gossipsub } impl PeerInfo { /// Format peer info with peer_id for logging - /// Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score + /// Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score, Explicit pub fn format_with_peer_id(&self, peer_id: &libp2p::PeerId) -> String { let direction = self.connection_direction.map_or("??", |d| d.as_str()); let mut topics: Vec<&str> = self.topics.iter().map(|s| s.as_str()).collect(); @@ -106,8 +107,9 @@ impl PeerInfo { } else { &self.consensus_address }; + let explicit = if self.is_explicit { "explicit" } else { "" }; format!( - "{}, {}, {}, {}, {}, {}, {}, {}", + "{}, {}, {}, {}, {}, {}, {}, {}, {}", self.address, self.moniker, peer_type_str, @@ -115,7 +117,8 @@ impl PeerInfo { address, topics_str, direction, - self.score as i64 + self.score as i64, + explicit ) } } @@ -414,8 +417,8 @@ impl State { agent_info.address.clone() }; - // If peer already exists (additional connection), just update Identify-provided fields. - // Keep existing state since they never fully disconnected. + // If peer already exists (additional connection), update Identify-provided fields. + // Keep existing state (topics) since they never fully disconnected. if let Some(existing) = self.peer_info.get_mut(&peer_id) { let old_peer_info = existing.clone(); existing.moniker = agent_info.moniker; @@ -426,11 +429,14 @@ impl State { existing.address = address; existing.connection_direction = connection_direction; } - // Preserve: peer_type, consensus_address, score, topics + // Re-evaluate peer type and consensus address with current state + existing.peer_type = peer_type; + existing.consensus_address = consensus_address; + existing.score = crate::peer_scoring::get_peer_score(peer_type); self.metrics .update_peer_labels(&peer_id, &old_peer_info, existing); - return crate::peer_scoring::get_peer_score(existing.peer_type); + return existing.score; } // New peer - create entry @@ -443,6 +449,7 @@ impl State { connection_direction, score, topics: Default::default(), + is_explicit: false, }; // Record peer information in metrics (subject to 100 slot limit) diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 6ba8c0d83..06d159813 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -280,6 +280,8 @@ async fn spawn_network_actor( mesh_n_low: config.mesh_n_low(), mesh_outbound_min: config.mesh_outbound_min(), enable_peer_scoring: config.enable_peer_scoring(), + enable_explicit_peering: config.enable_explicit_peering(), + enable_flood_publish: config.enable_flood_publish(), }, config::PubSubProtocol::Broadcast => gossip::GossipSubConfig::default(), }, diff --git a/code/crates/test/app/config.toml b/code/crates/test/app/config.toml index c38723942..a73b357cd 100644 --- a/code/crates/test/app/config.toml +++ b/code/crates/test/app/config.toml @@ -164,6 +164,22 @@ mesh_outbound_min = 2 # Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_PEER_SCORING env variable enable_peer_scoring = false +# GossipSub only. Enable explicit peering for persistent peers. +# When enabled, persistent peers are added as explicit peers in GossipSub, +# meaning a node always sends and forwards messages to its explicit peers, +# regardless of mesh membership. +# Note: explicit peering should ideally be reciprocal (both sides have each other as persistent peers). +# If only one side adds the other as explicit, that side will reject GRAFTs from the other with PRUNE, +# preventing the other from including it in its mesh. +# Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_EXPLICIT_PEERING env variable +enable_explicit_peering = false + +# GossipSub only. Enable flood publishing. +# When enabled, published messages are sent to all known peers, not just mesh peers. +# Can be enabled together with explicit peering. +# Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_FLOOD_PUBLISH env variable +enable_flood_publish = true + ####################################################### ### ValueSync Configuration Options ### ####################################################### diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index 7e2dd67e9..69d33a9fa 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -163,6 +163,22 @@ mesh_outbound_min = 2 # Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_PEER_SCORING env variable enable_peer_scoring = false +# GossipSub only. Enable explicit peering for persistent peers. +# When enabled, persistent peers are added as explicit peers in GossipSub, +# meaning a node always sends and forwards messages to its explicit peers, +# regardless of mesh membership. +# Note: explicit peering should ideally be reciprocal (both sides have each other as persistent peers). +# If only one side adds the other as explicit, that side will reject GRAFTs from the other with PRUNE, +# preventing the other from including it in its mesh. +# Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_EXPLICIT_PEERING env variable +enable_explicit_peering = false + +# GossipSub only. Enable flood publishing. +# When enabled, published messages are sent to all known peers, not just mesh peers. +# Can be enabled together with explicit peering. +# Override with MALACHITE__CONSENSUS__P2P__PROTOCOL__ENABLE_FLOOD_PUBLISH env variable +enable_flood_publish = true + ####################################################### ### Mempool Configuration Options ### #######################################################