diff --git a/crates/dev-utils/examples/submit_work.rs b/crates/dev-utils/examples/submit_work.rs index 0d00cd52..aa3b489c 100644 --- a/crates/dev-utils/examples/submit_work.rs +++ b/crates/dev-utils/examples/submit_work.rs @@ -54,7 +54,6 @@ async fn main() -> Result<()> { let call = contracts .compute_pool .build_work_submission_call(pool_id, node, work_key, U256::from(179949060096000.0)) - .await .map_err(|e| eyre::eyre!("Failed to build work submission call: {}", e))?; let tx = call diff --git a/crates/discovery/src/chainsync/sync.rs b/crates/discovery/src/chainsync/sync.rs index a90d4b28..55812824 100644 --- a/crates/discovery/src/chainsync/sync.rs +++ b/crates/discovery/src/chainsync/sync.rs @@ -121,7 +121,7 @@ impl ChainSync { } } - pub async fn run(self) -> Result<(), Error> { + pub fn run(self) -> Result<(), Error> { let ChainSync { node_store, cancel_token, diff --git a/crates/discovery/src/main.rs b/crates/discovery/src/main.rs index 56fd9ee4..e0a0b100 100644 --- a/crates/discovery/src/main.rs +++ b/crates/discovery/src/main.rs @@ -119,7 +119,7 @@ async fn main() -> Result<()> { contracts.clone(), last_chain_sync.clone(), ); - chain_sync.run().await?; + chain_sync.run()?; // Start location enrichment service if enabled if let Some(location_url) = args.location_service_url.clone() { diff --git a/crates/orchestrator/src/metrics/webhook_sender.rs b/crates/orchestrator/src/metrics/webhook_sender.rs index a1fb1af0..bc7c2f83 100644 --- a/crates/orchestrator/src/metrics/webhook_sender.rs +++ b/crates/orchestrator/src/metrics/webhook_sender.rs @@ -70,9 +70,7 @@ impl MetricsWebhookSender { if Self::metrics_changed(&metrics, &self.last_sent_metrics) { info!("Sending {} metrics via webhook", metrics.len()); for plugin in &self.webhook_plugins { - let _ = plugin - .send_metrics_updated(self.pool_id, metrics.clone()) - .await; + let _ = plugin.send_metrics_updated(self.pool_id, metrics.clone()); } // Update last sent metrics self.last_sent_metrics = metrics.clone(); diff --git a/crates/orchestrator/src/plugins/node_groups/mod.rs b/crates/orchestrator/src/plugins/node_groups/mod.rs index f9318ff6..32f7d34e 100644 --- a/crates/orchestrator/src/plugins/node_groups/mod.rs +++ b/crates/orchestrator/src/plugins/node_groups/mod.rs @@ -622,20 +622,13 @@ impl NodeGroupsPlugin { for group in webhook_groups { if let Some(plugins) = &self.webhook_plugins { for plugin in plugins.iter() { - let group_clone = group.clone(); - let plugin_clone = plugin.clone(); - tokio::spawn(async move { - if let Err(e) = plugin_clone - .send_group_created( - group_clone.id.to_string(), - group_clone.configuration_name.to_string(), - group_clone.nodes.iter().cloned().collect(), - ) - .await - { - error!("Failed to send group created webhook: {e}"); - } - }); + if let Err(e) = plugin.send_group_created( + group.id.clone(), + group.configuration_name.clone(), + group.nodes.iter().cloned().collect(), + ) { + error!("Failed to send group created webhook: {}", e); + } } } } @@ -981,51 +974,36 @@ impl NodeGroupsPlugin { } // Send webhook notifications - self.send_merge_webhooks(groups_to_merge, &merged_group) - .await; + self.send_merge_webhooks(groups_to_merge, &merged_group); Ok(Some((merged_group, group_ids_to_dissolve))) } /// Send webhook notifications for group merging - async fn send_merge_webhooks(&self, dissolved_groups: &[NodeGroup], merged_group: &NodeGroup) { + fn send_merge_webhooks(&self, dissolved_groups: &[NodeGroup], merged_group: &NodeGroup) { if let Some(plugins) = &self.webhook_plugins { // Send dissolved notifications for group in dissolved_groups { for plugin in plugins.iter() { - let plugin_clone = plugin.clone(); - let group_clone = group.clone(); - tokio::spawn(async move { - if let Err(e) = plugin_clone - .send_group_destroyed( - group_clone.id, - group_clone.configuration_name, - group_clone.nodes.iter().cloned().collect(), - ) - .await - { - error!("Failed to send group dissolved webhook: {e}"); - } - }); + if let Err(e) = plugin.send_group_destroyed( + group.id.clone(), + group.configuration_name.clone(), + group.nodes.iter().cloned().collect(), + ) { + error!("Failed to send group dissolved webhook: {}", e); + } } } // Send created notification for plugin in plugins.iter() { - let plugin_clone = plugin.clone(); - let group_clone = merged_group.clone(); - tokio::spawn(async move { - if let Err(e) = plugin_clone - .send_group_created( - group_clone.id, - group_clone.configuration_name, - group_clone.nodes.iter().cloned().collect(), - ) - .await - { - error!("Failed to send group created webhook: {e}"); - } - }); + if let Err(e) = plugin.send_group_created( + merged_group.id.clone(), + merged_group.configuration_name.clone(), + merged_group.nodes.iter().cloned().collect(), + ) { + error!("Failed to send group created webhook: {}", e); + } } } } @@ -1098,20 +1076,13 @@ impl NodeGroupsPlugin { ); if let Some(plugins) = &self.webhook_plugins { for plugin in plugins.iter() { - let plugin_clone = plugin.clone(); - let group_clone = group.clone(); - tokio::spawn(async move { - if let Err(e) = plugin_clone - .send_group_destroyed( - group_clone.id.to_string(), - group_clone.configuration_name.to_string(), - group_clone.nodes.iter().cloned().collect(), - ) - .await - { - error!("Failed to send group dissolved webhook: {e}"); - } - }); + if let Err(e) = plugin.send_group_destroyed( + group.id.clone(), + group.configuration_name.clone(), + group.nodes.iter().cloned().collect(), + ) { + error!("Failed to send group dissolved webhook: {}", e); + } } } } else { diff --git a/crates/orchestrator/src/plugins/webhook/mod.rs b/crates/orchestrator/src/plugins/webhook/mod.rs index e96c4d2d..97665139 100644 --- a/crates/orchestrator/src/plugins/webhook/mod.rs +++ b/crates/orchestrator/src/plugins/webhook/mod.rs @@ -206,33 +206,24 @@ impl WebhookPlugin { Err(error) } - async fn send_event(&self, event: WebhookEvent) -> Result<(), Error> { + fn send_event(&self, event: WebhookEvent) -> Result<(), Error> { let payload = WebhookPayload::new(event); - - #[cfg(not(test))] - { - let webhook_url = self.webhook_url.clone(); - let client = self.client.clone(); - tokio::spawn(async move { - let plugin = WebhookPlugin { - webhook_url, - client, - }; - if let Err(e) = plugin.send_with_retry(payload).await { - // Error already logged in send_with_retry - let _ = e; - } - }); - Ok(()) - } - - #[cfg(test)] - { - self.send_with_retry(payload).await - } + let webhook_url = self.webhook_url.clone(); + let client = self.client.clone(); + tokio::spawn(async move { + let plugin = WebhookPlugin { + webhook_url, + client, + }; + if let Err(e) = plugin.send_with_retry(payload).await { + // Error already logged in send_with_retry + let _ = e; + } + }); + Ok(()) } - pub async fn send_node_status_changed( + pub fn send_node_status_changed( &self, node: &OrchestratorNode, old_status: &NodeStatus, @@ -244,11 +235,10 @@ impl WebhookPlugin { old_status: old_status.to_string(), new_status: node.status.to_string(), }; - - self.send_event(event).await + self.send_event(event) } - pub async fn send_group_created( + pub fn send_group_created( &self, group_id: String, configuration_name: String, @@ -259,11 +249,10 @@ impl WebhookPlugin { configuration_name, nodes, }; - - self.send_event(event).await + self.send_event(event) } - pub async fn send_group_destroyed( + pub fn send_group_destroyed( &self, group_id: String, configuration_name: String, @@ -274,18 +263,16 @@ impl WebhookPlugin { configuration_name, nodes, }; - - self.send_event(event).await + self.send_event(event) } - pub async fn send_metrics_updated( + pub fn send_metrics_updated( &self, pool_id: u32, metrics: std::collections::HashMap, ) -> Result<(), Error> { let event = WebhookEvent::MetricsUpdated { pool_id, metrics }; - - self.send_event(event).await + self.send_event(event) } } @@ -305,7 +292,7 @@ impl StatusUpdatePlugin for WebhookPlugin { return Ok(()); } - if let Err(e) = self.send_node_status_changed(node, old_status).await { + if let Err(e) = self.send_node_status_changed(node, old_status) { error!("Failed to send webhook to {}: {}", self.webhook_url, e); } @@ -331,8 +318,15 @@ mod tests { } } + impl WebhookPlugin { + async fn send_event_blocking(&self, event: WebhookEvent) -> Result<(), Error> { + let payload = WebhookPayload::new(event); + self.send_with_retry(payload).await + } + } + #[tokio::test] - async fn test_webhook_sends_on_status_change() -> Result<()> { + async fn test_webhook_sends_on_status_change() { let mut server = Server::new_async().await; let _mock = server @@ -358,11 +352,10 @@ mod tests { let node = create_test_node(NodeStatus::Healthy); let result = plugin.handle_status_change(&node, &NodeStatus::Dead).await; assert!(result.is_ok()); - Ok(()) } #[tokio::test] - async fn test_webhook_sends_on_group_created() -> Result<()> { + async fn test_webhook_sends_on_group_created() { let mut server = Server::new_async().await; let _mock = server .mock("POST", "/webhook") @@ -385,15 +378,17 @@ mod tests { let group_id = "1234567890"; let configuration_name = "test_configuration"; let nodes = vec!["0x1234567890123456789012345678901234567890".to_string()]; - let result = plugin - .send_group_created(group_id.to_string(), configuration_name.to_string(), nodes) - .await; + let event = WebhookEvent::GroupCreated { + group_id: group_id.to_string(), + configuration_name: configuration_name.to_string(), + nodes, + }; + let result = plugin.send_event_blocking(event).await; assert!(result.is_ok()); - Ok(()) } #[tokio::test] - async fn test_webhook_sends_on_metrics_updated() -> Result<()> { + async fn test_webhook_sends_on_metrics_updated() { let mut server = Server::new_async().await; let mock = server .mock("POST", "/webhook") @@ -416,15 +411,18 @@ mod tests { let mut metrics = std::collections::HashMap::new(); metrics.insert("test_metric".to_string(), 1.0); metrics.insert("metric_2".to_string(), 2.0); - let result = plugin.send_metrics_updated(1, metrics).await; + let event = WebhookEvent::MetricsUpdated { + pool_id: 1, + metrics, + }; + let result = plugin.send_event_blocking(event).await; assert!(result.is_ok()); mock.assert_async().await; - Ok(()) } #[tokio::test] - async fn test_with_bearer_token() -> Result<()> { + async fn test_with_bearer_token() { let mut server = Server::new_async().await; let mock = server .mock("POST", "/webhook") @@ -448,14 +446,17 @@ mod tests { let mut metrics = std::collections::HashMap::new(); metrics.insert("test_metric".to_string(), 1.0); metrics.insert("metric_2".to_string(), 2.0); - let result = plugin.send_metrics_updated(1, metrics).await; + let event = WebhookEvent::MetricsUpdated { + pool_id: 1, + metrics, + }; + let result = plugin.send_event_blocking(event).await; assert!(result.is_ok()); mock.assert_async().await; - Ok(()) } #[tokio::test] - async fn test_webhook_retry_logic() -> Result<()> { + async fn test_webhook_retry_logic() { let mut server = Server::new_async().await; // First two attempts fail, third succeeds @@ -484,13 +485,17 @@ mod tests { let mut metrics = std::collections::HashMap::new(); metrics.insert("test_metric".to_string(), 1.0); - let result = plugin.send_metrics_updated(1, metrics).await; + let result = plugin + .send_event_blocking(WebhookEvent::MetricsUpdated { + pool_id: 1, + metrics, + }) + .await; assert!(result.is_ok()); - Ok(()) } #[tokio::test] - async fn test_webhook_max_retries_exceeded() -> Result<()> { + async fn test_webhook_max_retries_exceeded() { let mut server = Server::new_async().await; // All attempts fail @@ -507,8 +512,12 @@ mod tests { let mut metrics = std::collections::HashMap::new(); metrics.insert("test_metric".to_string(), 1.0); - let result = plugin.send_metrics_updated(1, metrics).await; + let result = plugin + .send_event_blocking(WebhookEvent::MetricsUpdated { + pool_id: 1, + metrics, + }) + .await; assert!(result.is_err()); - Ok(()) } } diff --git a/crates/orchestrator/src/status_update/mod.rs b/crates/orchestrator/src/status_update/mod.rs index edfc4858..2567f69e 100644 --- a/crates/orchestrator/src/status_update/mod.rs +++ b/crates/orchestrator/src/status_update/mod.rs @@ -66,7 +66,7 @@ impl NodeStatusUpdater { } #[cfg(test)] - async fn is_node_in_pool(&self, _: &OrchestratorNode) -> bool { + fn is_node_in_pool(&self, _: &OrchestratorNode) -> bool { true } @@ -85,6 +85,9 @@ impl NodeStatusUpdater { &self, node: &OrchestratorNode, ) -> Result<(), anyhow::Error> { + #[cfg(test)] + let node_in_pool = self.is_node_in_pool(node); + #[cfg(not(test))] let node_in_pool = self.is_node_in_pool(node).await; if node_in_pool { match self @@ -115,6 +118,9 @@ impl NodeStatusUpdater { let nodes = self.store_context.node_store.get_nodes().await?; for node in nodes { if node.status == NodeStatus::Dead { + #[cfg(test)] + let node_in_pool = self.is_node_in_pool(&node); + #[cfg(not(test))] let node_in_pool = self.is_node_in_pool(&node).await; debug!("Node {:?} is in pool: {}", node.address, node_in_pool); if node_in_pool { @@ -151,6 +157,9 @@ impl NodeStatusUpdater { .get_unhealthy_counter(&node.address) .await?; + #[cfg(test)] + let is_node_in_pool = self.is_node_in_pool(&node); + #[cfg(not(test))] let is_node_in_pool = self.is_node_in_pool(&node).await; let mut status_changed = false; let mut new_status = node.status.clone(); diff --git a/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs b/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs index fa609b2b..ff0a20ce 100644 --- a/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs +++ b/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs @@ -404,7 +404,7 @@ impl ComputePool { Ok(result) } - pub async fn build_work_submission_call( + pub fn build_work_submission_call( &self, pool_id: U256, node: Address, diff --git a/crates/validator/src/validators/synthetic_data/chain_operations.rs b/crates/validator/src/validators/synthetic_data/chain_operations.rs index 4708e830..dbca615f 100644 --- a/crates/validator/src/validators/synthetic_data/chain_operations.rs +++ b/crates/validator/src/validators/synthetic_data/chain_operations.rs @@ -1,6 +1,21 @@ use super::*; impl SyntheticDataValidator { + #[cfg(test)] + pub fn soft_invalidate_work(&self, work_key: &str) -> Result<(), Error> { + info!("Soft invalidating work: {}", work_key); + + if self.disable_chain_invalidation { + info!("Chain invalidation is disabled, skipping work soft invalidation"); + return Ok(()); + } + + info!("Test mode: skipping actual work soft invalidation"); + let _ = &self.prime_network; + Ok(()) + } + + #[cfg(not(test))] pub async fn soft_invalidate_work(&self, work_key: &str) -> Result<(), Error> { info!("Soft invalidating work: {work_key}"); @@ -9,47 +24,37 @@ impl SyntheticDataValidator { return Ok(()); } - // Special case for tests - skip actual blockchain interaction - #[cfg(test)] - { - info!("Test mode: skipping actual work soft invalidation"); - let _ = &self.prime_network; - Ok(()) - } + let work_info = self + .get_work_info_from_redis(work_key) + .await? + .ok_or_else(|| Error::msg("Work info not found for soft invalidation"))?; + let work_key_bytes = hex::decode(work_key) + .map_err(|e| Error::msg(format!("Failed to decode hex work key: {}", e)))?; + + // Create 64-byte payload: work_key (32 bytes) + work_units (32 bytes) + let mut data = Vec::with_capacity(64); + data.extend_from_slice(&work_key_bytes); - #[cfg(not(test))] + // Convert work_units to 32-byte representation + let work_units_bytes = work_info.work_units.to_be_bytes::<32>(); + data.extend_from_slice(&work_units_bytes); + + match self + .prime_network + .soft_invalidate_work(self.pool_id, data) + .await { - let work_info = self - .get_work_info_from_redis(work_key) - .await? - .ok_or_else(|| Error::msg("Work info not found for soft invalidation"))?; - let work_key_bytes = hex::decode(work_key) - .map_err(|e| Error::msg(format!("Failed to decode hex work key: {e}")))?; - - // Create 64-byte payload: work_key (32 bytes) + work_units (32 bytes) - let mut data = Vec::with_capacity(64); - data.extend_from_slice(&work_key_bytes); - - // Convert work_units to 32-byte representation - let work_units_bytes = work_info.work_units.to_be_bytes::<32>(); - data.extend_from_slice(&work_units_bytes); - - match self - .prime_network - .soft_invalidate_work(self.pool_id, data) - .await - { - Ok(_) => Ok(()), - Err(e) => { - error!("Failed to soft invalidate work {work_key}: {e}"); - Err(Error::msg(format!("Failed to soft invalidate work: {e}"))) - } + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to soft invalidate work {}: {}", work_key, e); + Err(Error::msg(format!("Failed to soft invalidate work: {}", e))) } } } - pub async fn invalidate_work(&self, work_key: &str) -> Result<(), Error> { - info!("Invalidating work: {work_key}"); + #[cfg(test)] + pub fn invalidate_work(&self, work_key: &str) -> Result<(), Error> { + info!("Invalidating work: {}", work_key); if let Some(metrics) = &self.metrics { metrics.record_work_key_invalidation(); @@ -60,28 +65,36 @@ impl SyntheticDataValidator { return Ok(()); } - #[cfg(test)] - { - info!("Test mode: skipping actual work invalidation"); - let _ = &self.prime_network; - let _ = &self.penalty; - Ok(()) + info!("Test mode: skipping actual work invalidation"); + let _ = &self.prime_network; + let _ = &self.penalty; + Ok(()) + } + + #[cfg(not(test))] + pub async fn invalidate_work(&self, work_key: &str) -> Result<(), Error> { + info!("Invalidating work: {work_key}"); + + if let Some(metrics) = &self.metrics { + metrics.record_work_key_invalidation(); + } + + if self.disable_chain_invalidation { + info!("Chain invalidation is disabled, skipping work invalidation"); + return Ok(()); } - #[cfg(not(test))] + let data = hex::decode(work_key) + .map_err(|e| Error::msg(format!("Failed to decode hex work key: {}", e)))?; + match self + .prime_network + .invalidate_work(self.pool_id, self.penalty, data) + .await { - let data = hex::decode(work_key) - .map_err(|e| Error::msg(format!("Failed to decode hex work key: {e}")))?; - match self - .prime_network - .invalidate_work(self.pool_id, self.penalty, data) - .await - { - Ok(_) => Ok(()), - Err(e) => { - error!("Failed to invalidate work {work_key}: {e}"); - Err(Error::msg(format!("Failed to invalidate work: {e}"))) - } + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to invalidate work {}: {}", work_key, e); + Err(Error::msg(format!("Failed to invalidate work: {}", e))) } } } @@ -92,7 +105,13 @@ impl SyntheticDataValidator { invalidation_type: InvalidationType, ) -> Result<(), Error> { match invalidation_type { + #[cfg(test)] + InvalidationType::Soft => self.soft_invalidate_work(work_key), + #[cfg(not(test))] InvalidationType::Soft => self.soft_invalidate_work(work_key).await, + #[cfg(test)] + InvalidationType::Hard => self.invalidate_work(work_key), + #[cfg(not(test))] InvalidationType::Hard => self.invalidate_work(work_key).await, } } diff --git a/crates/validator/src/validators/synthetic_data/mod.rs b/crates/validator/src/validators/synthetic_data/mod.rs index 0f2c2f81..a7c432de 100644 --- a/crates/validator/src/validators/synthetic_data/mod.rs +++ b/crates/validator/src/validators/synthetic_data/mod.rs @@ -529,6 +529,11 @@ impl SyntheticDataValidator { const MAX_ATTEMPTS: i64 = 60; if attempts >= MAX_ATTEMPTS { // If we've tried too many times, soft invalidate the work and update its status + #[cfg(test)] + if let Err(_e) = self.soft_invalidate_work(work_key) { + // Test mode: error handling skipped + } + #[cfg(not(test))] if let Err(e) = self.soft_invalidate_work(work_key).await { error!( "Failed to soft invalidate work after max filename resolution attempts: {e}" @@ -1009,7 +1014,7 @@ impl SyntheticDataValidator { Ok(rejected_nodes) } - async fn handle_group_toploc_acceptance( + fn handle_group_toploc_acceptance( &self, group: &ToplocGroup, status: &GroupValidationResult, @@ -1038,22 +1043,20 @@ impl SyntheticDataValidator { } if !work_units_match { - let nodes_with_wrong_work_unit_claims = self - .handle_work_units_mismatch( - group, - status, - total_claimed_units, - node_work_units, - output_flops_u256, - ) - .await?; + let nodes_with_wrong_work_unit_claims = self.handle_work_units_mismatch( + group, + status, + total_claimed_units, + node_work_units, + output_flops_u256, + )?; Ok(nodes_with_wrong_work_unit_claims) } else { Ok(Vec::new()) } } - async fn handle_work_units_mismatch( + fn handle_work_units_mismatch( &self, group: &ToplocGroup, status: &GroupValidationResult, @@ -1290,15 +1293,13 @@ impl SyntheticDataValidator { let rejected_nodes = self.handle_group_toploc_rejection(&group, &status).await?; toploc_nodes_to_invalidate.extend(rejected_nodes); } else if status.status == ValidationResult::Accept { - let wrong_claim_nodes = self - .handle_group_toploc_acceptance( - &group, - &status, - total_claimed_units, - &node_work_units, - &toploc_config_name, - ) - .await?; + let wrong_claim_nodes = self.handle_group_toploc_acceptance( + &group, + &status, + total_claimed_units, + &node_work_units, + &toploc_config_name, + )?; nodes_with_wrong_work_unit_claims.extend(wrong_claim_nodes); } @@ -1563,6 +1564,25 @@ impl SyntheticDataValidator { for work_key in work_keys { // Soft invalidate (less penalty than hard invalidation) + #[cfg(test)] + if let Err(_e) = self.soft_invalidate_work(&work_key) { + // Test mode: error handling skipped + } else { + // Mark work as soft invalidated due to incomplete group + if let Err(e) = self + .update_work_validation_status( + &work_key, + &ValidationResult::IncompleteGroup, + ) + .await + { + error!( + "Failed to update work validation status for {}: {}", + work_key, e + ); + } + } + #[cfg(not(test))] if let Err(e) = self.soft_invalidate_work(&work_key).await { error!("Failed to soft invalidate work key {work_key}: {e}"); } else { diff --git a/crates/validator/src/validators/synthetic_data/tests/mod.rs b/crates/validator/src/validators/synthetic_data/tests/mod.rs index 806eaec5..a589076f 100644 --- a/crates/validator/src/validators/synthetic_data/tests/mod.rs +++ b/crates/validator/src/validators/synthetic_data/tests/mod.rs @@ -1016,7 +1016,7 @@ async fn test_incomplete_group_status_tracking() -> Result<(), Error> { assert_eq!(work_keys.len(), 1); // Soft invalidate and set status to IncompleteGroup - validator.soft_invalidate_work(FILE_SHA_1).await?; + validator.soft_invalidate_work(FILE_SHA_1)?; validator .update_work_validation_status(FILE_SHA_1, &ValidationResult::IncompleteGroup) .await?; diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index efaec645..7ed1b3a3 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -688,8 +688,8 @@ pub async fn execute_command( } }; - if let Err(e) = p2p_service.start().await { - error!("❌ Failed to start P2P listener: {e}"); + if let Err(e) = p2p_service.start() { + error!("❌ Failed to start P2P listener: {}", e); std::process::exit(1); } diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index 5278ffdc..29ea51e8 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -85,7 +85,7 @@ impl TaskBridge { } Ok(()) } - async fn handle_file_upload(self: Arc, json_str: &str) -> Result<()> { + fn handle_file_upload(self: Arc, json_str: &str) -> Result<()> { debug!("Handling file upload"); if let Ok(file_info) = serde_json::from_str::(json_str) { let task_id = file_info["task_id"].as_str().unwrap_or("unknown"); @@ -175,8 +175,8 @@ impl TaskBridge { async fn handle_message(self: Arc, json_str: &str) -> Result<()> { debug!("Extracted JSON object: {json_str}"); if json_str.contains("output/save_path") { - if let Err(e) = self.handle_file_upload(json_str).await { - error!("Failed to handle file upload: {e}"); + if let Err(e) = self.handle_file_upload(json_str) { + error!("Failed to handle file upload: {}", e); } } else { debug!("Processing metric message"); diff --git a/crates/worker/src/docker/taskbridge/file_handler.rs b/crates/worker/src/docker/taskbridge/file_handler.rs index ae0e95bf..857c04ae 100644 --- a/crates/worker/src/docker/taskbridge/file_handler.rs +++ b/crates/worker/src/docker/taskbridge/file_handler.rs @@ -356,7 +356,6 @@ pub async fn handle_file_validation( decoded_sha.to_vec(), U256::from(work_units), ) - .await .unwrap(); let tx = retry_call(call, 20, provider.clone(), None) diff --git a/crates/worker/src/p2p/service.rs b/crates/worker/src/p2p/service.rs index 85479ab5..c6ad45e2 100644 --- a/crates/worker/src/p2p/service.rs +++ b/crates/worker/src/p2p/service.rs @@ -143,7 +143,7 @@ impl P2PService { Ok(endpoint) } /// Start accepting incoming connections with automatic recovery - pub async fn start(&self) -> Result<()> { + pub fn start(&self) -> Result<()> { let service = Arc::new(self.clone()); let cancellation_token = self.cancellation_token.clone(); @@ -696,7 +696,7 @@ mod tests { let random_nonce = rand_v8::thread_rng().gen::(); tokio::spawn(async move { - service.start().await.unwrap(); + service.start().unwrap(); }); let ping = P2PMessage::Ping { @@ -723,7 +723,7 @@ mod tests { let addresses = service.listening_addresses().to_vec(); tokio::spawn(async move { - service.start().await.unwrap(); + service.start().unwrap(); }); let ping = P2PMessage::Ping {