diff --git a/Cargo.toml b/Cargo.toml index bf2c6292..a26efec3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,3 +46,22 @@ edition = "2021" [workspace.features] default = [] testnet = [] + +[workspace.lints.clippy] +# Performance +redundant_clone = "warn" +inefficient_to_string = "warn" +unnecessary_to_owned = "warn" +needless_pass_by_value = "warn" +assigning_clones = "warn" + +# Patterns +manual_let_else = "warn" +map_unwrap_or = "warn" +single_match_else = "warn" +needless_borrow = "warn" +unused_async = "warn" + +# Error handling +unwrap_used = "warn" +expect_used = "warn" diff --git a/crates/discovery/Cargo.toml b/crates/discovery/Cargo.toml index ef8cef5b..a02d8975 100644 --- a/crates/discovery/Cargo.toml +++ b/crates/discovery/Cargo.toml @@ -20,3 +20,6 @@ shared = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } url = { workspace = true } + +[lints] +workspace = true \ No newline at end of file diff --git a/crates/discovery/src/api/routes/get_nodes.rs b/crates/discovery/src/api/routes/get_nodes.rs index 0ad768a6..ebad3070 100644 --- a/crates/discovery/src/api/routes/get_nodes.rs +++ b/crates/discovery/src/api/routes/get_nodes.rs @@ -21,26 +21,24 @@ pub async fn get_nodes(data: Data) -> HttpResponse { } fn filter_nodes_for_pool(nodes: Vec, pool_id: u32) -> Vec { - let nodes_for_pool: Vec = nodes - .iter() - .filter(|node| node.compute_pool_id == pool_id) - .cloned() - .collect(); - - // Filter out nodes with IPs that are currently active in another pool - let filtered: Vec = nodes_for_pool - .iter() - .filter(|node| { - // Check if there's any other node with the same IP address in a different pool that is active - !nodes.iter().any(|other| { - other.ip_address == node.ip_address - && other.compute_pool_id != node.compute_pool_id - && other.is_active - }) - }) - .cloned() - .collect(); - filtered + // First pass: collect nodes for the target pool + let mut nodes_for_pool = Vec::new(); + let mut other_active_ips = std::collections::HashSet::new(); + + // Single pass through all nodes to collect both target pool nodes and active IPs from other pools + for node in nodes { + if node.compute_pool_id == pool_id { + nodes_for_pool.push(node); + } else if node.is_active { + other_active_ips.insert(node.ip_address.clone()); + } + } + + // Filter out nodes whose IPs are active in other pools + nodes_for_pool + .into_iter() + .filter(|node| !other_active_ips.contains(&node.ip_address)) + .collect() } pub async fn get_nodes_for_pool( @@ -51,14 +49,14 @@ pub async fn get_nodes_for_pool( let nodes = data.node_store.get_nodes().await; match nodes { Ok(nodes) => { - let id_clone = pool_id.clone(); - let pool_contract_id: U256 = match id_clone.parse::() { + let pool_contract_id: U256 = match pool_id.parse::() { Ok(id) => id, Err(_) => { return HttpResponse::BadRequest() .json(ApiResponse::new(false, "Invalid pool ID format")); } }; + let pool_id: u32 = match pool_id.parse() { Ok(id) => id, Err(_) => { @@ -69,19 +67,17 @@ pub async fn get_nodes_for_pool( match data.contracts.clone() { Some(contracts) => { - let pool_info = - match contracts.compute_pool.get_pool_info(pool_contract_id).await { - Ok(info) => info, - Err(_) => { - return HttpResponse::NotFound() - .json(ApiResponse::new(false, "Pool not found")); - } - }; + let Ok(pool_info) = + contracts.compute_pool.get_pool_info(pool_contract_id).await + else { + return HttpResponse::NotFound() + .json(ApiResponse::new(false, "Pool not found")); + }; let owner = pool_info.creator; let manager = pool_info.compute_manager_key; let address_str = match req.headers().get("x-address") { Some(address) => match address.to_str() { - Ok(addr) => addr.to_string(), + Ok(addr) => addr, Err(_) => { return HttpResponse::BadRequest().json(ApiResponse::new( false, @@ -424,7 +420,7 @@ mod tests { assert!(filtered_nodes.iter().any(|n| n.id == "0x2222")); // The pool 1 node should be included in pool 1 results - let filtered_nodes = filter_nodes_for_pool(nodes.clone(), 1); + let filtered_nodes = filter_nodes_for_pool(nodes, 1); assert_eq!(filtered_nodes.len(), 1); assert!(filtered_nodes.iter().any(|n| n.id == "0x1111")); } diff --git a/crates/discovery/src/api/routes/node.rs b/crates/discovery/src/api/routes/node.rs index 80c46c12..96a1bb66 100644 --- a/crates/discovery/src/api/routes/node.rs +++ b/crates/discovery/src/api/routes/node.rs @@ -17,7 +17,7 @@ pub async fn register_node( // Check for the x-address header let address_str = match req.headers().get("x-address") { Some(address) => match address.to_str() { - Ok(addr) => addr.to_string(), + Ok(addr) => addr, Err(_) => { return HttpResponse::BadRequest() .json(ApiResponse::new(false, "Invalid x-address header")) @@ -33,32 +33,34 @@ pub async fn register_node( return HttpResponse::BadRequest() .json(ApiResponse::new(false, "Invalid x-address header")); } - - let update_node = node.clone(); - let existing_node = data.node_store.get_node(update_node.id.clone()).await; + let node = node.into_inner(); + let existing_node = data.node_store.get_node(&node.id).await; if let Ok(Some(existing_node)) = existing_node { // Node already exists - check if it's active in a pool if existing_node.is_active { - if existing_node.node == update_node { - log::info!("Node {} is already active in a pool", update_node.id); + if existing_node.node == node { + log::info!("Node {} is already active in a pool", node.id); return HttpResponse::Ok() .json(ApiResponse::new(true, "Node registered successfully")); } // Temp. adjustment: The gpu object has changed and includes a vec of indices now. // This now causes the discovery svc to reject nodes that have just updated their software. // This is a temporary fix to ensure the node is accepted even though the indices are different. - let mut existing_clone = existing_node.node.clone(); - existing_clone.worker_p2p_id = update_node.worker_p2p_id.clone(); - existing_clone.worker_p2p_addresses = update_node.worker_p2p_addresses.clone(); - match &update_node.compute_specs { + let mut existing_clone = existing_node.node; + existing_clone.worker_p2p_id.clone_from(&node.worker_p2p_id); + existing_clone + .worker_p2p_addresses + .clone_from(&node.worker_p2p_addresses); + match &node.compute_specs { Some(compute_specs) => { if let Some(ref mut existing_compute_specs) = existing_clone.compute_specs { match &compute_specs.gpu { Some(gpu_specs) => { existing_compute_specs.gpu = Some(gpu_specs.clone()); existing_compute_specs.storage_gb = compute_specs.storage_gb; - existing_compute_specs.storage_path = - compute_specs.storage_path.clone(); + existing_compute_specs + .storage_path + .clone_from(&compute_specs.storage_path); } None => { existing_compute_specs.gpu = None; @@ -71,15 +73,15 @@ pub async fn register_node( } } - if existing_clone == update_node { - log::info!("Node {} is already active in a pool", update_node.id); + if existing_clone == node { + log::info!("Node {} is already active in a pool", node.id); return HttpResponse::Ok() .json(ApiResponse::new(true, "Node registered successfully")); } warn!( "Node {} tried to change discovery but is already active in a pool", - update_node.id + node.id ); // Node is currently active in pool - cannot be updated // Did the user actually change node information? @@ -92,21 +94,17 @@ pub async fn register_node( let active_nodes_count = data .node_store - .count_active_nodes_by_ip(update_node.ip_address.clone()) + .count_active_nodes_by_ip(&node.ip_address) .await; if let Ok(count) = active_nodes_count { let existing_node_by_ip = data .node_store - .get_active_node_by_ip(update_node.ip_address.clone()) + .get_active_node_by_ip(&node.ip_address) .await; let is_existing_node = existing_node_by_ip - .map(|result| { - result - .map(|node| node.id == update_node.id) - .unwrap_or(false) - }) + .map(|result| result.is_some_and(|n| n.id == node.id)) .unwrap_or(false); let effective_count = if is_existing_node { count - 1 } else { count }; @@ -114,33 +112,27 @@ pub async fn register_node( if effective_count >= data.max_nodes_per_ip { warn!( "Node {} registration would exceed IP limit. Current active nodes on IP {}: {}, max allowed: {}", - update_node.id, update_node.ip_address, count, data.max_nodes_per_ip + node.id, node.ip_address, count, data.max_nodes_per_ip ); return HttpResponse::BadRequest().json(ApiResponse::new( false, &format!( "IP address {} already has {} active nodes (max allowed: {})", - update_node.ip_address, count, data.max_nodes_per_ip + node.ip_address, count, data.max_nodes_per_ip ), )); } } - if let Some(contracts) = data.contracts.clone() { - let provider_address = match node.provider_address.parse() { - Ok(addr) => addr, - Err(_) => { - return HttpResponse::BadRequest() - .json(ApiResponse::new(false, "Invalid provider address format")); - } + if let Some(contracts) = &data.contracts { + let Ok(provider_address) = node.provider_address.parse() else { + return HttpResponse::BadRequest() + .json(ApiResponse::new(false, "Invalid provider address format")); }; - let node_id = match node.id.parse() { - Ok(id) => id, - Err(_) => { - return HttpResponse::BadRequest() - .json(ApiResponse::new(false, "Invalid node ID format")); - } + let Ok(node_id) = node.id.parse() else { + return HttpResponse::BadRequest() + .json(ApiResponse::new(false, "Invalid node ID format")); }; if contracts @@ -202,9 +194,7 @@ pub async fn register_node( } } - let node_store = data.node_store.clone(); - - match node_store.register_node(node.into_inner()).await { + match data.node_store.register_node(node).await { Ok(_) => HttpResponse::Ok().json(ApiResponse::new(true, "Node registered successfully")), Err(_) => HttpResponse::InternalServerError() .json(ApiResponse::new(false, "Internal server error")), @@ -469,13 +459,7 @@ mod tests { assert!(body.success); assert_eq!(body.data, "Node registered successfully"); - let nodes = app_state.node_store.get_nodes().await; - let nodes = match nodes { - Ok(nodes) => nodes, - Err(_) => { - panic!("Error getting nodes"); - } - }; + let nodes = app_state.node_store.get_nodes().await.unwrap(); assert_eq!(nodes.len(), 1); assert_eq!(nodes[0].id, node.id); assert_eq!(nodes[0].last_updated, None); @@ -615,13 +599,7 @@ mod tests { assert!(body.success); assert_eq!(body.data, "Node registered successfully"); - let nodes = app_state.node_store.get_nodes().await; - let nodes = match nodes { - Ok(nodes) => nodes, - Err(_) => { - panic!("Error getting nodes"); - } - }; + let nodes = app_state.node_store.get_nodes().await.unwrap(); assert_eq!(nodes.len(), 1); assert_eq!(nodes[0].id, node.id); } diff --git a/crates/discovery/src/api/server.rs b/crates/discovery/src/api/server.rs index 6204c5c4..1c5e9506 100644 --- a/crates/discovery/src/api/server.rs +++ b/crates/discovery/src/api/server.rs @@ -33,27 +33,24 @@ async fn health_check(app_state: web::Data) -> HttpResponse { if app_state.chain_sync_enabled { let sync_status = { let last_sync_guard = app_state.last_chain_sync.lock().await; - match *last_sync_guard { - Some(last_sync) => { - if let Ok(elapsed) = last_sync.elapsed() { - if elapsed > Duration::from_secs(60) { - warn!( - "Health check: Chain sync is delayed. Last sync was {} seconds ago", - elapsed.as_secs() - ); - Some(elapsed) - } else { - None - } + if let Some(last_sync) = *last_sync_guard { + if let Ok(elapsed) = last_sync.elapsed() { + if elapsed > Duration::from_secs(60) { + warn!( + "Health check: Chain sync is delayed. Last sync was {} seconds ago", + elapsed.as_secs() + ); + Some(elapsed) } else { - warn!("Health check: Unable to determine elapsed time since last sync"); - Some(Duration::from_secs(u64::MAX)) + None } - } - None => { - warn!("Health check: Chain sync has not occurred yet"); + } else { + warn!("Health check: Unable to determine elapsed time since last sync"); Some(Duration::from_secs(u64::MAX)) } + } else { + warn!("Health check: Chain sync has not occurred yet"); + Some(Duration::from_secs(u64::MAX)) } }; diff --git a/crates/discovery/src/chainsync/sync.rs b/crates/discovery/src/chainsync/sync.rs index a90d4b28..55812824 100644 --- a/crates/discovery/src/chainsync/sync.rs +++ b/crates/discovery/src/chainsync/sync.rs @@ -121,7 +121,7 @@ impl ChainSync { } } - pub async fn run(self) -> Result<(), Error> { + pub fn run(self) -> Result<(), Error> { let ChainSync { node_store, cancel_token, diff --git a/crates/discovery/src/location_service.rs b/crates/discovery/src/location_service.rs index 51a9bd3d..1b7fda62 100644 --- a/crates/discovery/src/location_service.rs +++ b/crates/discovery/src/location_service.rs @@ -31,7 +31,7 @@ impl LocationService { let client = Client::builder() .timeout(Duration::from_secs(5)) .build() - .expect("Failed to build HTTP client"); + .unwrap_or_else(|_| panic!("Failed to build HTTP client")); Self { client, diff --git a/crates/discovery/src/main.rs b/crates/discovery/src/main.rs index 56fd9ee4..fb6a6bbd 100644 --- a/crates/discovery/src/main.rs +++ b/crates/discovery/src/main.rs @@ -88,11 +88,8 @@ async fn main() -> Result<()> { let redis_store = Arc::new(RedisStore::new(&args.redis_url)); let node_store = Arc::new(NodeStore::new(redis_store.as_ref().clone())); - let endpoint = match args.rpc_url.parse() { - Ok(url) => url, - Err(_) => { - return Err(anyhow::anyhow!("invalid RPC URL: {}", args.rpc_url)); - } + let Ok(endpoint) = args.rpc_url.parse() else { + return Err(anyhow::anyhow!("invalid RPC URL: {}", args.rpc_url)); }; let provider = RootProvider::new_http(endpoint); @@ -103,7 +100,7 @@ async fn main() -> Result<()> { .with_compute_pool() .with_stake_manager() .build() - .unwrap(); + .unwrap_or_else(|_| panic!("Failed to build contracts")); let cancellation_token = CancellationToken::new(); let last_chain_sync = Arc::new(Mutex::new(None::)); @@ -119,7 +116,7 @@ async fn main() -> Result<()> { contracts.clone(), last_chain_sync.clone(), ); - chain_sync.run().await?; + chain_sync.run()?; // Start location enrichment service if enabled if let Some(location_url) = args.location_service_url.clone() { diff --git a/crates/discovery/src/store/node_store.rs b/crates/discovery/src/store/node_store.rs index 0b6c271f..72db6cd2 100644 --- a/crates/discovery/src/store/node_store.rs +++ b/crates/discovery/src/store/node_store.rs @@ -20,7 +20,7 @@ impl NodeStore { .await } - pub async fn get_node(&self, address: String) -> Result, Error> { + pub async fn get_node(&self, address: &str) -> Result, Error> { let key = format!("node:{address}"); let mut con = self.get_connection().await?; let node: Option = con.get(&key).await?; @@ -31,7 +31,7 @@ impl NodeStore { Ok(node) } - pub async fn get_active_node_by_ip(&self, ip: String) -> Result, Error> { + pub async fn get_active_node_by_ip(&self, ip: &str) -> Result, Error> { let mut con = self.get_connection().await?; let node_ids: Vec = con.smembers("node:ids").await?; @@ -52,7 +52,7 @@ impl NodeStore { Ok(None) } - pub async fn count_active_nodes_by_ip(&self, ip: String) -> Result { + pub async fn count_active_nodes_by_ip(&self, ip: &str) -> Result { let mut con = self.get_connection().await?; let node_ids: Vec = con.smembers("node:ids").await?; @@ -61,15 +61,14 @@ impl NodeStore { } let node_keys: Vec = node_ids.iter().map(|id| format!("node:{id}")).collect(); + let serialized_nodes: Vec = + redis::pipe().get(&node_keys).query_async(&mut con).await?; let mut count = 0; - for key in node_keys { - let serialized_node: Option = con.get(&key).await?; - if let Some(serialized_node) = serialized_node { - let deserialized_node: DiscoveryNode = serde_json::from_str(&serialized_node)?; - if deserialized_node.ip_address == ip && deserialized_node.is_active { - count += 1; - } + for serialized_node in serialized_nodes { + let deserialized_node: DiscoveryNode = serde_json::from_str(&serialized_node)?; + if deserialized_node.ip_address == ip && deserialized_node.is_active { + count += 1; } } Ok(count) @@ -82,7 +81,7 @@ impl NodeStore { let mut con = self.get_connection().await?; if con.exists(&key).await? { - let existing_node = self.get_node(address.clone()).await?; + let existing_node = self.get_node(&address).await?; if let Some(existing_node) = existing_node { let updated_node = existing_node.with_updated_node(node); self.update_node(updated_node).await?; diff --git a/crates/orchestrator/src/store/core/context.rs b/crates/orchestrator/src/store/core/context.rs index 2d732eb0..98d57485 100644 --- a/crates/orchestrator/src/store/core/context.rs +++ b/crates/orchestrator/src/store/core/context.rs @@ -18,7 +18,7 @@ impl StoreContext { node_store: Arc::new(NodeStore::new(store.clone())), heartbeat_store: Arc::new(HeartbeatStore::new(store.clone())), task_store: Arc::new(TaskStore::new(store.clone())), - metrics_store: Arc::new(MetricsStore::new(store.clone())), + metrics_store: Arc::new(MetricsStore::new(store)), } } } diff --git a/crates/shared/src/p2p/client.rs b/crates/shared/src/p2p/client.rs index 33c7011e..6b4db1d4 100644 --- a/crates/shared/src/p2p/client.rs +++ b/crates/shared/src/p2p/client.rs @@ -151,19 +151,15 @@ impl P2PClient { message, } => { // Parse the signature from the server - let parsed_signature = - if let Ok(sig) = alloy::primitives::Signature::from_str(&signed_message) { - sig - } else { - return Err(anyhow::anyhow!("Failed to parse signature from server")); - }; + let Ok(parsed_signature) = alloy::primitives::Signature::from_str(&signed_message) + else { + return Err(anyhow::anyhow!("Failed to parse signature from server")); + }; // Recover address from the challenge message that the server signed - let recovered_address = if let Ok(addr) = + let Ok(recovered_address) = parsed_signature.recover_address_from_msg(&challenge_message) - { - addr - } else { + else { return Err(anyhow::anyhow!( "Failed to recover address from server signature" )); @@ -179,7 +175,9 @@ impl P2PClient { } debug!("Auth challenge received from node: {target_p2p_id}"); - let signature = sign_message(&message, &self.wallet).await.unwrap(); + let signature = sign_message(&message, &self.wallet) + .await + .map_err(|e| anyhow::anyhow!("Failed to sign message: {e}"))?; P2PRequest::new(P2PMessage::AuthSolution { signed_message: signature, }) diff --git a/crates/shared/src/security/auth_signature_middleware.rs b/crates/shared/src/security/auth_signature_middleware.rs index 8ea70c52..64a7cdca 100644 --- a/crates/shared/src/security/auth_signature_middleware.rs +++ b/crates/shared/src/security/auth_signature_middleware.rs @@ -315,7 +315,7 @@ where }))); } }; - let mut payload_data = payload_value.clone(); + let mut payload_data = payload_value; if let Some(obj) = payload_data.as_object_mut() { nonce = obj .get("nonce") @@ -388,24 +388,18 @@ where } }; - let recovered_address = match parsed_signature.recover_address_from_msg(msg) { - Ok(addr) => addr, - Err(_) => { - return Err(ErrorBadRequest(json!({ - "error": "Failed to recover address from message", - "code": "ADDRESS_RECOVERY_FAILED" - }))) - } + let Ok(recovered_address) = parsed_signature.recover_address_from_msg(msg) else { + return Err(ErrorBadRequest(json!({ + "error": "Failed to recover address from message", + "code": "ADDRESS_RECOVERY_FAILED" + }))); }; - let expected_address = match Address::from_str(&address) { - Ok(addr) => addr, - Err(_) => { - return Err(ErrorBadRequest(json!({ - "error": "Invalid address format", - "code": "INVALID_ADDRESS_FORMAT" - }))) - } + let Ok(expected_address) = Address::from_str(&address) else { + return Err(ErrorBadRequest(json!({ + "error": "Invalid address format", + "code": "INVALID_ADDRESS_FORMAT" + }))); }; if recovered_address != expected_address { diff --git a/crates/shared/src/web3/contracts/core/contract.rs b/crates/shared/src/web3/contracts/core/contract.rs index fc13f8f5..cd830650 100644 --- a/crates/shared/src/web3/contracts/core/contract.rs +++ b/crates/shared/src/web3/contracts/core/contract.rs @@ -53,8 +53,7 @@ impl Contract

{ eprintln!("Error parsing JSON, exiting."); std::process::exit(1); }); - let abi = - serde_json::from_value(abi_json.clone()).expect("Failed to parse ABI from artifact"); + let abi = serde_json::from_value(abi_json).expect("Failed to parse ABI from artifact"); ContractInstance::new(address, provider, Interface::new(abi)) }