Skip to content
1 change: 1 addition & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions code/crates/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ fn make_network_config(cfg: &ConsensusConfig, value_sync_cfg: &ValueSyncConfig)
mesh_n_low: config.mesh_n_low(),
mesh_outbound_min: config.mesh_outbound_min(),
enable_peer_scoring: config.enable_peer_scoring(),
enable_explicit_peering: config.enable_explicit_peering(),
enable_flood_publish: config.enable_flood_publish(),
},
config::PubSubProtocol::Broadcast => GossipSubConfig::default(),
},
Expand Down
1 change: 1 addition & 0 deletions code/crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ config = { workspace = true }
humantime-serde = { workspace = true }
multiaddr = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tracing = { workspace = true, default-features = false }

[dev-dependencies]
serde_json = { workspace = true }
Expand Down
61 changes: 58 additions & 3 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,22 @@ pub struct GossipSubConfig {

/// Enable peer scoring to prioritize nodes based on their type in mesh formation
enable_peer_scoring: bool,

/// Enable explicit peering for persistent peers.
/// When enabled, persistent peers are added as explicit peers in GossipSub,
/// meaning a node always sends and forwards messages to its explicit peers,
/// regardless of mesh membership.
enable_explicit_peering: bool,

/// Enable flood publishing.
/// When enabled the publisher sends the messages to all known peers, not just mesh peers.
enable_flood_publish: bool,
}

impl Default for GossipSubConfig {
fn default() -> Self {
// Peer scoring disabled by default
Self::new(6, 12, 4, 2, false)
// Peer scoring disabled and explicit peering disabled by default, flood_publish enabled by default
Self::new(6, 12, 4, 2, false, false, true)
}
}

Expand All @@ -314,13 +324,17 @@ impl GossipSubConfig {
mesh_n_low: usize,
mesh_outbound_min: usize,
enable_peer_scoring: bool,
enable_explicit_peering: bool,
enable_flood_publish: bool,
) -> Self {
let mut result = Self {
mesh_n,
mesh_n_high,
mesh_n_low,
mesh_outbound_min,
enable_peer_scoring,
enable_explicit_peering,
enable_flood_publish,
};

result.adjust();
Expand Down Expand Up @@ -349,6 +363,11 @@ impl GossipSubConfig {
{
self.mesh_outbound_min = max(1, min(self.mesh_n / 2, self.mesh_n_low - 1));
}

// Both flood_publish and explicit_peering can be enabled together.
// flood_publish sends to all known peers on publish, explicit peering ensures
// a node always sends and forwards messages to its explicit peers,
// regardless of mesh membership.
}

pub fn mesh_n(&self) -> usize {
Expand All @@ -370,10 +389,31 @@ impl GossipSubConfig {
pub fn enable_peer_scoring(&self) -> bool {
self.enable_peer_scoring
}

pub fn enable_explicit_peering(&self) -> bool {
self.enable_explicit_peering
}

pub fn enable_flood_publish(&self) -> bool {
self.enable_flood_publish
}
}

mod gossipsub {
use super::utils::bool_from_anything;

fn default_enable_peer_scoring() -> bool {
false
}

fn default_enable_explicit_peering() -> bool {
false
}

fn default_enable_flood_publish() -> bool {
true
}

#[derive(serde::Deserialize)]
pub struct RawConfig {
#[serde(default)]
Expand All @@ -384,8 +424,21 @@ mod gossipsub {
mesh_n_low: usize,
#[serde(default)]
mesh_outbound_min: usize,
#[serde(default, deserialize_with = "bool_from_anything")]
#[serde(
default = "default_enable_peer_scoring",
deserialize_with = "bool_from_anything"
)]
enable_peer_scoring: bool,
#[serde(
default = "default_enable_explicit_peering",
deserialize_with = "bool_from_anything"
)]
enable_explicit_peering: bool,
#[serde(
default = "default_enable_flood_publish",
deserialize_with = "bool_from_anything"
)]
enable_flood_publish: bool,
}

impl From<RawConfig> for super::GossipSubConfig {
Expand All @@ -396,6 +449,8 @@ mod gossipsub {
raw.mesh_n_low,
raw.mesh_outbound_min,
raw.enable_peer_scoring,
raw.enable_explicit_peering,
raw.enable_flood_publish,
)
}
}
Expand Down
1 change: 1 addition & 0 deletions code/crates/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ fn gossipsub_config(config: GossipSubConfig, max_transmit_size: usize) -> gossip
.mesh_n_low(config.mesh_n_low)
.mesh_outbound_min(config.mesh_outbound_min)
.mesh_n(config.mesh_n)
.flood_publish(config.enable_flood_publish)
.message_id_fn(message_id)
.build()
.unwrap()
Expand Down
59 changes: 59 additions & 0 deletions code/crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub struct GossipSubConfig {
pub mesh_n_low: usize,
pub mesh_outbound_min: usize,
pub enable_peer_scoring: bool,
pub enable_explicit_peering: bool,
pub enable_flood_publish: bool,
}

impl Default for GossipSubConfig {
Expand All @@ -113,6 +115,8 @@ impl Default for GossipSubConfig {
mesh_n_low: 4,
mesh_outbound_min: 2,
enable_peer_scoring: false,
enable_explicit_peering: false,
enable_flood_publish: true,
}
}
}
Expand Down Expand Up @@ -611,6 +615,51 @@ fn set_peer_score(swarm: &mut swarm::Swarm<Behaviour>, peer_id: libp2p::PeerId,
}
}

/// Add a persistent peer as an explicit peer in gossipsub (if explicit peering is enabled).
/// A node always sends and forwards messages to its explicit peers, regardless of mesh membership.
fn add_explicit_peer_to_gossipsub(
swarm: &mut swarm::Swarm<Behaviour>,
state: &mut State,
peer_id: libp2p::PeerId,
) {
let Some(peer_info) = state.peer_info.get_mut(&peer_id) else {
return;
};

if peer_info.peer_type.is_persistent() {
if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
gossipsub.add_explicit_peer(&peer_id);
state
.metrics
.record_explicit_peer(&peer_id, &peer_info.moniker);
peer_info.is_explicit = true;
info!("Added persistent peer {peer_id} as explicit peer in gossipsub");
}
}
}

/// Remove a persistent peer from explicit peers in gossipsub and mark the metric stale.
fn remove_explicit_peer_from_gossipsub(
swarm: &mut swarm::Swarm<Behaviour>,
state: &mut State,
peer_id: &libp2p::PeerId,
) {
let Some(peer_info) = state.peer_info.get_mut(peer_id) else {
return;
};

if peer_info.peer_type.is_persistent() {
if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
gossipsub.remove_explicit_peer(peer_id);
state
.metrics
.mark_explicit_peer_stale(peer_id, &peer_info.moniker);
peer_info.is_explicit = false;
info!("Removed persistent peer {peer_id} from explicit peers in gossipsub");
}
}
}

async fn handle_swarm_event(
event: SwarmEvent<NetworkEvent>,
config: &Config,
Expand Down Expand Up @@ -690,6 +739,11 @@ async fn handle_swarm_event(
.handle_closed_connection(swarm, peer_id, connection_id);

if num_established == 0 {
// Remove explicit peer from gossipsub and mark metric stale when this peer was one
if config.gossipsub.enable_explicit_peering {
remove_explicit_peer_from_gossipsub(swarm, state, &peer_id);
}

if let Err(e) = tx_event
.send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
.await
Expand Down Expand Up @@ -732,6 +786,11 @@ async fn handle_swarm_event(
let score = state.update_peer(peer_id, connection_id, &info);
set_peer_score(swarm, peer_id, score);

// If enabled, add persistent peers as explicit peers for guaranteed delivery
if config.gossipsub.enable_explicit_peering {
add_explicit_peer_to_gossipsub(swarm, state, peer_id);
}

if !is_already_connected {
if let Err(e) = tx_event
.send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
Expand Down
35 changes: 35 additions & 0 deletions code/crates/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ pub(crate) struct MeshMembershipLabels {
topic: String, // "/consensus", "/liveness", "/proposal_parts"
}

/// Labels for explicit peer metric
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub(crate) struct ExplicitPeerLabels {
peer_id: String,
peer_moniker: String,
}

impl PeerInfo {
/// Convert to Prometheus metric labels (with slot number)
pub(crate) fn to_labels(&self, peer_id: &PeerId, slot: usize) -> PeerInfoLabels {
Expand Down Expand Up @@ -86,6 +93,8 @@ pub(crate) struct Metrics {
discovered_peers: Family<PeerInfoLabels, Gauge>,
/// Per-peer, per-topic mesh membership (1 = in mesh, 0 = not in mesh)
peer_mesh_membership: Family<MeshMembershipLabels, Gauge>,
/// Explicit peers in gossipsub (1 = active, i64::MIN = disconnected/stale)
explicit_peers: Family<ExplicitPeerLabels, Gauge>,
/// PeerId to slot number mapping
peer_slots: Slots<PeerId>,
}
Expand All @@ -104,6 +113,7 @@ impl Metrics {
let local_node_info = Family::<LocalNodeLabels, Gauge>::default();
let peer_info = Family::<PeerInfoLabels, Gauge>::default();
let mesh_membership = Family::<MeshMembershipLabels, Gauge>::default();
let explicit_peers = Family::<ExplicitPeerLabels, Gauge>::default();

registry.register(
"local_node_info",
Expand All @@ -123,10 +133,17 @@ impl Metrics {
mesh_membership.clone(),
);

registry.register(
"explicit_peers",
"Peers added as explicit peers in gossipsub (1 = active, i64::MIN = disconnected)",
explicit_peers.clone(),
);

Self {
local_node_info,
discovered_peers: peer_info,
peer_mesh_membership: mesh_membership,
explicit_peers,
peer_slots: Slots::new(MAX_PEER_SLOTS),
}
}
Expand Down Expand Up @@ -222,6 +239,24 @@ impl Metrics {
}
}

/// Record a peer as an explicit peer in gossipsub
pub(crate) fn record_explicit_peer(&self, peer_id: &PeerId, moniker: &str) {
let labels = ExplicitPeerLabels {
peer_id: peer_id.to_string(),
peer_moniker: moniker.to_string(),
};
self.explicit_peers.get_or_create(&labels).set(1);
}

/// Mark an explicit peer as stale (disconnected)
pub(crate) fn mark_explicit_peer_stale(&self, peer_id: &PeerId, moniker: &str) {
let labels = ExplicitPeerLabels {
peer_id: peer_id.to_string(),
peer_moniker: moniker.to_string(),
};
self.explicit_peers.get_or_create(&labels).set(i64::MIN);
}

/// Record metrics for a new peer (assigns slot if needed).
pub(crate) fn record_new_peer(&mut self, peer_id: &PeerId, peer_info: &PeerInfo) {
let slot = if let Some(existing_slot) = self.peer_slots.get(peer_id) {
Expand Down
21 changes: 14 additions & 7 deletions code/crates/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ pub struct PeerInfo {
pub connection_direction: Option<ConnectionDirection>, // None if ephemeral (unknown)
pub score: f64,
pub topics: HashSet<String>, // Set of topics peer is in mesh for (e.g., "/consensus", "/liveness")
pub is_explicit: bool, // Whether this peer is an explicit peer in gossipsub
}

impl PeerInfo {
/// Format peer info with peer_id for logging
/// Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score
/// Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score, Explicit
pub fn format_with_peer_id(&self, peer_id: &libp2p::PeerId) -> String {
let direction = self.connection_direction.map_or("??", |d| d.as_str());
let mut topics: Vec<&str> = self.topics.iter().map(|s| s.as_str()).collect();
Expand All @@ -106,16 +107,18 @@ impl PeerInfo {
} else {
&self.consensus_address
};
let explicit = if self.is_explicit { "explicit" } else { "" };
format!(
"{}, {}, {}, {}, {}, {}, {}, {}",
"{}, {}, {}, {}, {}, {}, {}, {}, {}",
self.address,
self.moniker,
peer_type_str,
peer_id,
address,
topics_str,
direction,
self.score as i64
self.score as i64,
explicit
)
}
}
Expand Down Expand Up @@ -414,8 +417,8 @@ impl State {
agent_info.address.clone()
};

// If peer already exists (additional connection), just update Identify-provided fields.
// Keep existing state since they never fully disconnected.
// If peer already exists (additional connection), update Identify-provided fields.
// Keep existing state (topics) since they never fully disconnected.
if let Some(existing) = self.peer_info.get_mut(&peer_id) {
let old_peer_info = existing.clone();
existing.moniker = agent_info.moniker;
Expand All @@ -426,11 +429,14 @@ impl State {
existing.address = address;
existing.connection_direction = connection_direction;
}
// Preserve: peer_type, consensus_address, score, topics
// Re-evaluate peer type and consensus address with current state
existing.peer_type = peer_type;
existing.consensus_address = consensus_address;
existing.score = crate::peer_scoring::get_peer_score(peer_type);

self.metrics
.update_peer_labels(&peer_id, &old_peer_info, existing);
return crate::peer_scoring::get_peer_score(existing.peer_type);
return existing.score;
}

// New peer - create entry
Expand All @@ -443,6 +449,7 @@ impl State {
connection_direction,
score,
topics: Default::default(),
is_explicit: false,
};

// Record peer information in metrics (subject to 100 slot limit)
Expand Down
2 changes: 2 additions & 0 deletions code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ async fn spawn_network_actor(
mesh_n_low: config.mesh_n_low(),
mesh_outbound_min: config.mesh_outbound_min(),
enable_peer_scoring: config.enable_peer_scoring(),
enable_explicit_peering: config.enable_explicit_peering(),
enable_flood_publish: config.enable_flood_publish(),
},
config::PubSubProtocol::Broadcast => gossip::GossipSubConfig::default(),
},
Expand Down
Loading