From cf29b4563477c185543f1e28030e3fb046922cb0 Mon Sep 17 00:00:00 2001 From: jason Date: Sat, 27 Dec 2025 21:38:56 +0800 Subject: [PATCH 1/5] beetswap reconnect --- lattica/src/network/core.rs | 50 ++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index 9bd6140..7e7d61b 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -814,6 +814,47 @@ impl Lattica { Err(anyhow!("Failed to reconnect to peer {}", peer_id)) } + /// Ensure bootstrap and relay nodes are connected, reconnect if necessary + async fn ensure_network_connected(&self) { + // Reconnect bootstrap nodes + for addr in &self.config.bootstrap_nodes { + if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_err() { + continue; + } + let is_connected = rx.await.unwrap_or(false); + + if !is_connected { + tracing::info!("Bootstrap {} not connected, reconnecting...", peer_id); + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_ok() { + let _ = tokio::time::timeout(Duration::from_secs(5), rx).await; + } + } + } + } + + // Reconnect relay servers (dial to establish connection) + for addr in &self.config.relay_servers { + if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_err() { + continue; + } + let is_connected = rx.await.unwrap_or(false); + + if !is_connected { + tracing::info!("Relay {} not connected, reconnecting...", peer_id); + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_ok() { + let _ = tokio::time::timeout(Duration::from_secs(5), rx).await; + } + } + } + } + } + async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { // check swarm let (tx, rx) = oneshot::channel(); @@ -1091,6 +1132,9 @@ impl Lattica { address_book.info(peer_id)?.rtt() } + /// Get a block from the network + /// + /// On timeout, this method will trigger bootstrap reconnection for subsequent retries. pub async fn get_block( &self, cid: &Cid, @@ -1112,10 +1156,14 @@ impl Lattica { Err(anyhow!("Receiver channel closed")) } Err(_) => { - // Timeout occurred, cancel the query + // Timeout - cancel query and trigger reconnection for next attempt if let Some(qid) = query_id { let _ = self.cmd.try_send(Command::CancelGet(qid)); } + + // Trigger bootstrap reconnection in background for next retry + let _ = self.ensure_network_connected().await; + Err(anyhow!( "get_block timeout: block not found or request timed out after {:?}", timeout From 0a11e2cfbf91edc4185401d47f4b5584375c7d79 Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 29 Dec 2025 14:14:09 +0800 Subject: [PATCH 2/5] get block, connect all peers --- lattica/src/network/core.rs | 119 +++++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 23 deletions(-) diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index 7e7d61b..18c8135 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -13,7 +13,7 @@ use chrono::Utc; use cid::Cid; use fnv::FnvHashMap; use futures::io::WriteHalf; -use futures::{AsyncReadExt, AsyncWriteExt}; +use futures::{AsyncReadExt, AsyncWriteExt, future::join_all}; use libp2p::{ Multiaddr, PeerId, Stream, Swarm, SwarmBuilder, futures::StreamExt, @@ -814,45 +814,118 @@ impl Lattica { Err(anyhow!("Failed to reconnect to peer {}", peer_id)) } - /// Ensure bootstrap and relay nodes are connected, reconnect if necessary + /// Ensure bootstrap, relay nodes and known peers are connected (concurrent) async fn ensure_network_connected(&self) { - // Reconnect bootstrap nodes + // Collect all addresses to reconnect + let mut dial_futures = Vec::new(); + + // 1. Check bootstrap nodes for addr in &self.config.bootstrap_nodes { if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_err() { - continue; - } - let is_connected = rx.await.unwrap_or(false); - - if !is_connected { - tracing::info!("Bootstrap {} not connected, reconnecting...", peer_id); - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_ok() { - let _ = tokio::time::timeout(Duration::from_secs(5), rx).await; + if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() { + if !rx.await.unwrap_or(false) { + tracing::info!("Bootstrap {} not connected, will reconnect...", peer_id); + dial_futures.push(self.dial_with_timeout(addr.clone(), "Bootstrap")); } } } } - // Reconnect relay servers (dial to establish connection) + // 2. Check relay servers for addr in &self.config.relay_servers { if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_err() { - continue; + if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() { + if !rx.await.unwrap_or(false) { + tracing::info!("Relay {} not connected, will reconnect...", peer_id); + dial_futures.push(self.dial_with_timeout(addr.clone(), "Relay")); + } } - let is_connected = rx.await.unwrap_or(false); + } + } - if !is_connected { - tracing::info!("Relay {} not connected, reconnecting...", peer_id); - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_ok() { - let _ = tokio::time::timeout(Duration::from_secs(5), rx).await; - } + // 3. Check known peers from address book + let address_book = self.address_book.read().await; + let known_peers = address_book.peers(); + drop(address_book); + + let mut peer_reconnect_futures = Vec::new(); + for peer_id in known_peers.iter() { + // Skip bootstrap and relay nodes + let is_infra = self.config.bootstrap_nodes.iter().any(|addr| { + addr.iter().last() == Some(Protocol::P2p(*peer_id)) + }) || self.config.relay_servers.iter().any(|addr| { + addr.iter().last() == Some(Protocol::P2p(*peer_id)) + }); + if is_infra { + continue; + } + + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::CheckConnection(*peer_id, tx)).is_ok() { + if !rx.await.unwrap_or(false) { + peer_reconnect_futures.push(self.try_reconnect_peer_logged(*peer_id)); } } } + + // Execute all reconnections concurrently + if !dial_futures.is_empty() || !peer_reconnect_futures.is_empty() { + let dial_count = dial_futures.len(); + let peer_count = peer_reconnect_futures.len(); + tracing::info!( + "Reconnecting {} infrastructure nodes and {} data peers concurrently...", + dial_count, + peer_count + ); + + // Run all concurrently with overall timeout + let _ = tokio::time::timeout(Duration::from_secs(10), async { + let (dial_results, peer_results) = tokio::join!( + join_all(dial_futures), + join_all(peer_reconnect_futures) + ); + + let dial_success = dial_results.iter().filter(|r| r.is_ok()).count(); + let peer_success = peer_results.iter().filter(|r| r.is_ok()).count(); + + tracing::info!( + "Reconnection complete: {}/{} infrastructure, {}/{} peers", + dial_success, dial_count, + peer_success, peer_count + ); + }).await; + } + } + + /// Dial an address with timeout + async fn dial_with_timeout(&self, addr: Multiaddr, label: &str) -> Result<()> { + let (tx, rx) = oneshot::channel(); + if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_err() { + return Err(anyhow!("Failed to send dial command")); + } + match tokio::time::timeout(Duration::from_secs(5), rx).await { + Ok(Ok(_)) => { + tracing::debug!("{} dial success: {}", label, addr); + Ok(()) + } + _ => Err(anyhow!("{} dial failed: {}", label, addr)), + } + } + + /// Try to reconnect peer with logging + async fn try_reconnect_peer_logged(&self, peer_id: PeerId) -> Result<()> { + match self.try_reconnect_peer(&peer_id, Duration::from_secs(5)).await { + Ok(_) => { + tracing::info!("Reconnected to data provider peer {}", peer_id); + Ok(()) + } + Err(e) => { + tracing::debug!("Failed to reconnect peer {}: {}", peer_id, e); + Err(e) + } + } } async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { From bbc5d70062c5baf0c4261b178c26fa78a7a1bd27 Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 30 Dec 2025 20:45:08 +0800 Subject: [PATCH 3/5] not connect bootstrap and relay again --- lattica/src/network/core.rs | 74 ++++--------------------------------- 1 file changed, 8 insertions(+), 66 deletions(-) diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index 18c8135..cca1a5d 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -814,45 +814,17 @@ impl Lattica { Err(anyhow!("Failed to reconnect to peer {}", peer_id)) } - /// Ensure bootstrap, relay nodes and known peers are connected (concurrent) + /// Ensure known data peers are connected (concurrent) + /// Note: Bootstrap and relay connections are managed by the global reconnect_timer in swarm_poll async fn ensure_network_connected(&self) { - // Collect all addresses to reconnect - let mut dial_futures = Vec::new(); - - // 1. Check bootstrap nodes - for addr in &self.config.bootstrap_nodes { - if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() { - if !rx.await.unwrap_or(false) { - tracing::info!("Bootstrap {} not connected, will reconnect...", peer_id); - dial_futures.push(self.dial_with_timeout(addr.clone(), "Bootstrap")); - } - } - } - } - - // 2. Check relay servers - for addr in &self.config.relay_servers { - if let Some(Protocol::P2p(peer_id)) = addr.iter().last() { - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() { - if !rx.await.unwrap_or(false) { - tracing::info!("Relay {} not connected, will reconnect...", peer_id); - dial_futures.push(self.dial_with_timeout(addr.clone(), "Relay")); - } - } - } - } - - // 3. Check known peers from address book + // Check known peers from address book let address_book = self.address_book.read().await; let known_peers = address_book.peers(); drop(address_book); let mut peer_reconnect_futures = Vec::new(); for peer_id in known_peers.iter() { - // Skip bootstrap and relay nodes + // Skip bootstrap and relay nodes (handled by global timer) let is_infra = self.config.bootstrap_nodes.iter().any(|addr| { addr.iter().last() == Some(Protocol::P2p(*peer_id)) }) || self.config.relay_servers.iter().any(|addr| { @@ -871,49 +843,19 @@ impl Lattica { } // Execute all reconnections concurrently - if !dial_futures.is_empty() || !peer_reconnect_futures.is_empty() { - let dial_count = dial_futures.len(); + if !peer_reconnect_futures.is_empty() { let peer_count = peer_reconnect_futures.len(); - tracing::info!( - "Reconnecting {} infrastructure nodes and {} data peers concurrently...", - dial_count, - peer_count - ); + tracing::info!("Reconnecting {} data peers concurrently...", peer_count); // Run all concurrently with overall timeout let _ = tokio::time::timeout(Duration::from_secs(10), async { - let (dial_results, peer_results) = tokio::join!( - join_all(dial_futures), - join_all(peer_reconnect_futures) - ); - - let dial_success = dial_results.iter().filter(|r| r.is_ok()).count(); + let peer_results = join_all(peer_reconnect_futures).await; let peer_success = peer_results.iter().filter(|r| r.is_ok()).count(); - - tracing::info!( - "Reconnection complete: {}/{} infrastructure, {}/{} peers", - dial_success, dial_count, - peer_success, peer_count - ); + tracing::info!("Reconnection complete: {}/{} peers", peer_success, peer_count); }).await; } } - /// Dial an address with timeout - async fn dial_with_timeout(&self, addr: Multiaddr, label: &str) -> Result<()> { - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::Dial(addr.clone(), tx)).is_err() { - return Err(anyhow!("Failed to send dial command")); - } - match tokio::time::timeout(Duration::from_secs(5), rx).await { - Ok(Ok(_)) => { - tracing::debug!("{} dial success: {}", label, addr); - Ok(()) - } - _ => Err(anyhow!("{} dial failed: {}", label, addr)), - } - } - /// Try to reconnect peer with logging async fn try_reconnect_peer_logged(&self, peer_id: PeerId) -> Result<()> { match self.try_reconnect_peer(&peer_id, Duration::from_secs(5)).await { From 01534ec709baaaa4557a7c83bae22d301b10f5f3 Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 30 Dec 2025 20:48:09 +0800 Subject: [PATCH 4/5] comment change --- lattica/src/network/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index cca1a5d..ca6954b 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -1149,7 +1149,7 @@ impl Lattica { /// Get a block from the network /// - /// On timeout, this method will trigger bootstrap reconnection for subsequent retries. + /// On timeout, this method will trigger reconnection for subsequent retries. pub async fn get_block( &self, cid: &Cid, From 553589954c9f417e5929c60e2a53f0743670a470 Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 5 Jan 2026 22:01:30 +0800 Subject: [PATCH 5/5] reuse ensure_direct_connection --- lattica/src/network/core.rs | 55 ++++++++++++++----------------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index ca6954b..ad11e3b 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -834,72 +834,57 @@ impl Lattica { continue; } - let (tx, rx) = oneshot::channel(); - if self.cmd.try_send(Command::CheckConnection(*peer_id, tx)).is_ok() { - if !rx.await.unwrap_or(false) { - peer_reconnect_futures.push(self.try_reconnect_peer_logged(*peer_id)); - } - } + let peer_id = *peer_id; + peer_reconnect_futures.push(async move { + self.ensure_peer_connected(&peer_id, Duration::from_secs(5)).await + }); } // Execute all reconnections concurrently if !peer_reconnect_futures.is_empty() { let peer_count = peer_reconnect_futures.len(); - tracing::info!("Reconnecting {} data peers concurrently...", peer_count); + tracing::info!("Checking {} data peers concurrently...", peer_count); // Run all concurrently with overall timeout let _ = tokio::time::timeout(Duration::from_secs(10), async { let peer_results = join_all(peer_reconnect_futures).await; let peer_success = peer_results.iter().filter(|r| r.is_ok()).count(); - tracing::info!("Reconnection complete: {}/{} peers", peer_success, peer_count); + tracing::info!("Connection check complete: {}/{} peers", peer_success, peer_count); }).await; } } - /// Try to reconnect peer with logging - async fn try_reconnect_peer_logged(&self, peer_id: PeerId) -> Result<()> { - match self.try_reconnect_peer(&peer_id, Duration::from_secs(5)).await { - Ok(_) => { - tracing::info!("Reconnected to data provider peer {}", peer_id); - Ok(()) - } - Err(e) => { - tracing::debug!("Failed to reconnect peer {}: {}", peer_id, e); - Err(e) - } - } - } - - async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { - // check swarm + /// Ensure a peer is connected, attempting reconnection if needed + async fn ensure_peer_connected(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { let (tx, rx) = oneshot::channel(); self.cmd.try_send(Command::CheckConnection(*peer_id, tx))?; let is_connected = rx.await.unwrap_or(false); if !is_connected { - // try reconnect - tracing::debug!( - "Peer {} is not connected, attempting to reconnect...", - peer_id - ); + tracing::debug!("Peer {} is not connected, attempting to reconnect...", peer_id); self.try_reconnect_peer(peer_id, timeout).await?; - // verify status + // Verify reconnection succeeded let (tx2, rx2) = oneshot::channel(); self.cmd.try_send(Command::CheckConnection(*peer_id, tx2))?; let reconnected = rx2.await.unwrap_or(false); if !reconnected { - return Err(anyhow!( - "Failed to establish connection to peer {}", - peer_id - )); + return Err(anyhow!("Failed to establish connection to peer {}", peer_id)); } tracing::info!("Successfully reconnected to peer {}", peer_id); } - // check direct connection + Ok(()) + } + + /// Ensure a direct (non-relayed) connection to a peer + async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { + // First ensure the peer is connected + self.ensure_peer_connected(peer_id, timeout).await?; + + // Then verify it's a direct connection (not relayed) let address_book = self.address_book.read().await; if let Some(info) = address_book.info(peer_id) { let has_direct = info.addresses().any(|(_, _, _, is_relayed, _)| !is_relayed);