diff --git a/Cargo.toml b/Cargo.toml index 30234086..878eec1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,4 +51,4 @@ uninlined_format_args = "warn" [workspace.lints.rust] unreachable_pub = "warn" -manual_let_else = "warn" +manual_let_else = "warn" \ No newline at end of file diff --git a/crates/worker/src/checks/hardware/gpu.rs b/crates/worker/src/checks/hardware/gpu.rs index 75c4ac28..059030f7 100644 --- a/crates/worker/src/checks/hardware/gpu.rs +++ b/crates/worker/src/checks/hardware/gpu.rs @@ -22,7 +22,7 @@ struct GpuDevice { indices: Vec, } -pub fn detect_gpu() -> Vec { +pub(crate) fn detect_gpu() -> Vec { Console::title("GPU Detection"); let gpu_devices = get_gpu_status(); diff --git a/crates/worker/src/checks/hardware/hardware_check.rs b/crates/worker/src/checks/hardware/hardware_check.rs index 684d417d..b90f0ef5 100644 --- a/crates/worker/src/checks/hardware/hardware_check.rs +++ b/crates/worker/src/checks/hardware/hardware_check.rs @@ -14,13 +14,13 @@ use std::sync::Arc; use sysinfo::{self, System}; use tokio::sync::RwLock; -pub struct HardwareChecker { +pub(crate) struct HardwareChecker { sys: System, issues: Arc>, } impl HardwareChecker { - pub fn new(issues: Option>>) -> Self { + pub(crate) fn new(issues: Option>>) -> Self { let mut sys = System::new_all(); sys.refresh_all(); @@ -30,7 +30,7 @@ impl HardwareChecker { } } - pub async fn check_hardware( + pub(crate) async fn check_hardware( &mut self, node_config: Node, storage_path_override: Option, diff --git a/crates/worker/src/checks/hardware/interconnect.rs b/crates/worker/src/checks/hardware/interconnect.rs index 9b49813c..21725686 100644 --- a/crates/worker/src/checks/hardware/interconnect.rs +++ b/crates/worker/src/checks/hardware/interconnect.rs @@ -2,10 +2,10 @@ use rand::RngCore; use reqwest::Client; use std::time::Instant; -pub struct InterconnectCheck; +pub(crate) struct InterconnectCheck; impl InterconnectCheck { - pub async fn check_speeds() -> Result<(f64, f64), Box> { + pub(crate) async fn check_speeds() -> Result<(f64, f64), Box> { let client = Client::new(); // Download test: Request a 10 MB file using the query parameter. diff --git a/crates/worker/src/checks/hardware/memory.rs b/crates/worker/src/checks/hardware/memory.rs index d44f3355..5e266331 100644 --- a/crates/worker/src/checks/hardware/memory.rs +++ b/crates/worker/src/checks/hardware/memory.rs @@ -3,17 +3,17 @@ use sysinfo::System; const BYTES_TO_GB: u64 = 1024 * 1024 * 1024; -pub fn get_memory_info(sys: &System) -> (u64, u64) { +pub(crate) fn get_memory_info(sys: &System) -> (u64, u64) { let total_memory = sys.total_memory(); let free_memory = sys.available_memory(); (total_memory, free_memory) } -pub fn convert_to_mb(memory: u64) -> u64 { +pub(crate) fn convert_to_mb(memory: u64) -> u64 { memory / (1024 * 1024) } -pub fn print_memory_info(total_memory: u64, free_memory: u64) { +pub(crate) fn print_memory_info(total_memory: u64, free_memory: u64) { let total_gb = (total_memory + BYTES_TO_GB / 2) / BYTES_TO_GB; let free_gb = (free_memory + BYTES_TO_GB / 2) / BYTES_TO_GB; Console::title("Memory Information:"); diff --git a/crates/worker/src/checks/hardware/mod.rs b/crates/worker/src/checks/hardware/mod.rs index 34f925f6..3a48f609 100644 --- a/crates/worker/src/checks/hardware/mod.rs +++ b/crates/worker/src/checks/hardware/mod.rs @@ -1,7 +1,7 @@ -pub mod gpu; -pub mod hardware_check; -pub mod interconnect; -pub mod memory; -pub mod storage; -pub mod storage_path; -pub use hardware_check::HardwareChecker; +pub(crate) mod gpu; +pub(crate) mod hardware_check; +pub(crate) mod interconnect; +pub(crate) mod memory; +pub(crate) mod storage; +pub(crate) mod storage_path; +pub(crate) use hardware_check::HardwareChecker; diff --git a/crates/worker/src/checks/hardware/storage.rs b/crates/worker/src/checks/hardware/storage.rs index 62418c0b..9509e731 100644 --- a/crates/worker/src/checks/hardware/storage.rs +++ b/crates/worker/src/checks/hardware/storage.rs @@ -6,17 +6,17 @@ use std::env; use std::ffi::CString; #[cfg(target_os = "linux")] use std::fs; -pub const BYTES_TO_GB: f64 = 1024.0 * 1024.0 * 1024.0; -pub const APP_DIR_NAME: &str = "prime-worker"; +pub(crate) const BYTES_TO_GB: f64 = 1024.0 * 1024.0 * 1024.0; +pub(crate) const APP_DIR_NAME: &str = "prime-worker"; #[derive(Clone)] -pub struct MountPoint { +pub(crate) struct MountPoint { pub path: String, pub available_space: u64, } #[cfg(unix)] -pub fn get_storage_info() -> Result<(f64, f64), std::io::Error> { +pub(crate) fn get_storage_info() -> Result<(f64, f64), std::io::Error> { let mut stat: libc::statvfs = unsafe { std::mem::zeroed() }; // Use current directory instead of hardcoded "." @@ -68,7 +68,7 @@ pub fn get_storage_info() -> Result<(f64, f64), std::io::Error> { )) } #[cfg(target_os = "linux")] -pub fn find_largest_storage() -> Option { +pub(crate) fn find_largest_storage() -> Option { const VALID_FS: [&str; 4] = ["ext4", "xfs", "btrfs", "zfs"]; const MIN_SPACE: u64 = 1_000_000_000; // 1GB minimum @@ -221,7 +221,7 @@ pub fn find_largest_storage() -> Option { } #[cfg(target_os = "linux")] -pub fn get_available_space(path: &str) -> Option { +pub(crate) fn get_available_space(path: &str) -> Option { let mut stats: statvfs_t = unsafe { std::mem::zeroed() }; if let Ok(path_c) = CString::new(path) { if unsafe { statvfs(path_c.as_ptr(), &mut stats) } == 0 { @@ -238,7 +238,7 @@ pub fn get_available_space(_path: &str) -> Option { } #[allow(dead_code)] -pub fn print_storage_info() { +pub(crate) fn print_storage_info() { match get_storage_info() { Ok((total, free)) => { Console::title("Storage Information:"); diff --git a/crates/worker/src/checks/hardware/storage_path.rs b/crates/worker/src/checks/hardware/storage_path.rs index c6a5f14e..387f057d 100644 --- a/crates/worker/src/checks/hardware/storage_path.rs +++ b/crates/worker/src/checks/hardware/storage_path.rs @@ -8,16 +8,16 @@ use log::info; use std::sync::Arc; use tokio::sync::RwLock; -pub struct StoragePathDetector { +pub(crate) struct StoragePathDetector { issues: Arc>, } impl StoragePathDetector { - pub fn new(issues: Arc>) -> Self { + pub(crate) fn new(issues: Arc>) -> Self { Self { issues } } - pub async fn detect_storage_path( + pub(crate) async fn detect_storage_path( &self, storage_path_override: Option, ) -> Result<(String, Option), Box> { diff --git a/crates/worker/src/checks/issue.rs b/crates/worker/src/checks/issue.rs index 50dfcded..850fd1c5 100644 --- a/crates/worker/src/checks/issue.rs +++ b/crates/worker/src/checks/issue.rs @@ -3,13 +3,13 @@ use std::fmt; use std::sync::{Arc, RwLock}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Severity { +pub(crate) enum Severity { Warning, Error, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum IssueType { +pub(crate) enum IssueType { NoGpu, // GPU required for compute DockerNotInstalled, // Docker required for containers ContainerToolkitNotInstalled, // Container toolkit required for GPU @@ -22,7 +22,7 @@ pub enum IssueType { } impl IssueType { - pub const fn severity(&self) -> Severity { + pub(crate) const fn severity(&self) -> Severity { match self { Self::NetworkConnectivityIssue | Self::InsufficientCpu @@ -38,24 +38,24 @@ impl IssueType { } #[derive(Debug, Clone)] -pub struct Issue { +pub(crate) struct Issue { issue_type: IssueType, message: String, } impl Issue { - pub fn new(issue_type: IssueType, message: impl Into) -> Self { + pub(crate) fn new(issue_type: IssueType, message: impl Into) -> Self { Self { issue_type, message: message.into(), } } - pub const fn severity(&self) -> Severity { + pub(crate) const fn severity(&self) -> Severity { self.issue_type.severity() } - pub fn print(&self) { + pub(crate) fn print(&self) { match self.severity() { Severity::Error => Console::user_error(&format!("{self}")), Severity::Warning => Console::warning(&format!("{self}")), @@ -70,22 +70,22 @@ impl fmt::Display for Issue { } #[derive(Debug, Default, Clone)] -pub struct IssueReport { +pub(crate) struct IssueReport { issues: Arc>>, } impl IssueReport { - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self::default() } - pub fn add_issue(&self, issue_type: IssueType, message: impl Into) { + pub(crate) fn add_issue(&self, issue_type: IssueType, message: impl Into) { if let Ok(mut issues) = self.issues.write() { issues.push(Issue::new(issue_type, message)); } } - pub fn print_issues(&self) { + pub(crate) fn print_issues(&self) { if let Ok(issues) = self.issues.read() { if issues.is_empty() { Console::success("No issues found"); @@ -104,7 +104,7 @@ impl IssueReport { } } - pub fn has_critical_issues(&self) -> bool { + pub(crate) fn has_critical_issues(&self) -> bool { if let Ok(issues) = self.issues.read() { return issues .iter() diff --git a/crates/worker/src/checks/mod.rs b/crates/worker/src/checks/mod.rs index 37dd60ad..360de345 100644 --- a/crates/worker/src/checks/mod.rs +++ b/crates/worker/src/checks/mod.rs @@ -1,4 +1,4 @@ -pub mod hardware; -pub mod issue; -pub mod software; -pub mod stun; +pub(crate) mod hardware; +pub(crate) mod issue; +pub(crate) mod software; +pub(crate) mod stun; diff --git a/crates/worker/src/checks/software/docker.rs b/crates/worker/src/checks/software/docker.rs index e38a5178..9bd30536 100644 --- a/crates/worker/src/checks/software/docker.rs +++ b/crates/worker/src/checks/software/docker.rs @@ -5,7 +5,7 @@ use bollard::Docker; use std::sync::Arc; use tokio::sync::RwLock; -pub async fn check_docker_installed( +pub(crate) async fn check_docker_installed( issues: &Arc>, ) -> Result<(), Box> { let issue_tracker = issues.read().await; diff --git a/crates/worker/src/checks/software/mod.rs b/crates/worker/src/checks/software/mod.rs index 6a58c97c..37a915d9 100644 --- a/crates/worker/src/checks/software/mod.rs +++ b/crates/worker/src/checks/software/mod.rs @@ -1,4 +1,4 @@ -pub mod docker; -pub mod port; -pub mod software_check; -pub use software_check::SoftwareChecker; +pub(crate) mod docker; +pub(crate) mod port; +pub(crate) mod software_check; +pub(crate) use software_check::SoftwareChecker; diff --git a/crates/worker/src/checks/software/port.rs b/crates/worker/src/checks/software/port.rs index 382fcc2b..8deb6c7c 100644 --- a/crates/worker/src/checks/software/port.rs +++ b/crates/worker/src/checks/software/port.rs @@ -13,7 +13,10 @@ fn try_bind_port(port: u16) -> Result<()> { Ok(()) } -pub async fn check_port_available(issues: &Arc>, port: u16) -> Result<()> { +pub(crate) async fn check_port_available( + issues: &Arc>, + port: u16, +) -> Result<()> { let issue_tracker = issues.read().await; match try_bind_port(port) { diff --git a/crates/worker/src/checks/software/software_check.rs b/crates/worker/src/checks/software/software_check.rs index dfa8c8f1..2de88a17 100644 --- a/crates/worker/src/checks/software/software_check.rs +++ b/crates/worker/src/checks/software/software_check.rs @@ -5,18 +5,18 @@ use shared::models::node::Node; use std::sync::Arc; use tokio::sync::RwLock; -pub struct SoftwareChecker { +pub(crate) struct SoftwareChecker { issues: Arc>, } impl SoftwareChecker { - pub fn new(issues: Option>>) -> Self { + pub(crate) fn new(issues: Option>>) -> Self { Self { issues: issues.unwrap_or_else(|| Arc::new(RwLock::new(IssueReport::new()))), } } - pub async fn check_software( + pub(crate) async fn check_software( &self, node_config: &Node, ) -> Result<(), Box> { diff --git a/crates/worker/src/checks/stun.rs b/crates/worker/src/checks/stun.rs index a1393483..5830b49e 100644 --- a/crates/worker/src/checks/stun.rs +++ b/crates/worker/src/checks/stun.rs @@ -11,13 +11,13 @@ use stun::xoraddr::*; use tracing::{debug, error, info}; -pub struct StunCheck { +pub(crate) struct StunCheck { pub timeout: Duration, pub port: u16, } impl StunCheck { - pub fn new(timeout: Duration, port: u16) -> Self { + pub(crate) fn new(timeout: Duration, port: u16) -> Self { Self { timeout, port } } @@ -101,7 +101,9 @@ impl StunCheck { Ok(public_ip.to_string()) } - pub async fn get_public_ip(&self) -> Result> { + pub(crate) async fn get_public_ip( + &self, + ) -> Result> { let stun_servers = [ "stun.l.google.com:19302", "stun.stunprotocol.org:3478", diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index cf26a680..e7ad52bd 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -44,6 +44,7 @@ pub struct Cli { #[command(subcommand)] pub command: Commands, } + #[derive(Subcommand)] pub enum Commands { Run { diff --git a/crates/worker/src/cli/mod.rs b/crates/worker/src/cli/mod.rs index fc489769..e703cd42 100644 --- a/crates/worker/src/cli/mod.rs +++ b/crates/worker/src/cli/mod.rs @@ -1,2 +1,2 @@ -pub mod command; +pub(crate) mod command; pub use command::{execute_command, Cli}; diff --git a/crates/worker/src/console/console_logger.rs b/crates/worker/src/console/console_logger.rs index 937df46f..9703e4bd 100644 --- a/crates/worker/src/console/console_logger.rs +++ b/crates/worker/src/console/console_logger.rs @@ -2,7 +2,7 @@ use console::{style, Term}; use std::cmp; use unicode_width::UnicodeWidthStr; -pub struct Console; +pub(crate) struct Console; impl Console { /// Maximum content width for the box. @@ -35,7 +35,7 @@ impl Console { } /// Prints a section header as an aligned box. - pub fn section(title: &str) { + pub(crate) fn section(title: &str) { let content_width = Self::get_content_width(); let top_border = format!("╔{}╗", "═".repeat(content_width)); let centered_title = Self::center_text(title, content_width); @@ -49,23 +49,23 @@ impl Console { } /// Prints a sub-title. - pub fn title(text: &str) { + pub(crate) fn title(text: &str) { println!(); println!("{}", style(text).white().bold()); } /// Prints an informational message. - pub fn info(label: &str, value: &str) { + pub(crate) fn info(label: &str, value: &str) { println!("{}: {}", style(label).dim().white(), style(value).white()); } /// Prints a success message. - pub fn success(text: &str) { + pub(crate) fn success(text: &str) { println!("{} {}", style("✓").green().bold(), style(text).green()); } /// Prints a warning message. - pub fn warning(text: &str) { + pub(crate) fn warning(text: &str) { println!("{} {}", style("⚠").yellow().bold(), style(text).yellow()); } @@ -74,12 +74,12 @@ impl Console { /// rather than a system error. These errors are not logged to central logging systems /// and are only displayed to the user to help them resolve the issue. /// For actual system errors that should be tracked, use proper error logging instead. - pub fn user_error(text: &str) { + pub(crate) fn user_error(text: &str) { println!("{} {}", style("✗").red().bold(), style(text).red()); } /// Prints a progress message. - pub fn progress(text: &str) { + pub(crate) fn progress(text: &str) { println!("{} {}", style("→").cyan().bold(), style(text).cyan()); } } diff --git a/crates/worker/src/console/mod.rs b/crates/worker/src/console/mod.rs index 493b54d2..de601ba6 100644 --- a/crates/worker/src/console/mod.rs +++ b/crates/worker/src/console/mod.rs @@ -1,2 +1,2 @@ -pub use console_logger::*; +pub(crate) use console_logger::*; mod console_logger; diff --git a/crates/worker/src/docker/docker_manager.rs b/crates/worker/src/docker/docker_manager.rs index 5625f515..e9a37b7d 100644 --- a/crates/worker/src/docker/docker_manager.rs +++ b/crates/worker/src/docker/docker_manager.rs @@ -20,7 +20,7 @@ use std::time::Duration; use strip_ansi_escapes::strip; #[derive(Debug, Clone)] -pub struct ContainerInfo { +pub(crate) struct ContainerInfo { pub id: String, #[allow(unused)] pub image: String, @@ -30,7 +30,7 @@ pub struct ContainerInfo { } #[derive(Debug, Clone)] -pub struct ContainerDetails { +pub(crate) struct ContainerDetails { #[allow(unused)] pub id: String, #[allow(unused)] @@ -43,7 +43,7 @@ pub struct ContainerDetails { pub created: i64, } -pub struct DockerManager { +pub(crate) struct DockerManager { docker: Docker, storage_path: String, /// Controls whether to use host network mode for containers. @@ -137,7 +137,10 @@ impl DockerManager { } /// Create a new DockerManager instance - pub fn new(storage_path: String, disable_host_network_mode: bool) -> Result { + pub(crate) fn new( + storage_path: String, + disable_host_network_mode: bool, + ) -> Result { let docker = match Docker::connect_with_unix_defaults() { Ok(docker) => docker, Err(e) => { @@ -170,7 +173,7 @@ impl DockerManager { } /// Pull a Docker image if it doesn't exist locally - pub async fn pull_image(&self, image: &str) -> Result<(), DockerError> { + pub(crate) async fn pull_image(&self, image: &str) -> Result<(), DockerError> { debug!("Checking if image needs to be pulled: {image}"); // Check if the image uses :latest or :main tag @@ -219,7 +222,7 @@ impl DockerManager { #[allow(clippy::too_many_arguments)] /// Start a new container with the given image and configuration - pub async fn start_container( + pub(crate) async fn start_container( &self, image: &str, name: &str, @@ -482,7 +485,7 @@ impl DockerManager { } /// Remove container, volumes, and directories - pub async fn remove_container(&self, container_id: &str) -> Result<(), DockerError> { + pub(crate) async fn remove_container(&self, container_id: &str) -> Result<(), DockerError> { let container = (self.get_container_details(container_id).await).ok(); if container.is_some() { @@ -707,7 +710,10 @@ impl DockerManager { Ok(()) } - pub async fn list_containers(&self, list_all: bool) -> Result, DockerError> { + pub(crate) async fn list_containers( + &self, + list_all: bool, + ) -> Result, DockerError> { debug!("Listing running containers"); let options = Some(ListContainersOptions:: { all: list_all, // If true, list all containers. If false, only list running containers @@ -729,7 +735,7 @@ impl DockerManager { } /// Get details for a specific container by ID - pub async fn get_container_details( + pub(crate) async fn get_container_details( &self, container_id: &str, ) -> Result { @@ -753,14 +759,14 @@ impl DockerManager { Ok(info) } - pub async fn restart_container(&self, container_id: &str) -> Result<(), DockerError> { + pub(crate) async fn restart_container(&self, container_id: &str) -> Result<(), DockerError> { debug!("Restarting container: {container_id}"); self.docker.restart_container(container_id, None).await?; debug!("Container {container_id} restarted successfully"); Ok(()) } - pub async fn get_container_logs( + pub(crate) async fn get_container_logs( &self, container_id: &str, tail: Option, @@ -833,7 +839,7 @@ impl DockerManager { Ok(logs) } - pub async fn inspect_container( + pub(crate) async fn inspect_container( &self, container_id: &str, ) -> Result { diff --git a/crates/worker/src/docker/mod.rs b/crates/worker/src/docker/mod.rs index 1d582b6d..535640e9 100644 --- a/crates/worker/src/docker/mod.rs +++ b/crates/worker/src/docker/mod.rs @@ -1,9 +1,9 @@ -pub mod docker_manager; -pub mod service; -pub mod state; -pub mod task_container; -pub mod taskbridge; +pub(crate) mod docker_manager; +pub(crate) mod service; +pub(crate) mod state; +pub(crate) mod task_container; +pub(crate) mod taskbridge; -pub use docker_manager::DockerManager; -pub use service::DockerService; -pub use state::DockerState; +pub(crate) use docker_manager::DockerManager; +pub(crate) use service::DockerService; +pub(crate) use state::DockerState; diff --git a/crates/worker/src/docker/service.rs b/crates/worker/src/docker/service.rs index 741ce8cd..63425e2d 100644 --- a/crates/worker/src/docker/service.rs +++ b/crates/worker/src/docker/service.rs @@ -16,7 +16,7 @@ use tokio::sync::Mutex; use tokio::time::{interval, Duration}; use tokio_util::sync::CancellationToken; -pub struct DockerService { +pub(crate) struct DockerService { docker_manager: Arc, cancellation_token: CancellationToken, pub state: Arc, @@ -32,7 +32,7 @@ const RESTART_INTERVAL_SECONDS: i64 = 10; impl DockerService { #[allow(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( cancellation_token: CancellationToken, gpu: Option, system_memory_mb: Option, @@ -56,7 +56,7 @@ impl DockerService { } } - pub async fn run(&self) -> Result<(), Box> { + pub(crate) async fn run(&self) -> Result<(), Box> { let mut interval = interval(Duration::from_secs(5)); let manager = self.docker_manager.clone(); let cancellation_token = self.cancellation_token.clone(); @@ -319,7 +319,7 @@ impl DockerService { Ok(()) } - pub async fn get_logs(&self) -> Result> { + pub(crate) async fn get_logs(&self) -> Result> { let current_task = self.state.get_current_task().await; match current_task { Some(task) => { @@ -340,7 +340,7 @@ impl DockerService { } } - pub async fn restart_task(&self) -> Result<(), Box> { + pub(crate) async fn restart_task(&self) -> Result<(), Box> { let current_task = self.state.get_current_task().await; match current_task { Some(task) => { @@ -353,7 +353,7 @@ impl DockerService { } } - pub async fn get_task_details(&self, task: &Task) -> Option { + pub(crate) async fn get_task_details(&self, task: &Task) -> Option { let config_hash = task.generate_config_hash(); let container_name = format!("{}-{}-{:x}", TASK_PREFIX, task.id, config_hash); diff --git a/crates/worker/src/docker/state.rs b/crates/worker/src/docker/state.rs index 2527140e..fb44ac66 100644 --- a/crates/worker/src/docker/state.rs +++ b/crates/worker/src/docker/state.rs @@ -3,14 +3,14 @@ use shared::models::task::{Task, TaskState}; use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; -pub struct DockerState { +pub(crate) struct DockerState { current_task: Arc>>, last_started: Arc>>>, is_running: Arc>, } impl DockerState { - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self { current_task: Arc::new(Mutex::new(None)), last_started: Arc::new(Mutex::new(None)), @@ -18,7 +18,7 @@ impl DockerState { } } - pub async fn set_current_task(&self, task: Option) { + pub(crate) async fn set_current_task(&self, task: Option) { let mut current_task = self.current_task.lock().await; if let Some(existing_task) = &*current_task { if let Some(new_task) = &task { @@ -32,7 +32,7 @@ impl DockerState { *current_task = task; } - pub async fn update_task_state(&self, task_id: Uuid, state: TaskState) { + pub(crate) async fn update_task_state(&self, task_id: Uuid, state: TaskState) { let mut current_task = self.current_task.lock().await; if let Some(task) = current_task.as_mut() { if task.id == task_id { @@ -43,27 +43,27 @@ impl DockerState { } } - pub async fn get_current_task(&self) -> Option { + pub(crate) async fn get_current_task(&self) -> Option { let current_task = self.current_task.lock().await; current_task.clone() } - pub async fn get_last_started(&self) -> Option> { + pub(crate) async fn get_last_started(&self) -> Option> { let last_started = self.last_started.lock().await; *last_started } - pub async fn set_last_started(&self, last_started: DateTime) { + pub(crate) async fn set_last_started(&self, last_started: DateTime) { let mut last_started_guard = self.last_started.lock().await; *last_started_guard = Some(last_started); } - pub async fn get_is_running(&self) -> bool { + pub(crate) async fn get_is_running(&self) -> bool { let is_running = self.is_running.lock().await; *is_running } - pub async fn set_is_running(&self, is_running: bool) { + pub(crate) async fn set_is_running(&self, is_running: bool) { let mut is_running_guard = self.is_running.lock().await; *is_running_guard = is_running; } diff --git a/crates/worker/src/docker/task_container.rs b/crates/worker/src/docker/task_container.rs index 5a6f903f..726c230c 100644 --- a/crates/worker/src/docker/task_container.rs +++ b/crates/worker/src/docker/task_container.rs @@ -1,11 +1,11 @@ #[derive(Debug, Clone, PartialEq)] -pub struct TaskContainer { +pub(crate) struct TaskContainer { pub task_id: String, pub config_hash: String, } impl TaskContainer { - pub fn data_dir_name(&self) -> String { + pub(crate) fn data_dir_name(&self) -> String { format!("prime-task-{}", self.task_id) } } diff --git a/crates/worker/src/docker/taskbridge/bridge.rs b/crates/worker/src/docker/taskbridge/bridge.rs index 9a461b43..962ce908 100644 --- a/crates/worker/src/docker/taskbridge/bridge.rs +++ b/crates/worker/src/docker/taskbridge/bridge.rs @@ -24,7 +24,7 @@ use tokio::{io::BufReader, net::UnixListener}; const DEFAULT_SOCKET_FILE: &str = "prime-worker/com.prime.worker/metrics.sock"; -pub struct TaskBridge { +pub(crate) struct TaskBridge { socket_path: std::path::PathBuf, config: TaskBridgeConfig, } diff --git a/crates/worker/src/docker/taskbridge/file_handler.rs b/crates/worker/src/docker/taskbridge/file_handler.rs index 07736e90..4263d489 100644 --- a/crates/worker/src/docker/taskbridge/file_handler.rs +++ b/crates/worker/src/docker/taskbridge/file_handler.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; /// Handles a file upload request -pub async fn handle_file_upload( +pub(crate) async fn handle_file_upload( storage_path: String, task_id: String, file_name: String, @@ -309,7 +309,7 @@ pub async fn handle_file_upload( } /// Handles a file validation request -pub async fn handle_file_validation( +pub(crate) async fn handle_file_validation( file_sha: String, contracts: Contracts, node: Node, diff --git a/crates/worker/src/docker/taskbridge/json_helper.rs b/crates/worker/src/docker/taskbridge/json_helper.rs index 222058f8..c18c84dc 100644 --- a/crates/worker/src/docker/taskbridge/json_helper.rs +++ b/crates/worker/src/docker/taskbridge/json_helper.rs @@ -1,5 +1,5 @@ // Helper function to extract the next complete JSON object from a string -pub fn extract_next_json(input: &[u8]) -> Option<(&str, usize)> { +pub(super) fn extract_next_json(input: &[u8]) -> Option<(&str, usize)> { // Skip any leading whitespace (including newlines) let mut start_pos = 0; while start_pos < input.len() && (input[start_pos] <= 32) { diff --git a/crates/worker/src/docker/taskbridge/mod.rs b/crates/worker/src/docker/taskbridge/mod.rs index b1bd471e..ea2bf128 100644 --- a/crates/worker/src/docker/taskbridge/mod.rs +++ b/crates/worker/src/docker/taskbridge/mod.rs @@ -1,5 +1,5 @@ -pub mod bridge; -pub mod file_handler; +pub(crate) mod bridge; +pub(crate) mod file_handler; mod json_helper; -pub use bridge::TaskBridge; +pub(crate) use bridge::TaskBridge; diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs new file mode 100644 index 00000000..8fe8e1ea --- /dev/null +++ b/crates/worker/src/lib.rs @@ -0,0 +1,16 @@ +mod checks; +mod cli; +mod console; +mod docker; +mod metrics; +mod operations; +mod p2p; +mod services; +mod state; +mod utils; + +pub use cli::execute_command; +pub use cli::Cli; +pub use utils::logging::setup_logging; + +pub type TaskHandles = std::sync::Arc>>>; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 6ca4cab2..e8032d5a 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,23 +1,13 @@ -mod checks; -mod cli; -mod console; -mod docker; -mod metrics; -mod operations; -mod p2p; -mod services; -mod state; -mod utils; use clap::Parser; -use cli::{execute_command, Cli}; use std::panic; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use utils::logging::setup_logging; -pub type TaskHandles = Arc>>>; + +use worker::TaskHandles; +use worker::{execute_command, setup_logging, Cli}; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/crates/worker/src/metrics/mod.rs b/crates/worker/src/metrics/mod.rs index 55c88cbf..43654ebb 100644 --- a/crates/worker/src/metrics/mod.rs +++ b/crates/worker/src/metrics/mod.rs @@ -1 +1 @@ -pub mod store; +pub(crate) mod store; diff --git a/crates/worker/src/metrics/store.rs b/crates/worker/src/metrics/store.rs index ba8aedf6..e6712d27 100644 --- a/crates/worker/src/metrics/store.rs +++ b/crates/worker/src/metrics/store.rs @@ -4,18 +4,23 @@ use std::collections::HashMap; use tokio::sync::RwLock; #[derive(Debug)] -pub struct MetricsStore { +pub(crate) struct MetricsStore { metrics: RwLock>, } impl MetricsStore { - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self { metrics: RwLock::new(HashMap::new()), } } - pub async fn update_metric(&self, task_id: String, label: String, value: f64) -> Result<()> { + pub(crate) async fn update_metric( + &self, + task_id: String, + label: String, + value: f64, + ) -> Result<()> { if !value.is_finite() { anyhow::bail!("Value must be a finite number"); } @@ -25,7 +30,7 @@ impl MetricsStore { Ok(()) } - pub async fn get_metrics_for_task(&self, task_id: String) -> Vec { + pub(crate) async fn get_metrics_for_task(&self, task_id: String) -> Vec { self.metrics .read() .await @@ -38,13 +43,13 @@ impl MetricsStore { .collect() } - pub async fn clear_metrics_for_task(&self, task_id: &str) { + pub(crate) async fn clear_metrics_for_task(&self, task_id: &str) { let mut metrics = self.metrics.write().await; metrics.retain(|key, _| key.task_id != task_id); } #[allow(dead_code)] - pub async fn get_all_metrics(&self) -> HashMap { + pub(crate) async fn get_all_metrics(&self) -> HashMap { let metrics = self.metrics.read().await; metrics.clone() } diff --git a/crates/worker/src/operations/compute_node.rs b/crates/worker/src/operations/compute_node.rs index 86bd6dca..39b18c29 100644 --- a/crates/worker/src/operations/compute_node.rs +++ b/crates/worker/src/operations/compute_node.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::time::{sleep, Duration}; use tokio_util::sync::CancellationToken; -pub struct ComputeNodeOperations<'c> { +pub(crate) struct ComputeNodeOperations<'c> { provider_wallet: &'c Wallet, node_wallet: &'c Wallet, contracts: Contracts, @@ -15,7 +15,7 @@ pub struct ComputeNodeOperations<'c> { } impl<'c> ComputeNodeOperations<'c> { - pub fn new( + pub(crate) fn new( provider_wallet: &'c Wallet, node_wallet: &'c Wallet, contracts: Contracts, @@ -29,7 +29,7 @@ impl<'c> ComputeNodeOperations<'c> { } } - pub fn start_monitoring( + pub(crate) fn start_monitoring( &self, cancellation_token: CancellationToken, pool_id: String, @@ -115,7 +115,9 @@ impl<'c> ComputeNodeOperations<'c> { Ok(()) } - pub async fn check_compute_node_exists(&self) -> Result> { + pub(crate) async fn check_compute_node_exists( + &self, + ) -> Result> { let compute_node = self .contracts .compute_registry @@ -132,7 +134,7 @@ impl<'c> ComputeNodeOperations<'c> { } // Returns true if the compute node was added, false if it already exists - pub async fn add_compute_node( + pub(crate) async fn add_compute_node( &self, compute_units: U256, ) -> Result> { @@ -164,7 +166,7 @@ impl<'c> ComputeNodeOperations<'c> { Ok(true) } - pub async fn remove_compute_node(&self) -> Result> { + pub(crate) async fn remove_compute_node(&self) -> Result> { Console::title("🔄 Removing compute node"); if !self.check_compute_node_exists().await? { diff --git a/crates/worker/src/operations/heartbeat/mod.rs b/crates/worker/src/operations/heartbeat/mod.rs index 1f278a4d..60f97844 100644 --- a/crates/worker/src/operations/heartbeat/mod.rs +++ b/crates/worker/src/operations/heartbeat/mod.rs @@ -1 +1 @@ -pub mod service; +pub(crate) mod service; diff --git a/crates/worker/src/operations/heartbeat/service.rs b/crates/worker/src/operations/heartbeat/service.rs index ea37294d..0d77d783 100644 --- a/crates/worker/src/operations/heartbeat/service.rs +++ b/crates/worker/src/operations/heartbeat/service.rs @@ -2,7 +2,6 @@ use crate::docker::DockerService; use crate::metrics::store::MetricsStore; use crate::state::system_state::SystemState; use crate::TaskHandles; -use log; use log::info; use reqwest::Client; use shared::models::api::ApiResponse; @@ -13,8 +12,9 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{interval, Duration}; use tokio_util::sync::CancellationToken; + #[derive(Clone)] -pub struct HeartbeatService { +pub(crate) struct HeartbeatService { state: Arc, interval: Duration, client: Client, @@ -26,7 +26,7 @@ pub struct HeartbeatService { } #[derive(Debug, Clone, thiserror::Error)] -pub enum HeartbeatError { +pub(crate) enum HeartbeatError { #[error("HTTP request failed")] RequestFailed, #[error("Service initialization failed")] @@ -34,7 +34,7 @@ pub enum HeartbeatError { } impl HeartbeatService { #[allow(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( interval: Duration, cancellation_token: CancellationToken, task_handles: TaskHandles, @@ -60,14 +60,14 @@ impl HeartbeatService { })) } - pub async fn activate_heartbeat_if_endpoint_exists(&self) { + pub(crate) async fn activate_heartbeat_if_endpoint_exists(&self) { if let Some(endpoint) = self.state.get_heartbeat_endpoint().await { info!("Starting heartbeat from recovered state"); self.start(endpoint).await.unwrap(); } } - pub async fn start(&self, endpoint: String) -> Result<(), HeartbeatError> { + pub(crate) async fn start(&self, endpoint: String) -> Result<(), HeartbeatError> { if self.state.is_running().await { return Ok(()); } @@ -130,7 +130,7 @@ impl HeartbeatService { } #[allow(dead_code)] - pub async fn stop(&self) { + pub(crate) async fn stop(&self) { if let Err(e) = self.state.set_running(false, None).await { log::error!("Failed to set running to false: {e:?}"); } diff --git a/crates/worker/src/operations/mod.rs b/crates/worker/src/operations/mod.rs index 4a4df454..193b64ae 100644 --- a/crates/worker/src/operations/mod.rs +++ b/crates/worker/src/operations/mod.rs @@ -1,3 +1,3 @@ -pub mod compute_node; -pub mod heartbeat; -pub mod provider; +pub(crate) mod compute_node; +pub(crate) mod heartbeat; +pub(crate) mod provider; diff --git a/crates/worker/src/operations/provider.rs b/crates/worker/src/operations/provider.rs index a683c0bf..fb8aba5f 100644 --- a/crates/worker/src/operations/provider.rs +++ b/crates/worker/src/operations/provider.rs @@ -9,14 +9,18 @@ use std::{fmt, io}; use tokio::time::{sleep, Duration}; use tokio_util::sync::CancellationToken; -pub struct ProviderOperations { +pub(crate) struct ProviderOperations { wallet: Wallet, contracts: Contracts, auto_accept: bool, } impl ProviderOperations { - pub fn new(wallet: Wallet, contracts: Contracts, auto_accept: bool) -> Self { + pub(crate) fn new( + wallet: Wallet, + contracts: Contracts, + auto_accept: bool, + ) -> Self { Self { wallet, contracts, @@ -40,7 +44,7 @@ impl ProviderOperations { } } - pub fn start_monitoring(&self, cancellation_token: CancellationToken) { + pub(crate) fn start_monitoring(&self, cancellation_token: CancellationToken) { let provider_address = self.wallet.wallet.default_signer().address(); let contracts = self.contracts.clone(); @@ -142,7 +146,7 @@ impl ProviderOperations { }); } - pub async fn check_provider_exists(&self) -> Result { + pub(crate) async fn check_provider_exists(&self) -> Result { let address = self.wallet.wallet.default_signer().address(); let provider = self @@ -155,7 +159,7 @@ impl ProviderOperations { Ok(provider.provider_address != Address::default()) } - pub async fn check_provider_whitelisted(&self) -> Result { + pub(crate) async fn check_provider_whitelisted(&self) -> Result { let address = self.wallet.wallet.default_signer().address(); let provider = self @@ -168,7 +172,7 @@ impl ProviderOperations { Ok(provider.is_whitelisted) } - pub async fn retry_register_provider( + pub(crate) async fn retry_register_provider( &self, stake: U256, max_attempts: u32, @@ -202,7 +206,7 @@ impl ProviderOperations { Err(ProviderError::Other) } - pub async fn register_provider(&self, stake: U256) -> Result<(), ProviderError> { + pub(crate) async fn register_provider(&self, stake: U256) -> Result<(), ProviderError> { let address = self.wallet.wallet.default_signer().address(); let balance: U256 = self .contracts @@ -326,7 +330,7 @@ impl ProviderOperations { Ok(()) } - pub async fn increase_stake(&self, additional_stake: U256) -> Result<(), ProviderError> { + pub(crate) async fn increase_stake(&self, additional_stake: U256) -> Result<(), ProviderError> { Console::title("💰 Increasing Provider Stake"); let address = self.wallet.wallet.default_signer().address(); @@ -379,7 +383,7 @@ impl ProviderOperations { Ok(()) } - pub async fn reclaim_stake(&self, amount: U256) -> Result<(), ProviderError> { + pub(crate) async fn reclaim_stake(&self, amount: U256) -> Result<(), ProviderError> { Console::progress("Reclaiming stake"); let reclaim_tx = match self.contracts.prime_network.reclaim_stake(amount).await { Ok(tx) => tx, @@ -398,7 +402,7 @@ impl ProviderOperations { } #[derive(Debug)] -pub enum ProviderError { +pub(crate) enum ProviderError { NotWhitelisted, UserCancelled, Other, diff --git a/crates/worker/src/p2p/mod.rs b/crates/worker/src/p2p/mod.rs index 167f5859..9393f985 100644 --- a/crates/worker/src/p2p/mod.rs +++ b/crates/worker/src/p2p/mod.rs @@ -1,4 +1,4 @@ -pub mod service; +pub(crate) mod service; -pub use service::P2PContext; -pub use service::P2PService; +pub(crate) use service::P2PContext; +pub(crate) use service::P2PService; diff --git a/crates/worker/src/p2p/service.rs b/crates/worker/src/p2p/service.rs index 6e88d32a..51a68405 100644 --- a/crates/worker/src/p2p/service.rs +++ b/crates/worker/src/p2p/service.rs @@ -30,7 +30,7 @@ lazy_static! { } #[derive(Clone)] -pub struct P2PContext { +pub(crate) struct P2PContext { pub docker_service: Arc, pub heartbeat_service: Arc, pub system_state: Arc, @@ -40,7 +40,7 @@ pub struct P2PContext { } #[derive(Clone)] -pub struct P2PService { +pub(crate) struct P2PService { endpoint: Endpoint, secret_key: SecretKey, node_id: String, @@ -58,7 +58,7 @@ enum EndpointLoopResult { impl P2PService { /// Create a new P2P service with a unique worker identity - pub async fn new( + pub(crate) async fn new( worker_p2p_seed: Option, cancellation_token: CancellationToken, context: Option, @@ -111,12 +111,12 @@ impl P2PService { } /// Get the P2P node ID - pub fn node_id(&self) -> &str { + pub(crate) fn node_id(&self) -> &str { &self.node_id } /// Get the listening addresses - pub fn listening_addresses(&self) -> &[String] { + pub(crate) fn listening_addresses(&self) -> &[String] { &self.listening_addrs } @@ -143,7 +143,7 @@ impl P2PService { Ok(endpoint) } /// Start accepting incoming connections with automatic recovery - pub fn start(&self) -> Result<()> { + pub(crate) fn start(&self) -> Result<()> { let service = Arc::new(self.clone()); let cancellation_token = self.cancellation_token.clone(); diff --git a/crates/worker/src/services/discovery.rs b/crates/worker/src/services/discovery.rs index 90364745..2088215c 100644 --- a/crates/worker/src/services/discovery.rs +++ b/crates/worker/src/services/discovery.rs @@ -3,14 +3,14 @@ use shared::models::node::Node; use shared::security::request_signer::sign_request_with_nonce; use shared::web3::wallet::Wallet; -pub struct DiscoveryService { +pub(crate) struct DiscoveryService { wallet: Wallet, base_urls: Vec, endpoint: String, } impl DiscoveryService { - pub fn new(wallet: Wallet, base_urls: Vec, endpoint: Option) -> Self { + pub(crate) fn new(wallet: Wallet, base_urls: Vec, endpoint: Option) -> Self { let urls = if base_urls.is_empty() { vec!["http://localhost:8089".to_string()] } else { @@ -73,7 +73,7 @@ impl DiscoveryService { Ok(()) } - pub async fn upload_discovery_info(&self, node_config: &Node) -> Result<()> { + pub(crate) async fn upload_discovery_info(&self, node_config: &Node) -> Result<()> { let mut last_error: Option = None; for base_url in &self.base_urls { diff --git a/crates/worker/src/services/discovery_updater.rs b/crates/worker/src/services/discovery_updater.rs index 17813170..148a5f10 100644 --- a/crates/worker/src/services/discovery_updater.rs +++ b/crates/worker/src/services/discovery_updater.rs @@ -10,7 +10,7 @@ use tokio_util::sync::CancellationToken; const INITIAL_UPDATE_DELAY: Duration = Duration::from_secs(120); const UPDATE_INTERVAL: Duration = Duration::from_secs(120); -pub struct DiscoveryUpdater { +pub(crate) struct DiscoveryUpdater { discovery_service: Arc, is_running: Arc, system_state: Arc, @@ -18,7 +18,7 @@ pub struct DiscoveryUpdater { } impl DiscoveryUpdater { - pub fn new(discovery_service: DiscoveryService, system_state: Arc) -> Self { + pub(crate) fn new(discovery_service: DiscoveryService, system_state: Arc) -> Self { Self { discovery_service: Arc::new(discovery_service), is_running: Arc::new(AtomicBool::new(false)), @@ -27,7 +27,7 @@ impl DiscoveryUpdater { } } - pub fn start_auto_update(&self, node_config: Node) { + pub(crate) fn start_auto_update(&self, node_config: Node) { if self.is_running.load(Ordering::SeqCst) { debug!("Auto update already running, skipping start"); return; diff --git a/crates/worker/src/services/mod.rs b/crates/worker/src/services/mod.rs index 4903673d..82907023 100644 --- a/crates/worker/src/services/mod.rs +++ b/crates/worker/src/services/mod.rs @@ -1,2 +1,2 @@ -pub mod discovery; -pub mod discovery_updater; +pub(crate) mod discovery; +pub(crate) mod discovery_updater; diff --git a/crates/worker/src/state/mod.rs b/crates/worker/src/state/mod.rs index bfe42102..4e58ee4e 100644 --- a/crates/worker/src/state/mod.rs +++ b/crates/worker/src/state/mod.rs @@ -1 +1 @@ -pub mod system_state; +pub(crate) mod system_state; diff --git a/crates/worker/src/state/system_state.rs b/crates/worker/src/state/system_state.rs index 5a1de63d..fd8f0a3a 100644 --- a/crates/worker/src/state/system_state.rs +++ b/crates/worker/src/state/system_state.rs @@ -28,7 +28,7 @@ struct PersistedSystemState { } #[derive(Debug, Clone)] -pub struct SystemState { +pub(crate) struct SystemState { last_heartbeat: Arc>>, is_running: Arc>, // Keep is_running in the normal heartbeat state endpoint: Arc>>, @@ -42,7 +42,7 @@ pub struct SystemState { } impl SystemState { - pub fn new( + pub(crate) fn new( state_dir: Option, disable_state_storing: bool, compute_pool_id: Option, @@ -160,24 +160,24 @@ impl SystemState { Ok(None) } - pub fn get_p2p_seed(&self) -> Option { + pub(crate) fn get_p2p_seed(&self) -> Option { self.p2p_seed } - pub fn get_p2p_id(&self) -> Option { + pub(crate) fn get_p2p_id(&self) -> Option { self.p2p_id.clone() } - pub async fn update_last_heartbeat(&self) { + pub(crate) async fn update_last_heartbeat(&self) { let mut heartbeat = self.last_heartbeat.write().await; *heartbeat = Some(std::time::Instant::now()); } - pub async fn is_running(&self) -> bool { + pub(crate) async fn is_running(&self) -> bool { *self.is_running.read().await } - pub async fn set_running( + pub(crate) async fn set_running( &self, running: bool, heartbeat_endpoint: Option, @@ -209,7 +209,7 @@ impl SystemState { Ok(()) } - pub async fn get_heartbeat_endpoint(&self) -> Option { + pub(crate) async fn get_heartbeat_endpoint(&self) -> Option { let endpoint = self.endpoint.read().await; endpoint.clone() } diff --git a/crates/worker/src/utils/mod.rs b/crates/worker/src/utils/mod.rs index cf5eb61e..210f1e35 100644 --- a/crates/worker/src/utils/mod.rs +++ b/crates/worker/src/utils/mod.rs @@ -1,2 +1,2 @@ -pub mod logging; -pub mod p2p; +pub(crate) mod logging; +pub(crate) mod p2p; diff --git a/crates/worker/src/utils/p2p.rs b/crates/worker/src/utils/p2p.rs index e20608a3..ef07b28c 100644 --- a/crates/worker/src/utils/p2p.rs +++ b/crates/worker/src/utils/p2p.rs @@ -4,12 +4,12 @@ use rand_v8::{rngs::StdRng, SeedableRng}; use std::error::Error; /// Generate a random seed -pub fn generate_random_seed() -> u64 { +pub(crate) fn generate_random_seed() -> u64 { rand_v8::thread_rng().gen() } // Generate an Iroh node ID from a seed -pub fn generate_iroh_node_id_from_seed(seed: u64) -> Result> { +pub(crate) fn generate_iroh_node_id_from_seed(seed: u64) -> Result> { // Create a deterministic RNG from the seed let mut rng = StdRng::seed_from_u64(seed);