diff --git a/crates/orchestrator/src/node/invite.rs b/crates/orchestrator/src/node/invite.rs index 9f01e96a..a52f5397 100644 --- a/crates/orchestrator/src/node/invite.rs +++ b/crates/orchestrator/src/node/invite.rs @@ -6,6 +6,8 @@ use alloy::primitives::utils::keccak256 as keccak; use alloy::primitives::U256; use alloy::signers::Signer; use anyhow::Result; +use futures::stream; +use futures::StreamExt; use hex; use log::{debug, error, info, warn}; use reqwest::Client; @@ -20,6 +22,7 @@ use tokio::time::{interval, Duration}; // Timeout constants const REQUEST_TIMEOUT: u64 = 15; // 15 seconds for HTTP requests const CONNECTION_TIMEOUT: u64 = 10; // 10 seconds for establishing connections +const DEFAULT_INVITE_CONCURRENT_COUNT: usize = 32; // Max concurrent count of nodes being invited pub struct NodeInviter<'a> { wallet: &'a Wallet, @@ -75,7 +78,7 @@ impl<'a> NodeInviter<'a> { } } - async fn _generate_invite(&self, node: OrchestratorNode) -> Result<[u8; 65]> { + async fn _generate_invite(&self, node: &OrchestratorNode) -> Result<[u8; 65]> { let domain_id: [u8; 32] = U256::from(self.domain_id).to_be_bytes(); let pool_id: [u8; 32] = U256::from(self.pool_id).to_be_bytes(); @@ -92,8 +95,7 @@ impl<'a> NodeInviter<'a> { Ok(signature) } - async fn _send_invite(&self, node: OrchestratorNode) -> Result<(), anyhow::Error> { - let node_to_update = node.clone(); + async fn _send_invite(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> { let node_url = format!("http://{}:{}", node.ip_address, node.port); let invite_path = "/invite".to_string(); let invite_url = format!("{}{}", node_url, invite_path); @@ -151,7 +153,6 @@ impl<'a> NodeInviter<'a> { let status = response.status(); if status.is_success() { - let node = node_to_update.clone(); info!("Successfully invited node"); self.store_context .node_store @@ -185,22 +186,25 @@ impl<'a> NodeInviter<'a> { async fn process_uninvited_nodes(&self) -> Result<()> { let nodes = self.store_context.node_store.get_uninvited_nodes(); - let mut failed_nodes = Vec::new(); - for node in nodes { - // TODO: Eventually and carefully move this to tokio + let invited_nodes = stream::iter(nodes.into_iter().map(|node| async move { info!("Processing node {:?}", node.address); - match self._send_invite(node.clone()).await { + match self._send_invite(&node).await { Ok(_) => { info!("Successfully processed node {:?}", node.address); + Ok(()) } Err(e) => { error!("Failed to process node {:?}: {}", node.address, e); - failed_nodes.push((node, e)); + Err((node, e)) } } - } + })) + .buffer_unordered(DEFAULT_INVITE_CONCURRENT_COUNT) + .collect::>() + .await; + let failed_nodes: Vec<_> = invited_nodes.into_iter().filter_map(Result::err).collect(); if !failed_nodes.is_empty() { warn!( "Failed to process {} nodes: {:?}",