Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions crates/orchestrator/src/node/invite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use alloy::primitives::utils::keccak256 as keccak;
use alloy::primitives::U256;
use alloy::signers::Signer;
use anyhow::Result;
use futures::stream;
use futures::StreamExt;
use hex;
use log::{debug, error, info, warn};
use reqwest::Client;
Expand All @@ -20,6 +22,7 @@ use tokio::time::{interval, Duration};
// Timeout constants
const REQUEST_TIMEOUT: u64 = 15; // 15 seconds for HTTP requests
const CONNECTION_TIMEOUT: u64 = 10; // 10 seconds for establishing connections
const DEFAULT_INVITE_CONCURRENT_COUNT: usize = 32; // Max concurrent count of nodes being invited
Comment thread
JannikSt marked this conversation as resolved.

pub struct NodeInviter<'a> {
wallet: &'a Wallet,
Expand Down Expand Up @@ -75,7 +78,7 @@ impl<'a> NodeInviter<'a> {
}
}

async fn _generate_invite(&self, node: OrchestratorNode) -> Result<[u8; 65]> {
async fn _generate_invite(&self, node: &OrchestratorNode) -> Result<[u8; 65]> {
let domain_id: [u8; 32] = U256::from(self.domain_id).to_be_bytes();
let pool_id: [u8; 32] = U256::from(self.pool_id).to_be_bytes();

Expand All @@ -92,8 +95,7 @@ impl<'a> NodeInviter<'a> {
Ok(signature)
}

async fn _send_invite(&self, node: OrchestratorNode) -> Result<(), anyhow::Error> {
let node_to_update = node.clone();
async fn _send_invite(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> {
let node_url = format!("http://{}:{}", node.ip_address, node.port);
let invite_path = "/invite".to_string();
let invite_url = format!("{}{}", node_url, invite_path);
Expand Down Expand Up @@ -151,7 +153,6 @@ impl<'a> NodeInviter<'a> {
let status = response.status();

if status.is_success() {
let node = node_to_update.clone();
info!("Successfully invited node");
self.store_context
.node_store
Expand Down Expand Up @@ -185,22 +186,25 @@ impl<'a> NodeInviter<'a> {

async fn process_uninvited_nodes(&self) -> Result<()> {
let nodes = self.store_context.node_store.get_uninvited_nodes();
let mut failed_nodes = Vec::new();

for node in nodes {
// TODO: Eventually and carefully move this to tokio
let invited_nodes = stream::iter(nodes.into_iter().map(|node| async move {
info!("Processing node {:?}", node.address);
match self._send_invite(node.clone()).await {
match self._send_invite(&node).await {
Ok(_) => {
info!("Successfully processed node {:?}", node.address);
Ok(())
}
Err(e) => {
error!("Failed to process node {:?}: {}", node.address, e);
Comment thread
JannikSt marked this conversation as resolved.
failed_nodes.push((node, e));
Err((node, e))
}
}
}
}))
.buffer_unordered(DEFAULT_INVITE_CONCURRENT_COUNT)
.collect::<Vec<_>>()
.await;

let failed_nodes: Vec<_> = invited_nodes.into_iter().filter_map(Result::err).collect();
if !failed_nodes.is_empty() {
warn!(
"Failed to process {} nodes: {:?}",
Expand Down