Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,22 @@ edition = "2021"
[workspace.features]
default = []
testnet = []

[workspace.lints.clippy]
# Performance
redundant_clone = "warn"
inefficient_to_string = "warn"
unnecessary_to_owned = "warn"
needless_pass_by_value = "warn"
assigning_clones = "warn"

# Patterns
manual_let_else = "warn"
map_unwrap_or = "warn"
single_match_else = "warn"
needless_borrow = "warn"
unused_async = "warn"

# Error handling
unwrap_used = "warn"
expect_used = "warn"
3 changes: 3 additions & 0 deletions crates/discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ shared = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }

[lints]
workspace = true
60 changes: 28 additions & 32 deletions crates/discovery/src/api/routes/get_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@ pub async fn get_nodes(data: Data<AppState>) -> HttpResponse {
}

fn filter_nodes_for_pool(nodes: Vec<DiscoveryNode>, pool_id: u32) -> Vec<DiscoveryNode> {
let nodes_for_pool: Vec<DiscoveryNode> = nodes
.iter()
.filter(|node| node.compute_pool_id == pool_id)
.cloned()
.collect();

// Filter out nodes with IPs that are currently active in another pool
let filtered: Vec<DiscoveryNode> = nodes_for_pool
.iter()
.filter(|node| {
// Check if there's any other node with the same IP address in a different pool that is active
!nodes.iter().any(|other| {
other.ip_address == node.ip_address
&& other.compute_pool_id != node.compute_pool_id
&& other.is_active
})
})
.cloned()
.collect();
filtered
// First pass: collect nodes for the target pool
let mut nodes_for_pool = Vec::new();
let mut other_active_ips = std::collections::HashSet::new();

// Single pass through all nodes to collect both target pool nodes and active IPs from other pools
for node in nodes {
if node.compute_pool_id == pool_id {
nodes_for_pool.push(node);
} else if node.is_active {
other_active_ips.insert(node.ip_address.clone());
}
}

// Filter out nodes whose IPs are active in other pools
nodes_for_pool
.into_iter()
.filter(|node| !other_active_ips.contains(&node.ip_address))
.collect()
}

pub async fn get_nodes_for_pool(
Expand All @@ -51,14 +49,14 @@ pub async fn get_nodes_for_pool(
let nodes = data.node_store.get_nodes().await;
match nodes {
Ok(nodes) => {
let id_clone = pool_id.clone();
let pool_contract_id: U256 = match id_clone.parse::<U256>() {
let pool_contract_id: U256 = match pool_id.parse::<U256>() {
Ok(id) => id,
Err(_) => {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid pool ID format"));
}
};

Comment on lines +52 to +59
Copy link

Copilot AI Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] You parse pool_id twice (once to U256 and again to u32). Consider parsing once to u32 and converting to U256 using U256::from(parsed_id) to avoid duplication and improve readability.

Suggested change
let pool_contract_id: U256 = match pool_id.parse::<U256>() {
Ok(id) => id,
Err(_) => {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid pool ID format"));
}
};

Copilot uses AI. Check for mistakes.
let pool_id: u32 = match pool_id.parse() {
Copy link

Copilot AI Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Shadowing the pool_id variable (string → u32) may be confusing. Consider using a distinct name like parsed_pool_id for clarity.

Suggested change
let pool_id: u32 = match pool_id.parse() {
let parsed_pool_id: u32 = match pool_id.parse() {

Copilot uses AI. Check for mistakes.
Ok(id) => id,
Err(_) => {
Expand All @@ -69,19 +67,17 @@ pub async fn get_nodes_for_pool(

match data.contracts.clone() {
Some(contracts) => {
let pool_info =
match contracts.compute_pool.get_pool_info(pool_contract_id).await {
Ok(info) => info,
Err(_) => {
return HttpResponse::NotFound()
.json(ApiResponse::new(false, "Pool not found"));
}
};
let Ok(pool_info) =
contracts.compute_pool.get_pool_info(pool_contract_id).await
else {
return HttpResponse::NotFound()
.json(ApiResponse::new(false, "Pool not found"));
};
let owner = pool_info.creator;
let manager = pool_info.compute_manager_key;
let address_str = match req.headers().get("x-address") {
Some(address) => match address.to_str() {
Ok(addr) => addr.to_string(),
Ok(addr) => addr,
Err(_) => {
return HttpResponse::BadRequest().json(ApiResponse::new(
false,
Expand Down Expand Up @@ -424,7 +420,7 @@ mod tests {
assert!(filtered_nodes.iter().any(|n| n.id == "0x2222"));

// The pool 1 node should be included in pool 1 results
let filtered_nodes = filter_nodes_for_pool(nodes.clone(), 1);
let filtered_nodes = filter_nodes_for_pool(nodes, 1);
assert_eq!(filtered_nodes.len(), 1);
assert!(filtered_nodes.iter().any(|n| n.id == "0x1111"));
}
Expand Down
86 changes: 32 additions & 54 deletions crates/discovery/src/api/routes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn register_node(
// Check for the x-address header
let address_str = match req.headers().get("x-address") {
Some(address) => match address.to_str() {
Ok(addr) => addr.to_string(),
Ok(addr) => addr,
Err(_) => {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid x-address header"))
Expand All @@ -33,32 +33,34 @@ pub async fn register_node(
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid x-address header"));
}

let update_node = node.clone();
let existing_node = data.node_store.get_node(update_node.id.clone()).await;
let node = node.into_inner();
let existing_node = data.node_store.get_node(&node.id).await;
if let Ok(Some(existing_node)) = existing_node {
// Node already exists - check if it's active in a pool
if existing_node.is_active {
if existing_node.node == update_node {
log::info!("Node {} is already active in a pool", update_node.id);
if existing_node.node == node {
log::info!("Node {} is already active in a pool", node.id);
return HttpResponse::Ok()
.json(ApiResponse::new(true, "Node registered successfully"));
}
// Temp. adjustment: The gpu object has changed and includes a vec of indices now.
// This now causes the discovery svc to reject nodes that have just updated their software.
// This is a temporary fix to ensure the node is accepted even though the indices are different.
let mut existing_clone = existing_node.node.clone();
existing_clone.worker_p2p_id = update_node.worker_p2p_id.clone();
existing_clone.worker_p2p_addresses = update_node.worker_p2p_addresses.clone();
match &update_node.compute_specs {
let mut existing_clone = existing_node.node;
existing_clone.worker_p2p_id.clone_from(&node.worker_p2p_id);
existing_clone
.worker_p2p_addresses
.clone_from(&node.worker_p2p_addresses);
match &node.compute_specs {
Some(compute_specs) => {
if let Some(ref mut existing_compute_specs) = existing_clone.compute_specs {
match &compute_specs.gpu {
Some(gpu_specs) => {
existing_compute_specs.gpu = Some(gpu_specs.clone());
existing_compute_specs.storage_gb = compute_specs.storage_gb;
existing_compute_specs.storage_path =
compute_specs.storage_path.clone();
existing_compute_specs
.storage_path
.clone_from(&compute_specs.storage_path);
}
None => {
existing_compute_specs.gpu = None;
Expand All @@ -71,15 +73,15 @@ pub async fn register_node(
}
}

if existing_clone == update_node {
log::info!("Node {} is already active in a pool", update_node.id);
if existing_clone == node {
log::info!("Node {} is already active in a pool", node.id);
return HttpResponse::Ok()
.json(ApiResponse::new(true, "Node registered successfully"));
}

warn!(
"Node {} tried to change discovery but is already active in a pool",
update_node.id
node.id
);
// Node is currently active in pool - cannot be updated
// Did the user actually change node information?
Expand All @@ -92,55 +94,45 @@ pub async fn register_node(

let active_nodes_count = data
.node_store
.count_active_nodes_by_ip(update_node.ip_address.clone())
.count_active_nodes_by_ip(&node.ip_address)
.await;

if let Ok(count) = active_nodes_count {
let existing_node_by_ip = data
.node_store
.get_active_node_by_ip(update_node.ip_address.clone())
.get_active_node_by_ip(&node.ip_address)
.await;

let is_existing_node = existing_node_by_ip
.map(|result| {
result
.map(|node| node.id == update_node.id)
.unwrap_or(false)
})
.map(|result| result.is_some_and(|n| n.id == node.id))
.unwrap_or(false);

let effective_count = if is_existing_node { count - 1 } else { count };

if effective_count >= data.max_nodes_per_ip {
warn!(
"Node {} registration would exceed IP limit. Current active nodes on IP {}: {}, max allowed: {}",
update_node.id, update_node.ip_address, count, data.max_nodes_per_ip
node.id, node.ip_address, count, data.max_nodes_per_ip
);
return HttpResponse::BadRequest().json(ApiResponse::new(
false,
&format!(
"IP address {} already has {} active nodes (max allowed: {})",
update_node.ip_address, count, data.max_nodes_per_ip
node.ip_address, count, data.max_nodes_per_ip
),
));
}
}

if let Some(contracts) = data.contracts.clone() {
let provider_address = match node.provider_address.parse() {
Ok(addr) => addr,
Err(_) => {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid provider address format"));
}
if let Some(contracts) = &data.contracts {
let Ok(provider_address) = node.provider_address.parse() else {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid provider address format"));
};

let node_id = match node.id.parse() {
Ok(id) => id,
Err(_) => {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid node ID format"));
}
let Ok(node_id) = node.id.parse() else {
return HttpResponse::BadRequest()
.json(ApiResponse::new(false, "Invalid node ID format"));
};

if contracts
Expand Down Expand Up @@ -202,9 +194,7 @@ pub async fn register_node(
}
}

let node_store = data.node_store.clone();

match node_store.register_node(node.into_inner()).await {
match data.node_store.register_node(node).await {
Ok(_) => HttpResponse::Ok().json(ApiResponse::new(true, "Node registered successfully")),
Err(_) => HttpResponse::InternalServerError()
.json(ApiResponse::new(false, "Internal server error")),
Expand Down Expand Up @@ -469,13 +459,7 @@ mod tests {
assert!(body.success);
assert_eq!(body.data, "Node registered successfully");

let nodes = app_state.node_store.get_nodes().await;
let nodes = match nodes {
Ok(nodes) => nodes,
Err(_) => {
panic!("Error getting nodes");
}
};
let nodes = app_state.node_store.get_nodes().await.unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node.id);
assert_eq!(nodes[0].last_updated, None);
Expand Down Expand Up @@ -615,13 +599,7 @@ mod tests {
assert!(body.success);
assert_eq!(body.data, "Node registered successfully");

let nodes = app_state.node_store.get_nodes().await;
let nodes = match nodes {
Ok(nodes) => nodes,
Err(_) => {
panic!("Error getting nodes");
}
};
let nodes = app_state.node_store.get_nodes().await.unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node.id);
}
Expand Down
31 changes: 14 additions & 17 deletions crates/discovery/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,24 @@ async fn health_check(app_state: web::Data<AppState>) -> HttpResponse {
if app_state.chain_sync_enabled {
let sync_status = {
let last_sync_guard = app_state.last_chain_sync.lock().await;
match *last_sync_guard {
Some(last_sync) => {
if let Ok(elapsed) = last_sync.elapsed() {
if elapsed > Duration::from_secs(60) {
warn!(
"Health check: Chain sync is delayed. Last sync was {} seconds ago",
elapsed.as_secs()
);
Some(elapsed)
} else {
None
}
if let Some(last_sync) = *last_sync_guard {
if let Ok(elapsed) = last_sync.elapsed() {
if elapsed > Duration::from_secs(60) {
warn!(
"Health check: Chain sync is delayed. Last sync was {} seconds ago",
elapsed.as_secs()
);
Some(elapsed)
} else {
warn!("Health check: Unable to determine elapsed time since last sync");
Some(Duration::from_secs(u64::MAX))
None
}
}
None => {
warn!("Health check: Chain sync has not occurred yet");
} else {
warn!("Health check: Unable to determine elapsed time since last sync");
Some(Duration::from_secs(u64::MAX))
}
} else {
warn!("Health check: Chain sync has not occurred yet");
Some(Duration::from_secs(u64::MAX))
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/discovery/src/chainsync/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl ChainSync {
}
}

pub async fn run(self) -> Result<(), Error> {
pub fn run(self) -> Result<(), Error> {
let ChainSync {
node_store,
cancel_token,
Expand Down
2 changes: 1 addition & 1 deletion crates/discovery/src/location_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl LocationService {
let client = Client::builder()
.timeout(Duration::from_secs(5))
.build()
.expect("Failed to build HTTP client");
.unwrap_or_else(|_| panic!("Failed to build HTTP client"));

Self {
client,
Expand Down
Loading
Loading