Skip to content
Merged
55 changes: 46 additions & 9 deletions crates/floresta-wire/src/p2p_wire/address_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bitcoin::p2p::ServiceFlags;
use bitcoin::Network;
use floresta_chain::DnsSeed;
use floresta_common::service_flags;
use rand::seq::IteratorRandom;
use serde::Deserialize;
use serde::Serialize;
use tracing::debug;
Expand Down Expand Up @@ -738,15 +739,16 @@ impl AddressMan {
}

fn get_address_by_service(&self, service: ServiceFlags) -> Option<(usize, LocalAddress)> {
let peers = self.good_peers_by_service.get(&service)?;
if peers.is_empty() {
return None;
}

let idx = rand::random::<usize>() % peers.len();
let utreexo_peer = peers.get(idx)?;
let candidates = self.good_peers_by_service.get(&service)?;

Some((*utreexo_peer, self.addresses.get(utreexo_peer)?.to_owned()))
candidates
.iter()
.filter_map(|id| {
let addr = self.addresses.get(id)?;
(addr.state != AddressState::Connected).then_some((id, addr))
})
.choose(&mut rand::thread_rng())
.map(|(id, addr)| (*id, addr.to_owned()))
}
Comment on lines 741 to 752
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing documentation

Comment on lines 741 to 752
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compact version, avoiding calling self.addresses.get twice

    fn get_address_by_service(&self, service: ServiceFlags) -> Option<(usize, LocalAddress)> {
        let candidates = self.good_peers_by_service.get(&service)?;

        candidates
            .iter()
            .filter_map(|id| {
                let addr = self.addresses.get(id)?;
                (addr.state != AddressState::Connected).then_some((id, addr))
            })
            .choose(&mut rand::thread_rng())
            .map(|(id, addr)| (*id, addr.to_owned()))
    }


pub fn start_addr_man(&mut self, datadir: String) -> Vec<LocalAddress> {
Expand Down Expand Up @@ -965,7 +967,7 @@ impl AddressMan {
/// Updates the service flags after we receive a version message
pub fn update_set_service_flag(&mut self, idx: usize, flags: ServiceFlags) -> &mut Self {
// if this peer turns out to not have the minimum required services, we remove it
if !flags.has(ServiceFlags::NETWORK) || !flags.has(ServiceFlags::WITNESS) {
if !flags.has(ServiceFlags::NETWORK_LIMITED) || !flags.has(ServiceFlags::WITNESS) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does NETWORK also imply NETWORK_LIMITED? If not, NETWORK should remain there and NETWORK_LIMITED should be added.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we require NETWORK we will drop all pruned nodes from addr_man

self.addresses.remove(&idx);
for peers in self.peers_by_service.values_mut() {
peers.retain(|&x| x != idx);
Expand Down Expand Up @@ -1316,6 +1318,41 @@ mod test {
}
}

#[test]
fn test_adding_fixed_peer() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this supersedes test_add_fixed_addresses, right? We should just add here the not empty check to keep this strictly a super-set of checks

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, test_add_fixed_addresses is using AddressMan::add_fixed_addresses, this new test does it manually to have access to the raw data

let signet_addresses =
load_addresses_from_json("./src/p2p_wire/seeds/signet_seeds.json").unwrap();

let mut addr_man =
AddressMan::new(None, &[ReachableNetworks::IPv4, ReachableNetworks::IPv6]);
addr_man.add_fixed_addresses(Network::Signet);

assert_eq!(addr_man.good_addresses.len(), signet_addresses.len());

let utreexo_addresses = signet_addresses
.iter()
.filter(|address| address.services.has(service_flags::UTREEXO.into()))
.collect::<Vec<_>>();

assert_eq!(
addr_man
.good_peers_by_service
.get(&service_flags::UTREEXO.into())
.unwrap()
.len(),
utreexo_addresses.len()
);

assert_eq!(
addr_man
.peers_by_service
.get(&service_flags::UTREEXO.into())
.unwrap()
.len(),
utreexo_addresses.len()
);
}

#[test]
fn test_parse() {
let signet_address =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ where
}

PeerMessages::Ready(version) => {
self.handle_peer_ready(peer, &version)?;
self.handle_peer_ready(peer, version)?;
if matches!(self.context.state, ChainSelectorState::LookingForForks(_)) {
let locator = self.chain.get_block_locator().unwrap();
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))?;
Expand Down
16 changes: 4 additions & 12 deletions crates/floresta-wire/src/p2p_wire/node/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ where
/// This is only done if we don't have any peers for a long time, or we
/// can't find a Utreexo peer in a context we need them. This function
/// won't do anything if `--connect` was used
fn maybe_use_hardcoded_addresses(&mut self, needs_utreexo: bool) {
fn maybe_use_hardcoded_addresses(&mut self) {
if self.fixed_peer.is_some() {
return;
}
Expand All @@ -558,18 +558,11 @@ where
return;
}

let has_peers = !self.peers.is_empty();
// Return if we have peers and utreexo isn't needed OR we have utreexo peers
if has_peers && (!needs_utreexo || self.has_utreexo_peers()) {
if self.address_man.enough_addresses() {
return;
}

let mut wait = HARDCODED_ADDRESSES_GRACE_PERIOD;
if needs_utreexo {
// This gives some extra time for the node to try connections after chain selection
wait += Duration::from_secs(60);
}

let wait = HARDCODED_ADDRESSES_GRACE_PERIOD;
if self.startup_time.elapsed() < wait {
return;
}
Expand Down Expand Up @@ -629,8 +622,7 @@ where
// If we've tried getting some connections, but the addresses we have are not
// working. Try getting some more addresses from DNS
self.maybe_ask_dns_seed_for_addresses();
let needs_utreexo = required_service.has(service_flags::UTREEXO.into());
self.maybe_use_hardcoded_addresses(needs_utreexo);
self.maybe_use_hardcoded_addresses();

for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE {
// Ignore the error so we don't break out of the loop
Expand Down
85 changes: 50 additions & 35 deletions crates/floresta-wire/src/p2p_wire/node/peer_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::address_man::AddressState;
use crate::address_man::LocalAddress;
use crate::block_proof::Bitmap;
use crate::node::running_ctx::RunningNode;
use crate::node::try_and_log;
use crate::node_context::NodeContext;
use crate::node_context::PeerId;
use crate::node_interface::NodeResponse;
Expand Down Expand Up @@ -212,7 +213,7 @@ where
pub(crate) fn handle_peer_ready(
&mut self,
peer: u32,
version: &Version,
mut version: Version,
) -> Result<(), WireError> {
self.inflight.remove(&InflightRequests::Connect(peer));

Expand All @@ -226,6 +227,30 @@ where
self.inflight
.insert(InflightRequests::GetAddresses, (peer, Instant::now()));

let good_peers_count = self.connected_peers();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.connected_peers() should return the actual peers, not the count.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or be renamed to reflect that it returns a count; we use it in several places already. Feels a bit off-topic for this PR, though.

if good_peers_count > T::MAX_OUTGOING_PEERS {
// We allow utreexo, extra and manual peers to bypass our connection limits
let is_utreexo_peer = matches!(version.kind, ConnectionKind::Regular(services) if services.has(service_flags::UTREEXO.into()));
let is_manual_peer = version.kind == ConnectionKind::Manual;
let is_extra = version.kind == ConnectionKind::Extra;

if !(is_utreexo_peer || is_manual_peer || is_extra) {
debug!(
"Already have {} peers, disconnecting peer to avoid blowing up our max of {}",
good_peers_count,
T::MAX_OUTGOING_PEERS
);

// If a peer exceeds our max, just turn them into a feeler so we can receive their
// AddrV2 message and then disconnect.
self.peers.entry(peer).and_modify(|p| {
p.kind = ConnectionKind::Feeler;
});

version.kind = ConnectionKind::Feeler;
}
}

if version.kind == ConnectionKind::Feeler {
Comment on lines +246 to 254
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you didn't actually fix it, we modify the kind to be feeler on self.peers, but then we check against the Version message which won't be feeler.

Also what if this is an extra connection, we shouldn't convert it into a feeler even if we have excess peers.

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -249,27 +274,6 @@ where
return Ok(());
}

let good_peers_count = self.connected_peers();
if good_peers_count > T::MAX_OUTGOING_PEERS {
// Don't allow our node to have more than T::MAX_OUTGOING_PEERS, unless this is a
// manual peer, those can exceed our quota.
if version.kind != ConnectionKind::Manual {
debug!(
"Already have {} peers, disconnecting peer to avoid blowing up our max of {}",
good_peers_count,
T::MAX_OUTGOING_PEERS
);

// If a peer exceeds our max, just turn them into a feeler so we can receive their
// AddrV2 message and then disconnect.
self.peers.entry(peer).and_modify(|p| {
p.kind = ConnectionKind::Feeler;
});

return Ok(());
}
}

info!(
"New peer id={} version={} blocks={} services={}",
version.id, version.user_agent, version.blocks, version.services
Expand Down Expand Up @@ -479,7 +483,12 @@ where

for req in inflight {
self.inflight.remove(&req.0);
self.redo_inflight_request(req.0.clone())?;

if let Err(e) = self.redo_inflight_request(&req.0) {
// CRITICAL: never drop the request, so we retry it later
self.inflight.insert(req.0, req.1);
return Err(e);
}
}

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -565,7 +574,7 @@ where
.collect::<Vec<_>>();

for req in timed_out {
let Some((peer, _)) = self.inflight.remove(&req) else {
let Some((peer, time)) = self.inflight.remove(&req) else {
continue;
};

Expand All @@ -587,8 +596,14 @@ where
}

debug!("Request timed out: {req:?}");
self.increase_banscore(peer, 1)?;
self.redo_inflight_request(req)?;
// Increase the banscore and try banning the peer if needed, then re-request
try_and_log!(self.increase_banscore(peer, 1));

if let Err(e) = self.redo_inflight_request(&req) {
// CRITICAL: never drop the request, so we retry it later
self.inflight.insert(req, (peer, time));
return Err(e);
}
}

Ok(())
Expand Down Expand Up @@ -617,39 +632,39 @@ where
Ok(())
}

pub(crate) fn redo_inflight_request(&mut self, req: InflightRequests) -> Result<(), WireError> {
pub(crate) fn redo_inflight_request(
&mut self,
req: &InflightRequests,
) -> Result<(), WireError> {
match req {
InflightRequests::UtreexoProof(block_hash) => {
if !self.has_utreexo_peers() {
return Ok(());
}

if !self.blocks.contains_key(&block_hash) {
if !self.blocks.contains_key(block_hash) {
// If we don't have the block anymore, we can't ask for the proof
return Ok(());
}

if self
.inflight
.contains_key(&InflightRequests::UtreexoProof(block_hash))
{
if self.inflight.contains_key(req) {
// If we already have an inflight request for this block, we don't need to redo it
return Ok(());
}

let peer = self.send_to_fast_peer(
NodeRequest::GetBlockProof((block_hash, Bitmap::new(), Bitmap::new())),
NodeRequest::GetBlockProof((*block_hash, Bitmap::new(), Bitmap::new())),
service_flags::UTREEXO.into(),
)?;

self.inflight.insert(
InflightRequests::UtreexoProof(block_hash),
InflightRequests::UtreexoProof(*block_hash),
(peer, Instant::now()),
);
}

InflightRequests::Blocks(block) => {
self.request_blocks(vec![block])?;
self.request_blocks(vec![*block])?;
}

InflightRequests::Headers => {
Expand Down
2 changes: 1 addition & 1 deletion crates/floresta-wire/src/p2p_wire/node/running_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ where
"handshake with peer={peer} succeeded feeler={:?}",
version.kind
);
self.handle_peer_ready(peer, &version)?;
self.handle_peer_ready(peer, version)?;
}

PeerMessages::Disconnected(idx) => {
Expand Down
5 changes: 2 additions & 3 deletions crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ impl NodeContext for SyncNode {
ServiceFlags::NETWORK | ServiceFlags::WITNESS | service_flags::UTREEXO_ARCHIVE.into()
}

const TRY_NEW_CONNECTION: u64 = 30; // 30 seconds
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 9be7162 you just removed TRY_NEW_CONNECTION, but where is the new 10 second value coming from?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In node_context.rs it is set to 10s by default

const REQUEST_TIMEOUT: u64 = 60; // 1 minute
const REQUEST_TIMEOUT: u64 = 60 * 2; // 2 minutes
const MAX_INFLIGHT_REQUESTS: usize = 100; // double the default

// A more conservative value than the default of 1 second, since we'll have many peer messages
Expand Down Expand Up @@ -321,7 +320,7 @@ where
}

PeerMessages::Ready(version) => {
try_and_log!(self.handle_peer_ready(peer, &version));
try_and_log!(self.handle_peer_ready(peer, version));
}

PeerMessages::Disconnected(idx) => {
Expand Down
8 changes: 4 additions & 4 deletions crates/floresta-wire/src/p2p_wire/seeds/signet_seeds.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
"state": {
"Tried": 0
},
"services": 1037,
"services": 1033,
Copy link
Copy Markdown
Member

@JoseSK999 JoseSK999 Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we can't connect to these if they don't support P2P v2. Just noting this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to test all those peers to check if they are still alive. A little off-topic for this PR, though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #907 for this

"port": 38333
},
{
Expand Down Expand Up @@ -139,7 +139,7 @@
"state": {
"Tried": 0
},
"services": 0,
"services": 1033,
"port": 38333
},
{
Expand All @@ -150,7 +150,7 @@
"state": {
"Tried": 0
},
"services": 0,
"services": 1033,
"port": 38333
},
{
Expand All @@ -161,7 +161,7 @@
"state": {
"Tried": 0
},
"services": 0,
"services": 1033,
"port": 38333
}
]
Loading