diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 00000000..500e84b2 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,29 @@ +--- +name: Bug Report +about: Report a problem with the worker or general protocol +title: '[BUG] ' +labels: bug +assignees: '' +--- + +## Environment +- OS: +- Version: +- Hardware Specs: [CPU model, RAM, GPU] + +## Bug Description +A clear description of what the bug is. + +## Steps To Reproduce +1. Configure worker with '...' +2. Connect to '...' +3. See error + +## Expected Behavior +What you expected to happen. + +## Actual Behavior +What actually happened (include any error messages, logs, screenshots). + +## Additional Context +Add any other relevant information here. \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 00000000..59b6123b --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,8 @@ +blank_issues_enabled: false +contact_links: + - name: Community Discord + url: https://discord.gg/primeintellect + about: For quick questions and community support, join our Discord + - name: Documentation + url: https://docs.primeintellect.ai + about: Check our documentation for setup guides and FAQs \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/general_question.md b/.github/ISSUE_TEMPLATE/general_question.md new file mode 100644 index 00000000..7238c632 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/general_question.md @@ -0,0 +1,21 @@ +--- +name: General Question +about: Ask a general question about the protocol or worker +title: '[QUESTION] ' +labels: question +assignees: '' +--- + +## Question +A clear and concise description of your question. + +## Context +Any relevant context that might help us understand your question better. + +## Environment (if relevant) +- OS: +- Version: +- Hardware Specs: [CPU model, RAM, GPU] + +## Additional Information +Any other information that might be helpful. diff --git a/.github/ISSUE_TEMPLATE/setup_help.md b/.github/ISSUE_TEMPLATE/setup_help.md new file mode 100644 index 00000000..877e7d07 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/setup_help.md @@ -0,0 +1,20 @@ +--- +name: Setup Help +about: Questions about setting up or configuring the worker +title: '[SETUP] ' +labels: help-wanted, documentation +assignees: '' +--- + +## Environment +- OS: [e.g. Ubuntu 22.04] +- Worker Version: [e.g. v1.2.3] +- Hardware Specs: [CPU model, RAM, GPU] + +## What have you tried so far? +Steps you've already taken to configure your setup. + +## Specific Questions +Your specific questions about configuration or setup. + +## Logs/Configuration (if applicable) \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 41c3ff1d..94f28093 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2342,7 +2342,7 @@ dependencies = [ [[package]] name = "discovery" -version = "0.2.2" +version = "0.2.3" dependencies = [ "actix-web", "alloy", @@ -4349,7 +4349,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestrator" -version = "0.2.2" +version = "0.2.3" dependencies = [ "actix-web", "alloy", @@ -6477,7 +6477,7 @@ dependencies = [ [[package]] name = "validator" -version = "0.2.2" +version = "0.2.3" dependencies = [ "actix-web", "alloy", @@ -7126,7 +7126,7 @@ dependencies = [ [[package]] name = "worker" -version = "0.2.2" +version = "0.2.3" dependencies = [ "actix-web", "alloy", diff --git a/Cargo.toml b/Cargo.toml index 604198f5..316242df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["discovery", "worker", "validator", "shared", "orchestrator", "dev-ut resolver = "2" [workspace.package] -version = "0.2.2" +version = "0.2.3" edition = "2021" [workspace.features] diff --git a/Makefile b/Makefile index 6634aa4d..11504f77 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ mint-ai-tokens-to-federator: transfer-eth-to-provider: set -a; source ${ENV_FILE}; set +a; \ - cargo run -p dev-utils --example transfer_eth -- --address $${PROVIDER_ADDRESS} --key $${PRIVATE_KEY_FEDERATOR} --rpc-url $${RPC_URL} --amount 1000000000000000000 + cargo run -p dev-utils --example transfer_eth -- --address $${PROVIDER_ADDRESS} --key $${PRIVATE_KEY_FEDERATOR} --rpc-url $${RPC_URL} --amount 10000000000000000 transfer-eth-to-pool-owner: set -a; source ${ENV_FILE}; set +a; \ diff --git a/orchestrator/src/api/routes/heartbeat.rs b/orchestrator/src/api/routes/heartbeat.rs index 3c6dbbd2..b0c61131 100644 --- a/orchestrator/src/api/routes/heartbeat.rs +++ b/orchestrator/src/api/routes/heartbeat.rs @@ -1,9 +1,10 @@ -use crate::api::server::AppState; +use crate::{api::server::AppState, models::node::NodeStatus}; use actix_web::{ web::{self, post, Data}, HttpResponse, Scope, }; use alloy::primitives::Address; +use serde_json::json; use shared::models::heartbeat::{HeartbeatRequest, HeartbeatResponse}; use std::str::FromStr; @@ -13,6 +14,15 @@ async fn heartbeat( ) -> HttpResponse { let task_info = heartbeat.clone(); let node_address = Address::from_str(&heartbeat.address).unwrap(); + let node = app_state.store_context.node_store.get_node(&node_address); + if let Some(node) = node { + if node.status == NodeStatus::Banned { + return HttpResponse::BadRequest().json(json!({ + "success": false, + "error": "Node is banned" + })); + } + } app_state.store_context.node_store.update_node_task( node_address, diff --git a/orchestrator/src/api/routes/nodes.rs b/orchestrator/src/api/routes/nodes.rs index b8c5ab22..f67d51cc 100644 --- a/orchestrator/src/api/routes/nodes.rs +++ b/orchestrator/src/api/routes/nodes.rs @@ -4,9 +4,14 @@ use actix_web::{ HttpResponse, Scope, }; use alloy::primitives::Address; +use log::info; use serde_json::json; use shared::security::request_signer::sign_request; use std::str::FromStr; +use std::time::Duration; + +// Timeout for node operations in seconds +const NODE_REQUEST_TIMEOUT: u64 = 30; async fn get_nodes(app_state: Data) -> HttpResponse { let nodes = app_state.store_context.node_store.get_nodes(); @@ -52,6 +57,7 @@ async fn restart_node_task(node_id: web::Path, app_state: Data match reqwest::Client::new() .post(restart_url) + .timeout(Duration::from_secs(NODE_REQUEST_TIMEOUT)) .headers(headers) .body(payload.to_string()) .send() @@ -121,6 +127,7 @@ async fn get_node_logs(node_id: web::Path, app_state: Data) -> match reqwest::Client::new() .get(logs_url) + .timeout(Duration::from_secs(NODE_REQUEST_TIMEOUT)) .headers(headers) .send() .await @@ -156,12 +163,63 @@ async fn get_node_metrics(node_id: web::Path, app_state: Data) HttpResponse::Ok().json(json!({"success": true, "metrics": metrics})) } +async fn ban_node(node_id: web::Path, app_state: Data) -> HttpResponse { + info!("banning node: {}", node_id); + let node_address = match Address::from_str(&node_id) { + Ok(address) => address, + Err(_) => { + return HttpResponse::BadRequest().json(json!({ + "success": false, + "error": format!("Invalid node address: {}", node_id) + })); + } + }; + + let node = app_state.store_context.node_store.get_node(&node_address); + match node { + Some(node) => { + app_state + .store_context + .node_store + .update_node_status(&node.address, crate::models::node::NodeStatus::Banned); + + // Attempt to eject from pool + if let Some(contracts) = &app_state.contracts { + match contracts + .compute_pool + .eject_node(app_state.pool_id, node.address) + .await + { + Ok(_) => HttpResponse::Ok().json(json!({ + "success": true, + "message": format!("Node {} successfully ejected", node_id) + })), + Err(e) => HttpResponse::InternalServerError().json(json!({ + "success": false, + "error": format!("Failed to eject node from pool: {}", e) + })), + } + } else { + HttpResponse::InternalServerError().json(json!({ + "success": false, + "error": "Contracts not found" + })) + } + } + None => HttpResponse::NotFound().json(json!({ + "success": false, + "error": format!("Node not found: {}", node_id) + })), + } +} + pub fn nodes_routes() -> Scope { web::scope("/nodes") .route("", get().to(get_nodes)) .route("/{node_id}/restart", post().to(restart_node_task)) .route("/{node_id}/logs", get().to(get_node_logs)) .route("/{node_id}/metrics", get().to(get_node_metrics)) + .route("/{node_id}/ban", post().to(ban_node)) } #[cfg(test)] diff --git a/orchestrator/src/api/server.rs b/orchestrator/src/api/server.rs index 567229a1..9014f5e7 100644 --- a/orchestrator/src/api/server.rs +++ b/orchestrator/src/api/server.rs @@ -13,6 +13,7 @@ use log::info; use serde_json::json; use shared::security::api_key_middleware::ApiKeyMiddleware; use shared::security::auth_signature_middleware::{ValidateSignature, ValidatorState}; +use shared::web3::contracts::core::builder::Contracts; use shared::web3::wallet::Wallet; use std::sync::Arc; @@ -24,6 +25,8 @@ pub struct AppState { pub heartbeats: Arc, pub redis_store: Arc, pub hourly_upload_limit: i64, + pub contracts: Option>, + pub pool_id: u32, } #[allow(clippy::too_many_arguments)] @@ -38,6 +41,8 @@ pub async fn start_server( heartbeats: Arc, redis_store: Arc, hourly_upload_limit: i64, + contracts: Option>, + pool_id: u32, ) -> Result<(), Error> { info!("Starting server at http://{}:{}", host, port); let app_state = Data::new(AppState { @@ -48,6 +53,8 @@ pub async fn start_server( heartbeats, redis_store, hourly_upload_limit, + contracts, + pool_id, }); let node_store = app_state.store_context.node_store.clone(); let node_store_clone = node_store.clone(); diff --git a/orchestrator/src/api/tests/helper.rs b/orchestrator/src/api/tests/helper.rs index 4bdfe549..6ce533df 100644 --- a/orchestrator/src/api/tests/helper.rs +++ b/orchestrator/src/api/tests/helper.rs @@ -35,6 +35,8 @@ pub async fn create_test_app_state() -> Data { let store_context = Arc::new(StoreContext::new(store.clone())); Data::new(AppState { store_context: store_context.clone(), + contracts: None, + pool_id: 1, wallet: Arc::new( Wallet::new( "0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97", diff --git a/orchestrator/src/main.rs b/orchestrator/src/main.rs index 5b9ea11c..2e13569e 100644 --- a/orchestrator/src/main.rs +++ b/orchestrator/src/main.rs @@ -173,12 +173,13 @@ async fn main() -> Result<()> { // It also ejects nodes when they are dead. let status_update_store_context = store_context.clone(); let status_update_heartbeats = heartbeats.clone(); + let status_update_contracts = contracts.clone(); tasks.spawn(async move { let status_updater = NodeStatusUpdater::new( status_update_store_context.clone(), 15, None, - contracts.clone(), + status_update_contracts.clone(), compute_pool_id, args.disable_ejection, status_update_heartbeats.clone(), @@ -188,7 +189,7 @@ async fn main() -> Result<()> { let server_store_context = store_context.clone(); tokio::select! { - res = start_server("0.0.0.0", port, server_store_context.clone(), server_wallet, args.admin_api_key, args.s3_credentials, args.bucket_name, heartbeats.clone(), store.clone(), args.hourly_s3_upload_limit) => { + res = start_server("0.0.0.0", port, server_store_context.clone(), server_wallet, args.admin_api_key, args.s3_credentials, args.bucket_name, heartbeats.clone(), store.clone(), args.hourly_s3_upload_limit, Some(contracts.clone()), compute_pool_id) => { if let Err(e) = res { error!("Server error: {}", e); } diff --git a/orchestrator/src/models/node.rs b/orchestrator/src/models/node.rs index acfdfa3c..b408a9c8 100644 --- a/orchestrator/src/models/node.rs +++ b/orchestrator/src/models/node.rs @@ -66,4 +66,5 @@ pub enum NodeStatus { Unhealthy, Dead, Ejected, + Banned, } diff --git a/orchestrator/src/node/invite.rs b/orchestrator/src/node/invite.rs index 5f829839..9f01e96a 100644 --- a/orchestrator/src/node/invite.rs +++ b/orchestrator/src/node/invite.rs @@ -17,6 +17,10 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; use tokio::time::{interval, Duration}; +// Timeout constants +const REQUEST_TIMEOUT: u64 = 15; // 15 seconds for HTTP requests +const CONNECTION_TIMEOUT: u64 = 10; // 10 seconds for establishing connections + pub struct NodeInviter<'a> { wallet: &'a Wallet, pool_id: u32, @@ -51,7 +55,8 @@ impl<'a> NodeInviter<'a> { store_context, heartbeats, client: Client::builder() - .timeout(Duration::from_secs(15)) + .timeout(Duration::from_secs(REQUEST_TIMEOUT)) + .connect_timeout(Duration::from_secs(CONNECTION_TIMEOUT)) .build() .unwrap(), } diff --git a/shared/src/security/auth_signature_middleware.rs b/shared/src/security/auth_signature_middleware.rs index 3baab3ac..23373114 100644 --- a/shared/src/security/auth_signature_middleware.rs +++ b/shared/src/security/auth_signature_middleware.rs @@ -1,7 +1,6 @@ use actix_web::dev::Payload; use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use actix_web::error::ErrorBadRequest; -use actix_web::error::PayloadError; +use actix_web::error::{ErrorBadRequest, PayloadError}; use actix_web::web::Bytes; use actix_web::web::BytesMut; use actix_web::HttpMessage; @@ -19,7 +18,12 @@ use std::pin::Pin; use std::rc::Rc; use std::str::FromStr; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::time::timeout; // If you're using tokio + +// Maximum request body size in bytes +const MAX_BODY_SIZE: usize = 1024 * 1024 * 10; // 10MB +const BODY_TIMEOUT_SECS: u64 = 20; // 20 seconds type SyncAddressValidator = Arc bool + Send + Sync>; @@ -144,12 +148,40 @@ where .map(|s| s.to_string()); Box::pin(async move { - // Collect the full body + // Collect the full body with size limit and timeout let mut body = BytesMut::new(); let mut payload = req.take_payload(); - while let Some(chunk) = payload.next().await { - body.extend_from_slice(chunk?.as_ref()); - } + let start_time = Instant::now(); + + // Create a timeout for the entire body reading process + let body_read_future = async { + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + + // Check if adding this chunk would exceed the size limit + if body.len() + chunk.len() > MAX_BODY_SIZE { + return Err(ErrorBadRequest("Request body too large")); + } + + // Check if we've exceeded the time limit for reading the body + if start_time.elapsed() > Duration::from_secs(BODY_TIMEOUT_SECS) { + return Err(ErrorBadRequest("Request body read timeout")); + } + + body.extend_from_slice(chunk.as_ref()); + } + Ok::<_, Error>(body) + }; + + // Apply timeout to the entire body reading process as a fallback + let body_result = + match timeout(Duration::from_secs(BODY_TIMEOUT_SECS), body_read_future).await { + Ok(result) => result, + Err(_) => return Err(ErrorBadRequest("Request body read timeout")), + }; + + // If there was an error reading the body, return it + let body = body_result?; // Handle GET requests which do not have a payload let mut payload_string = String::new(); diff --git a/worker/src/operations/provider.rs b/worker/src/operations/provider.rs index d5417754..20e6432e 100644 --- a/worker/src/operations/provider.rs +++ b/worker/src/operations/provider.rs @@ -229,7 +229,7 @@ impl ProviderOperations { ); Console::info( "ETH Balance", - &format!("{} ETH", eth_balance / U256::from(10u128.pow(18))), + &format!("{:.6} ETH", { f64::from(eth_balance) / 10f64.powf(18.0) }), ); if balance < stake { Console::user_error(&format!(