From 309ab94068eb19de8643fceca9a0c70256966cdf Mon Sep 17 00:00:00 2001 From: JannikSt Date: Tue, 24 Jun 2025 09:03:24 +1000 Subject: [PATCH 1/6] add additional status update loop metrics (#575) --- crates/orchestrator/src/main.rs | 2 ++ crates/orchestrator/src/metrics/mod.rs | 25 ++++++++++++++++++-- crates/orchestrator/src/status_update/mod.rs | 22 ++++++++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index ea26cee6..5a2d7378 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -381,6 +381,7 @@ async fn main() -> Result<()> { let status_update_store_context = store_context.clone(); let status_update_heartbeats = heartbeats.clone(); + let status_update_metrics = metrics_context.clone(); tasks.spawn({ let contracts = contracts.clone(); async move { @@ -393,6 +394,7 @@ async fn main() -> Result<()> { args.disable_ejection, status_update_heartbeats.clone(), status_updater_plugins, + status_update_metrics, ); status_updater.run().await } diff --git a/crates/orchestrator/src/metrics/mod.rs b/crates/orchestrator/src/metrics/mod.rs index 1f49d0b8..8e942cb0 100644 --- a/crates/orchestrator/src/metrics/mod.rs +++ b/crates/orchestrator/src/metrics/mod.rs @@ -1,4 +1,4 @@ -use prometheus::{CounterVec, GaugeVec, Opts, Registry, TextEncoder}; +use prometheus::{CounterVec, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder}; pub mod sync_service; pub mod webhook_sender; @@ -14,6 +14,7 @@ pub struct MetricsContext { pub heartbeat_requests_total: CounterVec, pub nodes_per_task: GaugeVec, pub task_state: GaugeVec, + pub status_update_execution_time: HistogramVec, } impl MetricsContext { @@ -90,7 +91,6 @@ impl MetricsContext { &["task_id", "task_name", "pool_id"], ) .unwrap(); - let task_state = GaugeVec::new( Opts::new( "orchestrator_task_state", @@ -100,6 +100,19 @@ impl MetricsContext { ) .unwrap(); + let status_update_execution_time = HistogramVec::new( + HistogramOpts::new( + "orchestrator_status_update_execution_time_seconds", + "Duration of status update execution", + ) + .buckets(vec![ + 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 30.0, 45.0, 60.0, 90.0, + 120.0, + ]), + &["node_address", "pool_id"], + ) + .unwrap(); + let registry = Registry::new(); let _ = registry.register(Box::new(compute_task_gauges.clone())); let _ = registry.register(Box::new(task_info.clone())); @@ -110,6 +123,7 @@ impl MetricsContext { let _ = registry.register(Box::new(heartbeat_requests_total.clone())); let _ = registry.register(Box::new(nodes_per_task.clone())); let _ = registry.register(Box::new(task_state.clone())); + let _ = registry.register(Box::new(status_update_execution_time.clone())); Self { compute_task_gauges, @@ -123,6 +137,7 @@ impl MetricsContext { heartbeat_requests_total, nodes_per_task, task_state, + status_update_execution_time, } } @@ -218,6 +233,12 @@ impl MetricsContext { self.compute_task_gauges.reset(); } + pub fn record_status_update_execution_time(&self, node_address: &str, duration: f64) { + self.status_update_execution_time + .with_label_values(&[node_address, &self.pool_id]) + .observe(duration); + } + /// Clear all orchestrator statistics metrics pub fn clear_orchestrator_statistics(&self) { self.nodes_total.reset(); diff --git a/crates/orchestrator/src/status_update/mod.rs b/crates/orchestrator/src/status_update/mod.rs index 8dc55852..837533ac 100644 --- a/crates/orchestrator/src/status_update/mod.rs +++ b/crates/orchestrator/src/status_update/mod.rs @@ -1,3 +1,4 @@ +use crate::metrics::MetricsContext; use crate::models::node::{NodeStatus, OrchestratorNode}; use crate::plugins::StatusUpdatePlugin; use crate::store::core::StoreContext; @@ -7,7 +8,7 @@ use shared::web3::contracts::core::builder::Contracts; use shared::web3::wallet::WalletProvider; use std::result::Result; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::time::interval; pub struct NodeStatusUpdater { @@ -19,6 +20,7 @@ pub struct NodeStatusUpdater { disable_ejection: bool, heartbeats: Arc, plugins: Vec>, + metrics: Arc, } impl NodeStatusUpdater { @@ -32,6 +34,7 @@ impl NodeStatusUpdater { disable_ejection: bool, heartbeats: Arc, plugins: Vec>, + metrics: Arc, ) -> Self { Self { store_context, @@ -42,6 +45,7 @@ impl NodeStatusUpdater { disable_ejection, heartbeats, plugins, + metrics, } } @@ -133,6 +137,7 @@ impl NodeStatusUpdater { pub async fn process_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes().await?; for node in nodes { + let start_time = Instant::now(); let node = node.clone(); let old_status = node.status.clone(); let heartbeat = self @@ -313,6 +318,12 @@ impl NodeStatusUpdater { } } } + // Record status update execution time + let duration = start_time.elapsed(); + self.metrics.record_status_update_execution_time( + &node.address.to_string(), + duration.as_secs_f64(), + ); } Ok(()) } @@ -346,6 +357,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); let node = OrchestratorNode { address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), @@ -450,6 +462,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -502,6 +515,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -570,6 +584,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -648,6 +663,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -734,6 +750,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -817,6 +834,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -894,6 +912,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater @@ -962,6 +981,7 @@ mod tests { false, Arc::new(LoopHeartbeats::new(&mode)), vec![], + app_state.metrics.clone(), ); tokio::spawn(async move { updater From e4e720875ae587fdea9482d74ddde7fdb14622da Mon Sep 17 00:00:00 2001 From: JannikSt Date: Tue, 24 Jun 2025 09:03:39 +1000 Subject: [PATCH 2/6] increase heartbeat timeout (#574) --- crates/orchestrator/src/store/domains/heartbeat_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/orchestrator/src/store/domains/heartbeat_store.rs b/crates/orchestrator/src/store/domains/heartbeat_store.rs index 8ea273a6..a54cb769 100644 --- a/crates/orchestrator/src/store/domains/heartbeat_store.rs +++ b/crates/orchestrator/src/store/domains/heartbeat_store.rs @@ -29,7 +29,7 @@ impl HeartbeatStore { con.set_options::<_, _, ()>( &key, payload_string, - redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(60)), + redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(180)), ) .await .map_err(|_| anyhow!("Failed to set options"))?; From 0fe349a55bbe1c2c2ff6bf0792e5a882b0938b92 Mon Sep 17 00:00:00 2001 From: JannikSt Date: Tue, 24 Jun 2025 10:41:01 +1000 Subject: [PATCH 3/6] switch to sequential validation as quickfix for nonce issues (#576) --- crates/validator/src/validators/hardware.rs | 52 +++++++++------------ 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/crates/validator/src/validators/hardware.rs b/crates/validator/src/validators/hardware.rs index a3bf8c4c..d673de9c 100644 --- a/crates/validator/src/validators/hardware.rs +++ b/crates/validator/src/validators/hardware.rs @@ -1,6 +1,5 @@ use alloy::primitives::Address; use anyhow::Result; -use futures::future::join_all; use log::{debug, error, info}; use shared::{ models::node::DiscoveryNode, @@ -9,8 +8,6 @@ use shared::{ wallet::{Wallet, WalletProvider}, }, }; -use std::sync::Arc; -use tokio::sync::Semaphore; use crate::p2p::client::P2PClient; use crate::validators::hardware_challenge::HardwareChallenge; @@ -80,6 +77,8 @@ impl<'a> HardwareValidator<'a> { ); } + debug!("Sending validation transaction for node {}", node.id); + if let Err(e) = contracts .prime_network .validate_node(provider_address, node_address) @@ -89,6 +88,9 @@ impl<'a> HardwareValidator<'a> { return Err(anyhow::anyhow!("Failed to validate node: {}", e)); } + // Small delay to ensure nonce incrementation + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + info!("Node {} successfully validated", node.id); Ok(()) } @@ -96,38 +98,25 @@ impl<'a> HardwareValidator<'a> { pub async fn validate_nodes(&self, nodes: Vec) -> Result<()> { let non_validated: Vec<_> = nodes.into_iter().filter(|n| !n.is_validated).collect(); debug!("Non validated nodes: {:?}", non_validated); + info!("Starting validation for {} nodes", non_validated.len()); + let contracts = self.contracts.clone(); let wallet = self.wallet; let p2p_client = self.p2p_client; - let semaphore = Arc::new(Semaphore::new(10)); - let futures = non_validated - .into_iter() - .map(|node| { - let node_clone = node.clone(); - let contracts_clone = contracts.clone(); - let permit = semaphore.clone(); - - async move { - let _permit = permit.acquire().await; - - match HardwareValidator::validate_node( - wallet, - contracts_clone, - p2p_client, - node_clone, - ) - .await - { - Ok(_) => (), - Err(e) => { - error!("Failed to validate node: {}", e); - } - } - } - }) - .collect::>(); - join_all(futures).await; + // Process non validated nodes sequentially as simple fix + // to avoid nonce conflicts for now. Will sophisticate this in the future + for node in non_validated { + let node_id = node.id.clone(); + match HardwareValidator::validate_node(wallet, contracts.clone(), p2p_client, node) + .await + { + Ok(_) => (), + Err(e) => { + error!("Failed to validate node {}: {}", node_id, e); + } + } + } Ok(()) } } @@ -138,6 +127,7 @@ mod tests { use shared::models::node::Node; use shared::web3::contracts::core::builder::ContractBuilder; use shared::web3::wallet::Wallet; + use std::sync::Arc; use url::Url; #[tokio::test] From 735dd3e1c6d9ecb8bf13f2ddaf02013896bc3b13 Mon Sep 17 00:00:00 2001 From: JannikSt Date: Tue, 24 Jun 2025 12:39:18 +1000 Subject: [PATCH 4/6] imp(worker): host nw mode with ability to switch networking config (#577) * host nw mode with ability to switch networking config --- crates/worker/src/cli/command.rs | 6 ++++++ crates/worker/src/docker/docker_manager.rs | 20 +++++++++++++++++++- crates/worker/src/docker/service.rs | 7 ++++++- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index d3ed6b32..222ceb65 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -113,6 +113,10 @@ pub enum Commands { /// Storage path for worker data (overrides automatic selection) #[arg(long)] storage_path: Option, + + /// Disable host network mode + #[arg(long, default_value = "false")] + disable_host_network_mode: bool, }, Check {}, @@ -192,6 +196,7 @@ pub async fn execute_command( loki_url: _, log_level: _, storage_path, + disable_host_network_mode, } => { if *disable_state_storing && !(*no_auto_recover) { Console::user_error( @@ -449,6 +454,7 @@ pub async fn execute_command( .address() .to_string(), state.get_p2p_seed(), + *disable_host_network_mode, )); let bridge_cancellation_token = cancellation_token.clone(); diff --git a/crates/worker/src/docker/docker_manager.rs b/crates/worker/src/docker/docker_manager.rs index f385899d..8d4c5f27 100644 --- a/crates/worker/src/docker/docker_manager.rs +++ b/crates/worker/src/docker/docker_manager.rs @@ -46,6 +46,15 @@ pub struct ContainerDetails { pub struct DockerManager { docker: Docker, storage_path: String, + /// Controls whether to use host network mode for containers. + /// + /// Currently defaults to host mode (when false) to work around performance issues + /// with Docker bridge networking on certain cloud providers. This is a trade-off + /// between security isolation and performance. + /// + /// TODO: Investigate root cause of bridge network performance degradation and + /// implement a more optimal solution that maintains security isolation. + disable_host_network_mode: bool, } impl DockerManager { @@ -128,7 +137,7 @@ impl DockerManager { } /// Create a new DockerManager instance - pub fn new(storage_path: String) -> Result { + pub fn new(storage_path: String, disable_host_network_mode: bool) -> Result { let docker = match Docker::connect_with_unix_defaults() { Ok(docker) => docker, Err(e) => { @@ -159,6 +168,7 @@ impl DockerManager { Ok(Self { docker, storage_path, + disable_host_network_mode, }) } @@ -385,6 +395,12 @@ impl DockerManager { Some(binds) }; + let network_mode = if self.disable_host_network_mode { + "bridge".to_string() + } else { + "host".to_string() + }; + let host_config = if gpu.is_some() { let gpu = gpu.unwrap(); let device_ids = match &gpu.indices { @@ -399,6 +415,7 @@ impl DockerManager { }; Some(HostConfig { + network_mode: Some(network_mode), extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]), device_requests: Some(vec![DeviceRequest { driver: Some("nvidia".into()), @@ -417,6 +434,7 @@ impl DockerManager { }) } else { Some(HostConfig { + network_mode: Some(network_mode), extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]), binds: volume_binds, restart_policy: Some(bollard::models::RestartPolicy { diff --git a/crates/worker/src/docker/service.rs b/crates/worker/src/docker/service.rs index 59b9cf26..fb75c8c9 100644 --- a/crates/worker/src/docker/service.rs +++ b/crates/worker/src/docker/service.rs @@ -31,6 +31,7 @@ const TASK_PREFIX: &str = "prime-task"; const RESTART_INTERVAL_SECONDS: i64 = 10; impl DockerService { + #[allow(clippy::too_many_arguments)] pub fn new( cancellation_token: CancellationToken, gpu: Option, @@ -39,8 +40,10 @@ impl DockerService { storage_path: String, node_address: String, p2p_seed: Option, + disable_host_network_mode: bool, ) -> Self { - let docker_manager = Arc::new(DockerManager::new(storage_path).unwrap()); + let docker_manager = + Arc::new(DockerManager::new(storage_path, disable_host_network_mode).unwrap()); Self { docker_manager, cancellation_token, @@ -429,6 +432,7 @@ mod tests { "/tmp/test-storage".to_string(), Address::ZERO.to_string(), None, + false, ); let task = Task { image: "ubuntu:latest".to_string(), @@ -477,6 +481,7 @@ mod tests { "/tmp/test-storage".to_string(), Address::ZERO.to_string(), Some(12345), // p2p_seed for testing + false, ); // Test command argument replacement From c55590c9fbc29e4ca8156dd3a031c3c7c4376752 Mon Sep 17 00:00:00 2001 From: JannikSt Date: Tue, 24 Jun 2025 13:06:28 +1000 Subject: [PATCH 5/6] imp(general): Orchestrator Discovery Sync, Worker Timeouts (#578) * fix race condition in discovery sync, decrease heartbeat ttl, increase general auth mw request timeout * increase heartbeat timeout --- crates/orchestrator/src/discovery/monitor.rs | 53 ++++++++++++++----- .../src/store/domains/heartbeat_store.rs | 2 +- .../src/security/auth_signature_middleware.rs | 3 +- .../src/operations/heartbeat/service.rs | 2 +- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/crates/orchestrator/src/discovery/monitor.rs b/crates/orchestrator/src/discovery/monitor.rs index d5fee629..a5f31b29 100644 --- a/crates/orchestrator/src/discovery/monitor.rs +++ b/crates/orchestrator/src/discovery/monitor.rs @@ -309,23 +309,40 @@ impl DiscoveryMonitor { if !discovery_node.is_active && existing_node.status == NodeStatus::Healthy { // Node is active False but we have it in store and it is healthy // This means that the node likely got kicked by e.g. the validator - // We simply remove it from the store now and will rediscover it later? - info!( - "Node {} is no longer active on chain, marking as ejected", - node_address - ); - if !discovery_node.is_provider_whitelisted { - if let Err(e) = self - .update_node_status(&node_address, NodeStatus::Ejected) + // Add a grace period check to avoid immediately marking nodes that just became healthy + let should_mark_inactive = + if let Some(last_status_change) = existing_node.last_status_change { + let grace_period = chrono::Duration::minutes(5); // 5 minute grace period + let now = chrono::Utc::now(); + now.signed_duration_since(last_status_change) > grace_period + } else { + // If no last_status_change, assume it's been healthy for a while + true + }; + + if should_mark_inactive { + info!( + "Node {} is no longer active on chain, marking as ejected", + node_address + ); + if !discovery_node.is_provider_whitelisted { + if let Err(e) = self + .update_node_status(&node_address, NodeStatus::Ejected) + .await + { + error!("Error updating node status: {}", e); + } + } else if let Err(e) = self + .update_node_status(&node_address, NodeStatus::Dead) .await { error!("Error updating node status: {}", e); } - } else if let Err(e) = self - .update_node_status(&node_address, NodeStatus::Dead) - .await - { - error!("Error updating node status: {}", e); + } else { + info!( + "Node {} is no longer active on chain but recently became healthy, waiting before marking inactive", + node_address + ); } } @@ -359,6 +376,16 @@ impl DiscoveryMonitor { ) { if last_change < last_updated { info!("Node {} is dead but has been updated on discovery, marking as discovered", node_address); + + if existing_node.compute_specs != discovery_node.compute_specs { + info!( + "Node {} compute specs changed, marking as discovered", + node_address + ); + let mut node = existing_node.clone(); + node.compute_specs = discovery_node.compute_specs.clone(); + let _ = self.store_context.node_store.add_node(node.clone()).await; + } if let Err(e) = self .update_node_status(&node_address, NodeStatus::Discovered) .await diff --git a/crates/orchestrator/src/store/domains/heartbeat_store.rs b/crates/orchestrator/src/store/domains/heartbeat_store.rs index a54cb769..1011dc86 100644 --- a/crates/orchestrator/src/store/domains/heartbeat_store.rs +++ b/crates/orchestrator/src/store/domains/heartbeat_store.rs @@ -29,7 +29,7 @@ impl HeartbeatStore { con.set_options::<_, _, ()>( &key, payload_string, - redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(180)), + redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(90)), ) .await .map_err(|_| anyhow!("Failed to set options"))?; diff --git a/crates/shared/src/security/auth_signature_middleware.rs b/crates/shared/src/security/auth_signature_middleware.rs index 8732ae20..af63e318 100644 --- a/crates/shared/src/security/auth_signature_middleware.rs +++ b/crates/shared/src/security/auth_signature_middleware.rs @@ -32,6 +32,7 @@ const MAX_NONCE_LENGTH: usize = 64; const MIN_NONCE_LENGTH: usize = 16; const RATE_LIMIT_WINDOW_SECS: u64 = 60; const MAX_REQUESTS_PER_WINDOW: usize = 100; +const REQUEST_EXPIRY_SECS: u64 = 300; type SyncAddressValidator = Arc bool + Send + Sync>; type AsyncAddressValidator = Arc LocalBoxFuture<'static, bool> + Send + Sync>; @@ -445,7 +446,7 @@ where .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); - if current_time - timestamp > 10 { + if current_time - timestamp > REQUEST_EXPIRY_SECS { return Err(ErrorBadRequest(json!({ "error": "Request expired", "code": "REQUEST_EXPIRED", diff --git a/crates/worker/src/operations/heartbeat/service.rs b/crates/worker/src/operations/heartbeat/service.rs index 6e5a291f..9c828cdd 100644 --- a/crates/worker/src/operations/heartbeat/service.rs +++ b/crates/worker/src/operations/heartbeat/service.rs @@ -44,7 +44,7 @@ impl HeartbeatService { state: Arc, ) -> Result, HeartbeatError> { let client = Client::builder() - .timeout(Duration::from_secs(5)) // 5 second timeout + .timeout(Duration::from_secs(20)) .build() .map_err(|_| HeartbeatError::InitFailed)?; // Adjusted to match the expected error type From 7726382e34ed7133229b8f9a03fe048c9cebc3c7 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 24 Jun 2025 13:27:49 +1000 Subject: [PATCH 6/6] release 0.3.7 --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bdb56f42..b2bd37ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,7 +2376,7 @@ dependencies = [ [[package]] name = "discovery" -version = "0.3.6" +version = "0.3.7" dependencies = [ "actix-web", "alloy", @@ -4882,7 +4882,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestrator" -version = "0.3.6" +version = "0.3.7" dependencies = [ "actix-web", "actix-web-prometheus", @@ -7679,7 +7679,7 @@ dependencies = [ [[package]] name = "validator" -version = "0.3.6" +version = "0.3.7" dependencies = [ "actix-web", "alloy", @@ -8496,7 +8496,7 @@ dependencies = [ [[package]] name = "worker" -version = "0.3.6" +version = "0.3.7" dependencies = [ "actix-web", "alloy", diff --git a/Cargo.toml b/Cargo.toml index b2ac203b..eddd2267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ iroh = "0.34.1" rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] } rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] } [workspace.package] -version = "0.3.6" +version = "0.3.7" edition = "2021" [workspace.features]