From 693b91ab4598607531715ed2126599dd3ec58fc5 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 15:37:39 -0400 Subject: [PATCH 1/8] refactor task bridge to handle async tasks in concurrently, use JoinMap for file validation tasks --- .cargo/config.toml | 2 + Cargo.lock | 1 + crates/worker/Cargo.toml | 3 +- crates/worker/src/cli/command.rs | 5 +- crates/worker/src/docker/taskbridge/bridge.rs | 538 +++++++++++------- .../src/docker/taskbridge/file_handler.rs | 8 +- 6 files changed, 330 insertions(+), 227 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..bff29e6e --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.lock b/Cargo.lock index e7b945c0..3dd3cb55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8539,6 +8539,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tokio", + "tokio-stream", "tokio-util", "toml", "tracing", diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 2d2c8b44..63ec3521 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -34,7 +34,7 @@ anyhow = { workspace = true } thiserror = "2.0.11" toml = { workspace = true } ctrlc = "3.4.5" -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["rt"] } futures = { workspace = true } chrono = { workspace = true } serial_test = "0.5.1" @@ -55,3 +55,4 @@ iroh = { workspace = true } rand_v8 = { workspace = true } rand_core_v6 = { workspace = true } dashmap = "6.1.0" +tokio-stream = { version = "0.1.17", features = ["net"] } diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index 7ed1b3a3..af550952 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -445,7 +445,7 @@ pub async fn execute_command( cancellation_token.clone(), gpu, system_memory, - task_bridge.socket_path.clone(), + task_bridge.get_socket_path().to_string(), docker_storage_path, node_wallet_instance .wallet @@ -458,11 +458,10 @@ pub async fn execute_command( let bridge_cancellation_token = cancellation_token.clone(); tokio::spawn(async move { - let bridge_clone = task_bridge.clone(); tokio::select! { _ = bridge_cancellation_token.cancelled() => { } - _ = bridge_clone.run() => { + _ = task_bridge.run() => { } } }); diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index 29ea51e8..c0f87f59 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -2,7 +2,11 @@ use crate::docker::taskbridge::file_handler; use crate::docker::taskbridge::json_helper; use crate::metrics::store::MetricsStore; use crate::state::system_state::SystemState; +use anyhow::bail; use anyhow::Result; +use futures::future::BoxFuture; +use futures::FutureExt; +use futures::StreamExt as _; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use shared::models::node::Node; @@ -21,13 +25,18 @@ const DEFAULT_MACOS_SOCKET: &str = "/tmp/com.prime.worker/"; const DEFAULT_LINUX_SOCKET: &str = "/tmp/com.prime.worker/"; pub struct TaskBridge { - pub socket_path: String, - pub metrics_store: Arc, - pub contracts: Option>, - pub node_config: Option, - pub node_wallet: Option, - pub docker_storage_path: String, - pub state: Arc, + socket_path: String, + config: TaskBridgeConfig, +} + +#[derive(Clone)] +struct TaskBridgeConfig { + metrics_store: Arc, + contracts: Option>, + node_config: Option, + node_wallet: Option, + docker_storage_path: String, + state: Arc, } #[derive(Deserialize, Serialize, Debug)] @@ -39,7 +48,7 @@ struct MetricInput { impl TaskBridge { #[allow(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( socket_path: Option<&str>, metrics_store: Arc, contracts: Option>, @@ -47,7 +56,7 @@ impl TaskBridge { node_wallet: Option, docker_storage_path: String, state: Arc, - ) -> Arc { + ) -> Self { let path = match socket_path { Some(path) => path.to_string(), None => { @@ -59,144 +68,30 @@ impl TaskBridge { } }; - Arc::new(Self { + Self { socket_path: path, - metrics_store, - contracts, - node_config, - node_wallet, - docker_storage_path, - state, - }) - } - - async fn handle_metric(self: Arc, input: &MetricInput) -> Result<()> { - debug!("Processing metric message"); - for (key, value) in input.metrics.iter() { - debug!("Metric - Key: {key}, Value: {value}"); - let _ = self - .metrics_store - .update_metric( - input.task_id.clone(), - key.to_string(), - value.as_f64().unwrap_or(0.0), - ) - .await; + config: TaskBridgeConfig { + metrics_store, + contracts, + node_config, + node_wallet, + docker_storage_path, + state, + }, } - Ok(()) } - fn handle_file_upload(self: Arc, json_str: &str) -> Result<()> { - debug!("Handling file upload"); - if let Ok(file_info) = serde_json::from_str::(json_str) { - let task_id = file_info["task_id"].as_str().unwrap_or("unknown"); - - // Handle file upload if save_path is present - if let Some(file_name) = file_info["output/save_path"].as_str() { - info!("Handling file upload for task_id: {task_id}, file: {file_name}"); - - let storage_path_inner = self.docker_storage_path.clone(); - let task_id_inner = task_id.to_string(); - let file_name_inner = file_name.to_string(); - let wallet_inner = self.node_wallet.as_ref().unwrap().clone(); - let state_inner = self.state.clone(); - - tokio::spawn(async move { - if let Err(e) = file_handler::handle_file_upload( - &storage_path_inner, - &task_id_inner, - &file_name_inner, - &wallet_inner, - &state_inner, - ) - .await - { - error!("Failed to handle file upload: {e}"); - } else { - info!("File upload handled successfully"); - } - }); - } - - // Handle file validation if sha256 is present - if let Some(file_sha) = file_info["output/sha256"].as_str() { - debug!("Processing file validation message"); - let output_flops: f64 = file_info["output/output_flops"].as_f64().unwrap_or(0.0); - let input_flops: f64 = file_info["output/input_flops"].as_f64().unwrap_or(0.0); - info!( - "Handling file validation for task_id: {task_id}, sha: {file_sha}, output_flops: {output_flops}, input_flops: {input_flops}" - ); - - if let (Some(contracts_ref), Some(node_ref)) = - (self.contracts.clone(), self.node_config.clone()) - { - let file_sha_inner = file_sha.to_string(); - let contracts_inner = contracts_ref.clone(); - let node_inner = node_ref.clone(); - let provider = match self.node_wallet.as_ref() { - Some(wallet) => wallet.provider(), - None => { - error!("No wallet provider found"); - return Err(anyhow::anyhow!("No wallet provider found")); - } - }; - - if output_flops <= 0.0 { - error!("Invalid work units calculation: output_flops ({output_flops}) must be greater than 0.0. Blocking file validation submission."); - return Err(anyhow::anyhow!( - "Invalid work units: output_flops must be greater than 0.0" - )); - } - let work_units = output_flops; - - tokio::spawn(async move { - if let Err(e) = file_handler::handle_file_validation( - &file_sha_inner, - &contracts_inner, - &node_inner, - &provider, - work_units, - ) - .await - { - error!("Failed to handle file validation: {e}"); - } - }); - } else { - error!("Missing contracts or node configuration for file validation"); - } - } - } else { - error!("Failed to parse JSON: {json_str}"); - } - Ok(()) + pub(crate) fn get_socket_path(&self) -> &str { + &self.socket_path } - async fn handle_message(self: Arc, json_str: &str) -> Result<()> { - debug!("Extracted JSON object: {json_str}"); - if json_str.contains("output/save_path") { - if let Err(e) = self.handle_file_upload(json_str) { - error!("Failed to handle file upload: {}", e); - } - } else { - debug!("Processing metric message"); - match serde_json::from_str::(json_str) { - Ok(input) => { - if let Err(e) = self.handle_metric(&input).await { - error!("Failed to handle metric: {e}"); - } - } - Err(e) => { - error!("Failed to parse metric input: {json_str} {e}"); - } - } - } + pub(crate) async fn run(self) -> Result<()> { + let Self { + socket_path, + config, + } = self; - Ok(()) - } - - pub async fn run(self: Arc) -> Result<()> { - let socket_path = Path::new(&self.socket_path); + let socket_path = Path::new(&socket_path); debug!("Setting up TaskBridge socket at: {}", socket_path.display()); if let Some(parent) = socket_path.parent() { @@ -244,91 +139,296 @@ impl TaskBridge { } } info!("TaskBridge socket created at: {}", socket_path.display()); + + let mut handle_stream_futures = tokio::task::JoinSet::new(); + let mut listener_stream = tokio_stream::wrappers::UnixListenerStream::new(listener); + let (file_validation_futures_tx, mut file_validation_futures_rx) = + tokio::sync::mpsc::channel::<(String, BoxFuture>)>(100); + let mut file_validation_futures_map = tokio_util::task::JoinMap::new(); + loop { - let bridge = self.clone(); - match listener.accept().await { - Ok((stream, _addr)) => { - tokio::spawn(async move { - debug!("Received connection from {_addr:?}"); - let mut reader = BufReader::new(stream); - let mut buffer = vec![0; 1024]; - let mut data = Vec::new(); - - loop { - let n = match reader.read(&mut buffer).await { - Ok(0) => { - debug!("Connection closed by client"); - 0 - } - Ok(n) => { - debug!("Read {n} bytes from socket"); - n - } - Err(e) => { - error!("Error reading from stream: {e}"); - break; - } - }; - - data.extend_from_slice(&buffer[..n]); - debug!("Current data buffer size: {} bytes", data.len()); - - if let Ok(data_str) = std::str::from_utf8(&data) { - debug!("Raw data received: {data_str}"); - } else { - debug!("Raw data received (non-UTF8): {} bytes", data.len()); - } - - let mut current_pos = 0; - while current_pos < data.len() { - // Try to find a complete JSON object - if let Some((json_str, byte_length)) = - json_helper::extract_next_json(&data[current_pos..]) - { - let json_str = json_str.to_string(); - let bridge_clone = bridge.clone(); - if let Err(e) = bridge_clone.handle_message(&json_str).await { - error!("Error handling message: {e}"); - } - - current_pos += byte_length; - debug!( - "Advanced position to {current_pos} after processing JSON" - ); - } else { - debug!("No complete JSON object found, waiting for more data"); - break; - } - } - - data = data.split_off(current_pos); - debug!( - "Remaining data buffer size after processing: {} bytes", - data.len() - ); - if n == 0 { - if data.is_empty() { - // No data left to process, we can break - break; - } else { - // We have data but couldn't parse it as complete JSON objects - // and the connection is closed - log and discard - if let Ok(unparsed) = std::str::from_utf8(&data) { - warn!("Discarding unparseable data after connection close: {unparsed}"); - } else { - warn!("Discarding unparseable binary data after connection close ({} bytes)", data.len()); - } - // Break out of the loop - break; - } - } + // let bridge = self.clone(); + tokio::select! { + Some(res) = listener_stream.next() => { + match res { + Ok(stream) => { + let handle_future = handle_stream(config.clone(), stream, file_validation_futures_tx.clone()).fuse(); + handle_stream_futures.spawn(handle_future); + } + Err(e) => { + error!("Accept failed on Unix socket: {e}"); + } + } + } + Some(task) = handle_stream_futures.join_next() => { + match task { + Ok(Ok(())) => { + debug!("Stream handled successfully"); + } + Ok(Err(e)) => { + error!("Error handling stream: {e}"); } - }); + Err(e) => { + error!("Task join error: {e}"); + } + } + } + Some((hash, fut)) = file_validation_futures_rx.recv() => { + if file_validation_futures_map.contains_key(&hash) { + debug!("duplicate file validation task for hash: {hash}, skipping"); + continue; + } + file_validation_futures_map.spawn(hash, fut); + } + Some((hash, task)) = file_validation_futures_map.join_next() => { + match task { + Ok(Ok(())) => { + debug!("File validation task for hash {hash} completed successfully"); + } + Ok(Err(e)) => { + error!("File validation task for hash {hash} failed: {e}"); + } + Err(e) => { + error!("Error joining file validation task for hash {hash}: {e}"); + } + } + } + } + } + // match listener.accept().await { + // Ok((stream, addr)) => handle_stream(stream, addr) + // .await + // .unwrap_or_else(|e| error!("Error handling stream: {e}")), + // Err(e) => error!("Accept failed on Unix socket: {e}"), + // } + } +} + +async fn handle_stream( + config: TaskBridgeConfig, + stream: tokio::net::UnixStream, + file_validation_futures_tx: tokio::sync::mpsc::Sender<( + String, + BoxFuture<'_, anyhow::Result<()>>, + )>, +) -> Result<()> { + let addr = stream.peer_addr()?; + debug!("Received connection from {addr:?}"); + let mut reader = BufReader::new(stream); + let mut buffer = vec![0; 1024]; + let mut data = Vec::new(); + + loop { + let n = match reader.read(&mut buffer).await { + Ok(0) => { + debug!("Connection closed by client"); + 0 + } + Ok(n) => { + debug!("Read {n} bytes from socket"); + n + } + Err(e) => { + bail!("Error reading from stream: {e}"); + } + }; + + data.extend_from_slice(&buffer[..n]); + debug!("Current data buffer size: {} bytes", data.len()); + + if let Ok(data_str) = std::str::from_utf8(&data) { + debug!("Raw data received: {data_str}"); + } else { + debug!("Raw data received (non-UTF8): {} bytes", data.len()); + } + + let mut current_pos = 0; + while current_pos < data.len() { + // Try to find a complete JSON object + if let Some((json_str, byte_length)) = + json_helper::extract_next_json(&data[current_pos..]) + { + let json_str = json_str.to_string(); + if let Err(e) = handle_message( + config.clone(), + &json_str, + file_validation_futures_tx.clone(), + ) + .await + { + error!("Error handling message: {e}"); } - Err(e) => error!("Accept failed on Unix socket: {e}"), + + current_pos += byte_length; + debug!("Advanced position to {current_pos} after processing JSON"); + } else { + debug!("No complete JSON object found, waiting for more data"); + break; } } + + data = data.split_off(current_pos); + debug!( + "Remaining data buffer size after processing: {} bytes", + data.len() + ); + if n == 0 { + if data.is_empty() { + // No data left to process, we can break + break; + } else { + // We have data but couldn't parse it as complete JSON objects + // and the connection is closed - log and discard + if let Ok(unparsed) = std::str::from_utf8(&data) { + warn!("Discarding unparseable data after connection close: {unparsed}"); + } else { + warn!( + "Discarding unparseable binary data after connection close ({} bytes)", + data.len() + ); + } + // Break out of the loop + break; + } + } + } + Ok(()) +} + +async fn handle_metric(config: TaskBridgeConfig, input: &MetricInput) -> Result<()> { + debug!("Processing metric message"); + for (key, value) in input.metrics.iter() { + debug!("Metric - Key: {key}, Value: {value}"); + let _ = config + .metrics_store + .update_metric( + input.task_id.clone(), + key.to_string(), + value.as_f64().unwrap_or(0.0), + ) + .await; } + Ok(()) +} + +async fn handle_file_upload( + config: TaskBridgeConfig, + json_str: &str, + file_validation_futures_tx: tokio::sync::mpsc::Sender<( + String, + BoxFuture<'_, anyhow::Result<()>>, + )>, +) -> Result<()> { + debug!("Handling file upload"); + if let Ok(file_info) = serde_json::from_str::(json_str) { + let task_id = file_info["task_id"].as_str().unwrap_or("unknown"); + + // Handle file upload if save_path is present + if let Some(file_name) = file_info["output/save_path"].as_str() { + info!("Handling file upload for task_id: {task_id}, file: {file_name}"); + + let storage_path_inner = config.docker_storage_path.clone(); + let task_id_inner = task_id.to_string(); + let file_name_inner = file_name.to_string(); + let wallet_inner = config.node_wallet.as_ref().unwrap().clone(); + let state_inner = config.state.clone(); + + tokio::spawn(async move { + if let Err(e) = file_handler::handle_file_upload( + &storage_path_inner, + &task_id_inner, + &file_name_inner, + &wallet_inner, + &state_inner, + ) + .await + { + error!("Failed to handle file upload: {e}"); + } else { + info!("File upload handled successfully"); + } + }); + } + + // Handle file validation if sha256 is present + if let Some(file_sha) = file_info["output/sha256"].as_str() { + debug!("Processing file validation message"); + let output_flops: f64 = file_info["output/output_flops"].as_f64().unwrap_or(0.0); + let input_flops: f64 = file_info["output/input_flops"].as_f64().unwrap_or(0.0); + + info!( + "Handling file validation for task_id: {task_id}, sha: {file_sha}, output_flops: {output_flops}, input_flops: {input_flops}" + ); + + if let (Some(contracts), Some(node)) = + (config.contracts.clone(), config.node_config.clone()) + { + let provider = match config.node_wallet.as_ref() { + Some(wallet) => wallet.provider(), + None => { + error!("No wallet provider found"); + return Err(anyhow::anyhow!("No wallet provider found")); + } + }; + + if output_flops <= 0.0 { + error!("Invalid work units calculation: output_flops ({output_flops}) must be greater than 0.0. Blocking file validation submission."); + return Err(anyhow::anyhow!( + "Invalid work units: output_flops must be greater than 0.0" + )); + } + let work_units = output_flops; + + let _ = file_validation_futures_tx + .send(( + file_sha.to_string(), + Box::pin(file_handler::handle_file_validation( + file_sha.to_string(), + contracts.clone(), + node.clone(), + provider, + work_units, + )), + )) + .await; + } else { + error!("Missing contracts or node configuration for file validation"); + } + } + } else { + error!("Failed to parse JSON: {json_str}"); + } + Ok(()) +} + +async fn handle_message( + config: TaskBridgeConfig, + json_str: &str, + file_validation_futures_tx: tokio::sync::mpsc::Sender<( + String, + BoxFuture<'_, anyhow::Result<()>>, + )>, +) -> Result<()> { + debug!("Extracted JSON object: {json_str}"); + if json_str.contains("output/save_path") { + if let Err(e) = handle_file_upload(config, json_str, file_validation_futures_tx).await { + error!("Failed to handle file upload: {}", e); + } + } else { + debug!("Processing metric message"); + match serde_json::from_str::(json_str) { + Ok(input) => { + if let Err(e) = handle_metric(config, &input).await { + error!("Failed to handle metric: {e}"); + } + } + Err(e) => { + error!("Failed to parse metric input: {json_str} {e}"); + } + } + } + + Ok(()) } #[cfg(test)] diff --git a/crates/worker/src/docker/taskbridge/file_handler.rs b/crates/worker/src/docker/taskbridge/file_handler.rs index 857c04ae..50ec255e 100644 --- a/crates/worker/src/docker/taskbridge/file_handler.rs +++ b/crates/worker/src/docker/taskbridge/file_handler.rs @@ -313,10 +313,10 @@ pub async fn handle_file_upload( /// Handles a file validation request pub async fn handle_file_validation( - file_sha: &str, - contracts: &Contracts, - node: &Node, - provider: &WalletProvider, + file_sha: String, + contracts: Contracts, + node: Node, + provider: WalletProvider, work_units: f64, ) -> Result<()> { info!("📄 Received file SHA for validation: {file_sha}"); From a17c4ebbd159dfc1631c2277609205217eca67fd Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 15:51:42 -0400 Subject: [PATCH 2/8] put file upload futures into select --- crates/worker/src/cli/command.rs | 2 +- crates/worker/src/docker/taskbridge/bridge.rs | 77 +++++++++++-------- .../src/docker/taskbridge/file_handler.rs | 14 ++-- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index af550952..47fdc0dc 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -688,7 +688,7 @@ pub async fn execute_command( }; if let Err(e) = p2p_service.start() { - error!("❌ Failed to start P2P listener: {}", e); + error!("❌ Failed to start P2P listener: {e}"); std::process::exit(1); } diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index c0f87f59..ffca00ec 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -145,14 +145,16 @@ impl TaskBridge { let (file_validation_futures_tx, mut file_validation_futures_rx) = tokio::sync::mpsc::channel::<(String, BoxFuture>)>(100); let mut file_validation_futures_map = tokio_util::task::JoinMap::new(); + let (file_upload_futures_tx, mut file_upload_futures_rx) = + tokio::sync::mpsc::channel::>>(100); + let mut file_upload_futures_set = tokio::task::JoinSet::new(); loop { - // let bridge = self.clone(); tokio::select! { Some(res) = listener_stream.next() => { match res { Ok(stream) => { - let handle_future = handle_stream(config.clone(), stream, file_validation_futures_tx.clone()).fuse(); + let handle_future = handle_stream(config.clone(), stream, file_upload_futures_tx.clone(), file_validation_futures_tx.clone()).fuse(); handle_stream_futures.spawn(handle_future); } Err(e) => { @@ -193,20 +195,31 @@ impl TaskBridge { } } } + Some(fut) = file_upload_futures_rx.recv() => { + file_upload_futures_set.spawn(fut); + } + Some(task) = file_upload_futures_set.join_next() => { + match task { + Ok(Ok(())) => { + debug!("File upload task completed successfully"); + } + Ok(Err(e)) => { + error!("File upload task failed: {e}"); + } + Err(e) => { + error!("Error joining file upload task: {e}"); + } + } + } } } - // match listener.accept().await { - // Ok((stream, addr)) => handle_stream(stream, addr) - // .await - // .unwrap_or_else(|e| error!("Error handling stream: {e}")), - // Err(e) => error!("Accept failed on Unix socket: {e}"), - // } } } async fn handle_stream( config: TaskBridgeConfig, stream: tokio::net::UnixStream, + file_upload_futures_tx: tokio::sync::mpsc::Sender>>, file_validation_futures_tx: tokio::sync::mpsc::Sender<( String, BoxFuture<'_, anyhow::Result<()>>, @@ -252,6 +265,7 @@ async fn handle_stream( if let Err(e) = handle_message( config.clone(), &json_str, + file_upload_futures_tx.clone(), file_validation_futures_tx.clone(), ) .await @@ -314,6 +328,7 @@ async fn handle_metric(config: TaskBridgeConfig, input: &MetricInput) -> Result< async fn handle_file_upload( config: TaskBridgeConfig, json_str: &str, + file_upload_futures_tx: tokio::sync::mpsc::Sender>>, file_validation_futures_tx: tokio::sync::mpsc::Sender<( String, BoxFuture<'_, anyhow::Result<()>>, @@ -327,27 +342,19 @@ async fn handle_file_upload( if let Some(file_name) = file_info["output/save_path"].as_str() { info!("Handling file upload for task_id: {task_id}, file: {file_name}"); - let storage_path_inner = config.docker_storage_path.clone(); - let task_id_inner = task_id.to_string(); - let file_name_inner = file_name.to_string(); - let wallet_inner = config.node_wallet.as_ref().unwrap().clone(); - let state_inner = config.state.clone(); - - tokio::spawn(async move { - if let Err(e) = file_handler::handle_file_upload( - &storage_path_inner, - &task_id_inner, - &file_name_inner, - &wallet_inner, - &state_inner, - ) - .await - { - error!("Failed to handle file upload: {e}"); - } else { - info!("File upload handled successfully"); - } - }); + let Some(wallet) = config.node_wallet.as_ref() else { + bail!("no wallet found; must be set to upload files"); + }; + + let _ = file_upload_futures_tx + .send(Box::pin(file_handler::handle_file_upload( + config.docker_storage_path.clone(), + task_id.to_string(), + file_name.to_string(), + wallet.clone(), + config.state.clone(), + ))) + .await; } // Handle file validation if sha256 is present @@ -404,6 +411,7 @@ async fn handle_file_upload( async fn handle_message( config: TaskBridgeConfig, json_str: &str, + file_upload_futures_tx: tokio::sync::mpsc::Sender>>, file_validation_futures_tx: tokio::sync::mpsc::Sender<( String, BoxFuture<'_, anyhow::Result<()>>, @@ -411,8 +419,15 @@ async fn handle_message( ) -> Result<()> { debug!("Extracted JSON object: {json_str}"); if json_str.contains("output/save_path") { - if let Err(e) = handle_file_upload(config, json_str, file_validation_futures_tx).await { - error!("Failed to handle file upload: {}", e); + if let Err(e) = handle_file_upload( + config, + json_str, + file_upload_futures_tx, + file_validation_futures_tx, + ) + .await + { + error!("Failed to handle file upload: {e}"); } } else { debug!("Processing metric message"); diff --git a/crates/worker/src/docker/taskbridge/file_handler.rs b/crates/worker/src/docker/taskbridge/file_handler.rs index 50ec255e..82d856b6 100644 --- a/crates/worker/src/docker/taskbridge/file_handler.rs +++ b/crates/worker/src/docker/taskbridge/file_handler.rs @@ -17,11 +17,11 @@ use std::time::Duration; /// Handles a file upload request pub async fn handle_file_upload( - storage_path: &str, - task_id: &str, - file_name: &str, - wallet: &Wallet, - state: &Arc, + storage_path: String, + task_id: String, + file_name: String, + wallet: Wallet, + state: Arc, ) -> Result<()> { info!("📄 Received file upload request: {file_name}"); info!("Task ID: {task_id}, Storage path: {storage_path}"); @@ -44,7 +44,7 @@ pub async fn handle_file_upload( info!("Clean file name: {clean_file_name}"); let task_dir = format!("prime-task-{task_id}"); - let file_path = Path::new(storage_path) + let file_path = Path::new(&storage_path) .join(&task_dir) .join("data") .join(clean_file_name); @@ -122,7 +122,7 @@ pub async fn handle_file_upload( }; let signature = - match sign_request_with_nonce("/storage/request-upload", wallet, Some(&request_value)) + match sign_request_with_nonce("/storage/request-upload", &wallet, Some(&request_value)) .await { Ok(sig) => { From 33706d28c783ab0038fd34eb997480aebd701bd3 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 16:04:27 -0400 Subject: [PATCH 3/8] add todo --- crates/worker/src/docker/taskbridge/bridge.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index ffca00ec..391da910 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -32,6 +32,9 @@ pub struct TaskBridge { #[derive(Clone)] struct TaskBridgeConfig { metrics_store: Arc, + + // TODO: the optional values are only used for testing; refactor + // the tests such that these aren't optional contracts: Option>, node_config: Option, node_wallet: Option, From 33d33edb1cbae7ccd8ab9f0f64a985f078542d3e Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 16:15:44 -0400 Subject: [PATCH 4/8] update ci rustflag --- .github/workflows/checks.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 42ab51d3..f5e2564b 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -1,4 +1,7 @@ name: Check Format & Lint +env: + RUSTFLAGS: "--cfg tokio_unstable" + on: pull_request: branches: [ main, master, develop ] @@ -57,4 +60,4 @@ jobs: if: success() || failure() run: | redis-server --version - RUST_BACKTRACE=1 cargo test -- --nocapture \ No newline at end of file + RUST_BACKTRACE=1 cargo test -- --nocapture From 4a722f1be7ac09eb7114d2a9a947e42bb02e5a02 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 16:37:08 -0400 Subject: [PATCH 5/8] maybe fix workflow --- .github/workflows/checks.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index f5e2564b..ffe97066 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -1,7 +1,6 @@ name: Check Format & Lint env: RUSTFLAGS: "--cfg tokio_unstable" - on: pull_request: branches: [ main, master, develop ] @@ -21,7 +20,6 @@ env: LANG: C.UTF-8 LC_ALL: C.UTF-8 - jobs: check: name: Format & Lint From 4119d12a36e440445c875e1993c7df441d94655f Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Jun 2025 16:37:34 -0400 Subject: [PATCH 6/8] actually fix --- .github/workflows/checks.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index ffe97066..b9f3d588 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -1,6 +1,4 @@ name: Check Format & Lint -env: - RUSTFLAGS: "--cfg tokio_unstable" on: pull_request: branches: [ main, master, develop ] @@ -16,7 +14,7 @@ on: env: CARGO_TERM_COLOR: always CARGO_INCREMENTAL: 0 - RUSTFLAGS: "-C debuginfo=0" + RUSTFLAGS: "-C debuginfo=0 --cfg tokio_unstable" LANG: C.UTF-8 LC_ALL: C.UTF-8 From 301c050009d8a75a2e699d20531eb003d62f51fe Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 2 Jul 2025 14:19:36 -0400 Subject: [PATCH 7/8] use FuturesUnordered instead of JoinSet --- .cargo/config.toml | 2 - crates/worker/src/docker/taskbridge/bridge.rs | 38 +++++++++++-------- 2 files changed, 22 insertions(+), 18 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index bff29e6e..00000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index 391da910..c723f6c1 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -5,6 +5,7 @@ use crate::state::system_state::SystemState; use anyhow::bail; use anyhow::Result; use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt as _; use log::{debug, error, info, warn}; @@ -13,6 +14,7 @@ use shared::models::node::Node; use shared::web3::contracts::core::builder::Contracts; use shared::web3::wallet::Wallet; use shared::web3::wallet::WalletProvider; +use std::collections::HashSet; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; use std::sync::Arc; @@ -143,14 +145,15 @@ impl TaskBridge { } info!("TaskBridge socket created at: {}", socket_path.display()); - let mut handle_stream_futures = tokio::task::JoinSet::new(); + let mut handle_stream_futures = FuturesUnordered::new(); let mut listener_stream = tokio_stream::wrappers::UnixListenerStream::new(listener); let (file_validation_futures_tx, mut file_validation_futures_rx) = tokio::sync::mpsc::channel::<(String, BoxFuture>)>(100); - let mut file_validation_futures_map = tokio_util::task::JoinMap::new(); + let mut file_validation_futures_set = HashSet::new(); + let mut file_validation_futures = FuturesUnordered::new(); let (file_upload_futures_tx, mut file_upload_futures_rx) = tokio::sync::mpsc::channel::>>(100); - let mut file_upload_futures_set = tokio::task::JoinSet::new(); + let mut file_upload_futures_set = FuturesUnordered::new(); loop { tokio::select! { @@ -158,35 +161,38 @@ impl TaskBridge { match res { Ok(stream) => { let handle_future = handle_stream(config.clone(), stream, file_upload_futures_tx.clone(), file_validation_futures_tx.clone()).fuse(); - handle_stream_futures.spawn(handle_future); + handle_stream_futures.push(tokio::task::spawn(handle_future)); } Err(e) => { error!("Accept failed on Unix socket: {e}"); } } } - Some(task) = handle_stream_futures.join_next() => { - match task { + Some(res) = handle_stream_futures.next() => { + match res { Ok(Ok(())) => { - debug!("Stream handled successfully"); + debug!("Stream handler completed successfully"); } Ok(Err(e)) => { - error!("Error handling stream: {e}"); + error!("Stream handler failed: {e}"); } Err(e) => { - error!("Task join error: {e}"); + error!("Error joining stream handler task: {e}"); } } } Some((hash, fut)) = file_validation_futures_rx.recv() => { - if file_validation_futures_map.contains_key(&hash) { + if file_validation_futures_set.contains(&hash) { debug!("duplicate file validation task for hash: {hash}, skipping"); continue; } - file_validation_futures_map.spawn(hash, fut); + // we never remove hashes from this set, as we should never + // submit the same file for validation twice. + file_validation_futures_set.insert(hash.clone()); + file_validation_futures.push(async move {(hash, tokio::task::spawn(fut).await)}); } - Some((hash, task)) = file_validation_futures_map.join_next() => { - match task { + Some((hash, res)) = file_validation_futures.next() => { + match res { Ok(Ok(())) => { debug!("File validation task for hash {hash} completed successfully"); } @@ -199,10 +205,10 @@ impl TaskBridge { } } Some(fut) = file_upload_futures_rx.recv() => { - file_upload_futures_set.spawn(fut); + file_upload_futures_set.push(tokio::task::spawn(fut)); } - Some(task) = file_upload_futures_set.join_next() => { - match task { + Some(res) = file_upload_futures_set.next() => { + match res { Ok(Ok(())) => { debug!("File upload task completed successfully"); } From 88feea4d18970366ac8846941836c791166ffbab Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 3 Jul 2025 13:12:20 -0400 Subject: [PATCH 8/8] remove unstable tokio flag --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index b9f3d588..f81163f4 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -14,7 +14,7 @@ on: env: CARGO_TERM_COLOR: always CARGO_INCREMENTAL: 0 - RUSTFLAGS: "-C debuginfo=0 --cfg tokio_unstable" + RUSTFLAGS: "-C debuginfo=0" LANG: C.UTF-8 LC_ALL: C.UTF-8