From 32c22f01f29e48ad835bada3b629c977e6c50dea Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 23 Jun 2025 15:36:09 -0400 Subject: [PATCH 1/2] add balance field to DiscoveryNode and OrchestratorNode --- crates/discovery/src/api/routes/node.rs | 1 + crates/discovery/src/chainsync/sync.rs | 187 ++++++++++-------- crates/discovery/src/main.rs | 3 +- crates/orchestrator/src/models/node.rs | 11 ++ .../src/store/domains/node_store.rs | 15 +- crates/shared/src/models/node.rs | 15 +- 6 files changed, 142 insertions(+), 90 deletions(-) diff --git a/crates/discovery/src/api/routes/node.rs b/crates/discovery/src/api/routes/node.rs index 80c46c12..e6a76f36 100644 --- a/crates/discovery/src/api/routes/node.rs +++ b/crates/discovery/src/api/routes/node.rs @@ -361,6 +361,7 @@ mod tests { last_updated: None, created_at: None, location: None, + latest_balance: None, }; match app_state.node_store.update_node(validated).await { diff --git a/crates/discovery/src/chainsync/sync.rs b/crates/discovery/src/chainsync/sync.rs index 8833692c..e2a472db 100644 --- a/crates/discovery/src/chainsync/sync.rs +++ b/crates/discovery/src/chainsync/sync.rs @@ -1,5 +1,6 @@ use crate::store::node_store::NodeStore; use alloy::primitives::Address; +use alloy::providers::Provider as _; use alloy::providers::RootProvider; use anyhow::Error; use futures::stream::{self, StreamExt}; @@ -18,6 +19,7 @@ pub struct ChainSync { pub node_store: Arc, cancel_token: CancellationToken, chain_sync_interval: Duration, + provider: RootProvider, contracts: Contracts, last_chain_sync: Arc>>, } @@ -27,6 +29,7 @@ impl ChainSync { node_store: Arc, cancellation_token: CancellationToken, chain_sync_interval: Duration, + provider: RootProvider, contracts: Contracts, last_chain_sync: Arc>>, ) -> Self { @@ -34,103 +37,19 @@ impl ChainSync { node_store, cancel_token: cancellation_token, chain_sync_interval, + provider, contracts, last_chain_sync, } } - async fn sync_single_node( - node_store: Arc, - contracts: Contracts, - node: 
DiscoveryNode, - ) -> Result<(), Error> { - let mut n = node.clone(); - - // Safely parse provider_address and node_address - let provider_address = Address::from_str(&node.provider_address).map_err(|e| { - error!( - "Failed to parse provider address '{}': {}", - node.provider_address, e - ); - anyhow::anyhow!("Invalid provider address") - })?; - - let node_address = Address::from_str(&node.id).map_err(|e| { - error!("Failed to parse node address '{}': {}", node.id, e); - anyhow::anyhow!("Invalid node address") - })?; - - let node_info = contracts - .compute_registry - .get_node(provider_address, node_address) - .await - .map_err(|e| { - error!( - "Error retrieving node info for provider {} and node {}: {}", - provider_address, node_address, e - ); - anyhow::anyhow!("Failed to retrieve node info") - })?; - - let provider_info = contracts - .compute_registry - .get_provider(provider_address) - .await - .map_err(|e| { - error!( - "Error retrieving provider info for {}: {}", - provider_address, e - ); - anyhow::anyhow!("Failed to retrieve provider info") - })?; - - let (is_active, is_validated) = node_info; - n.is_active = is_active; - n.is_validated = is_validated; - n.is_provider_whitelisted = provider_info.is_whitelisted; - - // Handle potential errors from async calls - let is_blacklisted = contracts - .compute_pool - .is_node_blacklisted(node.node.compute_pool_id, node_address) - .await - .map_err(|e| { - error!( - "Error checking if node {} is blacklisted in pool {}: {}", - node_address, node.node.compute_pool_id, e - ); - anyhow::anyhow!("Failed to check blacklist status") - })?; - n.is_blacklisted = is_blacklisted; - - // Only update if the node has changed - if n.is_active != node.is_active - || n.is_validated != node.is_validated - || n.is_provider_whitelisted != node.is_provider_whitelisted - || n.is_blacklisted != node.is_blacklisted - { - match node_store.update_node(n).await { - Ok(_) => { - debug!("Successfully updated node {}", node.id); - Ok(()) - } - 
Err(e) => { - error!("Error updating node {}: {}", node.id, e); - Err(anyhow::anyhow!("Failed to update node: {}", e)) - } - } - } else { - debug!("Node {} unchanged, skipping update", node.id); - Ok(()) - } - } - pub async fn run(self) -> Result<(), Error> { let ChainSync { node_store, cancel_token, chain_sync_interval, last_chain_sync, + provider, contracts, } = self; @@ -157,9 +76,10 @@ impl ChainSync { let results: Vec> = stream::iter(nodes) .map(|node| { let node_store = node_store.clone(); + let provider = provider.clone(); let contracts = contracts.clone(); async move { - ChainSync::sync_single_node(node_store, contracts, node).await + sync_single_node(node_store, provider, contracts, node).await } }) .buffer_unordered(MAX_CONCURRENT_SYNCS) @@ -211,3 +131,96 @@ impl ChainSync { Ok(()) } } + +async fn sync_single_node( + node_store: Arc, + provider: RootProvider, + contracts: Contracts, + node: DiscoveryNode, +) -> Result<(), Error> { + let mut n = node.clone(); + + // Safely parse provider_address and node_address + let provider_address = Address::from_str(&node.provider_address).map_err(|e| { + error!( + "Failed to parse provider address '{}': {}", + node.provider_address, e + ); + anyhow::anyhow!("Invalid provider address") + })?; + + let node_address = Address::from_str(&node.id).map_err(|e| { + error!("Failed to parse node address '{}': {}", node.id, e); + anyhow::anyhow!("Invalid node address") + })?; + + let balance = provider.get_balance(node_address).await.map_err(|e| { + error!("Error retrieving balance for node {}: {}", node_address, e); + anyhow::anyhow!("Failed to retrieve node balance") + })?; + n.latest_balance = Some(balance); + + let node_info = contracts + .compute_registry + .get_node(provider_address, node_address) + .await + .map_err(|e| { + error!( + "Error retrieving node info for provider {} and node {}: {}", + provider_address, node_address, e + ); + anyhow::anyhow!("Failed to retrieve node info") + })?; + + let provider_info = 
contracts + .compute_registry + .get_provider(provider_address) + .await + .map_err(|e| { + error!( + "Error retrieving provider info for {}: {}", + provider_address, e + ); + anyhow::anyhow!("Failed to retrieve provider info") + })?; + + let (is_active, is_validated) = node_info; + n.is_active = is_active; + n.is_validated = is_validated; + n.is_provider_whitelisted = provider_info.is_whitelisted; + + // Handle potential errors from async calls + let is_blacklisted = contracts + .compute_pool + .is_node_blacklisted(node.node.compute_pool_id, node_address) + .await + .map_err(|e| { + error!( + "Error checking if node {} is blacklisted in pool {}: {}", + node_address, node.node.compute_pool_id, e + ); + anyhow::anyhow!("Failed to check blacklist status") + })?; + n.is_blacklisted = is_blacklisted; + + // Only update if the node has changed + if n.is_active != node.is_active + || n.is_validated != node.is_validated + || n.is_provider_whitelisted != node.is_provider_whitelisted + || n.is_blacklisted != node.is_blacklisted + { + match node_store.update_node(n).await { + Ok(_) => { + debug!("Successfully updated node {}", node.id); + Ok(()) + } + Err(e) => { + error!("Error updating node {}: {}", node.id, e); + Err(anyhow::anyhow!("Failed to update node: {}", e)) + } + } + } else { + debug!("Node {} unchanged, skipping update", node.id); + Ok(()) + } +} diff --git a/crates/discovery/src/main.rs b/crates/discovery/src/main.rs index 600bdd96..6155c987 100644 --- a/crates/discovery/src/main.rs +++ b/crates/discovery/src/main.rs @@ -97,7 +97,7 @@ async fn main() -> Result<()> { }; let provider = RootProvider::new_http(endpoint); - let contracts = ContractBuilder::new(provider) + let contracts = ContractBuilder::new(provider.clone()) .with_compute_registry() .with_ai_token() .with_prime_network() @@ -117,6 +117,7 @@ async fn main() -> Result<()> { node_store.clone(), cancellation_token.clone(), Duration::from_secs(10), + provider, contracts.clone(), last_chain_sync.clone(), 
); diff --git a/crates/orchestrator/src/models/node.rs b/crates/orchestrator/src/models/node.rs index 9d463cbd..8b689ea0 100644 --- a/crates/orchestrator/src/models/node.rs +++ b/crates/orchestrator/src/models/node.rs @@ -1,4 +1,5 @@ use alloy::primitives::Address; +use alloy::primitives::U256; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use shared::models::heartbeat::TaskDetails; @@ -34,6 +35,15 @@ pub struct OrchestratorNode { pub worker_p2p_addresses: Option>, #[serde(default)] pub location: Option, + #[schema(schema_with = custom_u256)] // TODO? + pub balance: Option, +} + +fn custom_u256() -> utoipa::openapi::Object { + utoipa::openapi::ObjectBuilder::new() + .schema_type(utoipa::openapi::schema::Type::String) + .description(Some("A U256 value represented as a hexadecimal string")) + .build() } fn serialize_address(address: &Address, serializer: S) -> Result @@ -61,6 +71,7 @@ impl From for OrchestratorNode { worker_p2p_id: discovery_node.worker_p2p_id.clone(), worker_p2p_addresses: discovery_node.worker_p2p_addresses.clone(), location: discovery_node.location.clone(), + balance: discovery_node.latest_balance, } } } diff --git a/crates/orchestrator/src/store/domains/node_store.rs b/crates/orchestrator/src/store/domains/node_store.rs index e7a7296f..5afa01e1 100644 --- a/crates/orchestrator/src/store/domains/node_store.rs +++ b/crates/orchestrator/src/store/domains/node_store.rs @@ -84,7 +84,9 @@ impl NodeStore { .map_err(|e| anyhow::anyhow!("Failed to serialize location: {}", e))?; fields.push(("location".to_string(), location_json)); } - + if let Some(balance) = &node.balance { + fields.push(("balance".to_string(), balance.to_string())); + } Ok(fields) } @@ -142,6 +144,16 @@ impl NodeStore { .get("location") .and_then(|s| serde_json::from_str(s).ok()); + let balance = fields + .get("balance") + .and_then(|s| { + Some( + s.parse::() + .map_err(|_| anyhow::anyhow!("invalid balance format")), + ) + }) + .transpose()?; + Ok(OrchestratorNode { 
address, ip_address, @@ -158,6 +170,7 @@ impl NodeStore { worker_p2p_id, worker_p2p_addresses, location, + balance, }) } diff --git a/crates/shared/src/models/node.rs b/crates/shared/src/models/node.rs index 3a2e0804..7cd031e9 100644 --- a/crates/shared/src/models/node.rs +++ b/crates/shared/src/models/node.rs @@ -1,14 +1,16 @@ +use alloy::primitives::U256; use anyhow::anyhow; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt; use std::ops::Deref; use std::str::FromStr; -use utoipa::ToSchema; +use utoipa::{openapi::Object, ToSchema}; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default, ToSchema)] pub struct Node { pub id: String, + // the node's on-chain address. pub provider_address: String, pub ip_address: String, pub port: u16, @@ -563,6 +565,15 @@ pub struct DiscoveryNode { pub created_at: Option>, #[serde(default)] pub location: Option, + #[schema(schema_with = custom_u256)] // TODO? + pub latest_balance: Option, +} + +fn custom_u256() -> Object { + utoipa::openapi::ObjectBuilder::new() + .schema_type(utoipa::openapi::schema::Type::String) + .description(Some("A U256 value represented as a hexadecimal string")) + .build() } impl DiscoveryNode { @@ -576,6 +587,7 @@ impl DiscoveryNode { last_updated: Some(Utc::now()), created_at: self.created_at, location: self.location.clone(), + latest_balance: self.latest_balance, } } } @@ -599,6 +611,7 @@ impl From for DiscoveryNode { last_updated: None, created_at: Some(Utc::now()), location: None, + latest_balance: None, } } } From 97dd19d7922543b13d03b70536aa4ec4b26d91a5 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 23 Jun 2025 16:25:43 -0400 Subject: [PATCH 2/2] add NodeStatus::LowBalance and handle in handle_status_change --- crates/orchestrator/src/discovery/monitor.rs | 16 ++++++++++++++++ crates/orchestrator/src/models/node.rs | 12 +----------- .../plugins/node_groups/status_update_impl.rs | 2 +- .../orchestrator/src/store/domains/node_store.rs | 14 -------------- 
crates/shared/src/models/node.rs | 7 ++++--- 5 files changed, 22 insertions(+), 29 deletions(-) diff --git a/crates/orchestrator/src/discovery/monitor.rs b/crates/orchestrator/src/discovery/monitor.rs index d5fee629..c1aa4c7f 100644 --- a/crates/orchestrator/src/discovery/monitor.rs +++ b/crates/orchestrator/src/discovery/monitor.rs @@ -4,6 +4,7 @@ use crate::plugins::StatusUpdatePlugin; use crate::store::core::StoreContext; use crate::utils::loop_heartbeats::LoopHeartbeats; use alloy::primitives::Address; +use alloy::primitives::U256; use anyhow::Error; use anyhow::Result; use chrono::Utc; @@ -368,6 +369,21 @@ impl DiscoveryMonitor { } } } + + if let Some(balance) = discovery_node.latest_balance { + if balance == U256::ZERO { + info!( + "Node {} has zero balance, marking as low balance", + node_address + ); + if let Err(e) = self + .update_node_status(&node_address, NodeStatus::LowBalance) + .await + { + error!("Error updating node status: {}", e); + } + } + } } Ok(None) => { // Don't add new node if there's already a healthy node with same IP and port diff --git a/crates/orchestrator/src/models/node.rs b/crates/orchestrator/src/models/node.rs index 8b689ea0..86ebdc8d 100644 --- a/crates/orchestrator/src/models/node.rs +++ b/crates/orchestrator/src/models/node.rs @@ -1,5 +1,4 @@ use alloy::primitives::Address; -use alloy::primitives::U256; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use shared::models::heartbeat::TaskDetails; @@ -35,15 +34,6 @@ pub struct OrchestratorNode { pub worker_p2p_addresses: Option>, #[serde(default)] pub location: Option, - #[schema(schema_with = custom_u256)] // TODO? 
- pub balance: Option, -} - -fn custom_u256() -> utoipa::openapi::Object { - utoipa::openapi::ObjectBuilder::new() - .schema_type(utoipa::openapi::schema::Type::String) - .description(Some("A U256 value represented as a hexadecimal string")) - .build() } fn serialize_address(address: &Address, serializer: S) -> Result @@ -71,7 +61,6 @@ impl From for OrchestratorNode { worker_p2p_id: discovery_node.worker_p2p_id.clone(), worker_p2p_addresses: discovery_node.worker_p2p_addresses.clone(), location: discovery_node.location.clone(), - balance: discovery_node.latest_balance, } } } @@ -92,6 +81,7 @@ pub enum NodeStatus { Dead, Ejected, Banned, + LowBalance, } impl Display for NodeStatus { diff --git a/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs b/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs index 568b297f..79d3e1ca 100644 --- a/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs +++ b/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs @@ -20,7 +20,7 @@ impl StatusUpdatePlugin for NodeGroupsPlugin { ); match node.status { - NodeStatus::Dead => { + NodeStatus::Dead | NodeStatus::LowBalance => { // Dissolve entire group if node becomes unhealthy if let Some(group) = self.get_node_group(&node_addr).await? 
{ info!( diff --git a/crates/orchestrator/src/store/domains/node_store.rs b/crates/orchestrator/src/store/domains/node_store.rs index 5afa01e1..d9d5901c 100644 --- a/crates/orchestrator/src/store/domains/node_store.rs +++ b/crates/orchestrator/src/store/domains/node_store.rs @@ -84,9 +84,6 @@ impl NodeStore { .map_err(|e| anyhow::anyhow!("Failed to serialize location: {}", e))?; fields.push(("location".to_string(), location_json)); } - if let Some(balance) = &node.balance { - fields.push(("balance".to_string(), balance.to_string())); - } Ok(fields) } @@ -144,16 +141,6 @@ impl NodeStore { .get("location") .and_then(|s| serde_json::from_str(s).ok()); - let balance = fields - .get("balance") - .and_then(|s| { - Some( - s.parse::() - .map_err(|_| anyhow::anyhow!("invalid balance format")), - ) - }) - .transpose()?; - Ok(OrchestratorNode { address, ip_address, @@ -170,7 +157,6 @@ impl NodeStore { worker_p2p_id, worker_p2p_addresses, location, - balance, }) } diff --git a/crates/shared/src/models/node.rs b/crates/shared/src/models/node.rs index 7cd031e9..849a586d 100644 --- a/crates/shared/src/models/node.rs +++ b/crates/shared/src/models/node.rs @@ -565,14 +565,15 @@ pub struct DiscoveryNode { pub created_at: Option>, #[serde(default)] pub location: Option, - #[schema(schema_with = custom_u256)] // TODO? + #[schema(schema_with = u256_schema)] pub latest_balance: Option, } -fn custom_u256() -> Object { +fn u256_schema() -> Object { utoipa::openapi::ObjectBuilder::new() .schema_type(utoipa::openapi::schema::Type::String) - .description(Some("A U256 value represented as a hexadecimal string")) + .description(Some("A U256 value represented as a decimal string")) + .examples(Some(serde_json::json!("1000000000000000000"))) .build() }