Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ iroh = { workspace = true }
rand_v8 = { workspace = true }
rand_core_v6 = { workspace = true }
dashmap = "6.1.0"
homedir = "0.3"
16 changes: 13 additions & 3 deletions crates/worker/src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,21 @@ pub async fn execute_command(
.expect("Hardware check should have populated compute_specs")
.storage_path
.clone();
let task_bridge = TaskBridge::new(
let task_bridge = match TaskBridge::new(
None,
metrics_store,
Some(bridge_contracts),
Some(node_config.clone()),
Some(bridge_wallet),
docker_storage_path.clone(),
state.clone(),
);
) {
Ok(bridge) => bridge,
Err(e) => {
error!("❌ Failed to create Task Bridge: {e}");
std::process::exit(1);
}
};

let system_memory = node_config
.compute_specs
Expand All @@ -445,7 +451,11 @@ pub async fn execute_command(
cancellation_token.clone(),
gpu,
system_memory,
task_bridge.socket_path.clone(),
task_bridge
.socket_path
.to_str()
.expect("path is valid utf-8 string")
.to_string(),
docker_storage_path,
node_wallet_instance
.wallet
Expand Down
37 changes: 19 additions & 18 deletions crates/worker/src/docker/taskbridge/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use std::{fs, path::Path};
use tokio::io::AsyncReadExt;
use tokio::{io::BufReader, net::UnixListener};

pub const SOCKET_NAME: &str = "metrics.sock";
const DEFAULT_MACOS_SOCKET: &str = "/tmp/com.prime.worker/";
const DEFAULT_LINUX_SOCKET: &str = "/tmp/com.prime.worker/";
const DEFAULT_SOCKET_FILE: &str = "prime-worker/com.prime.worker/metrics.sock";

pub struct TaskBridge {
pub socket_path: String,
pub socket_path: std::path::PathBuf,
pub metrics_store: Arc<MetricsStore>,
pub contracts: Option<Contracts<WalletProvider>>,
pub node_config: Option<Node>,
Expand All @@ -47,27 +45,25 @@ impl TaskBridge {
node_wallet: Option<Wallet>,
docker_storage_path: String,
state: Arc<SystemState>,
) -> Arc<Self> {
) -> Result<Arc<Self>> {
let path = match socket_path {
Some(path) => path.to_string(),
Some(path) => std::path::PathBuf::from(path),
None => {
if cfg!(target_os = "macos") {
format!("{DEFAULT_MACOS_SOCKET}{SOCKET_NAME}")
} else {
format!("{DEFAULT_LINUX_SOCKET}{SOCKET_NAME}")
}
let path =
homedir::my_home()?.ok_or(anyhow::anyhow!("failed to get home directory"))?;
path.join(DEFAULT_SOCKET_FILE)
}
};

Arc::new(Self {
Ok(Arc::new(Self {
socket_path: path,
metrics_store,
contracts,
node_config,
node_wallet,
docker_storage_path,
state,
})
}))
}

async fn handle_metric(self: Arc<Self>, input: &MetricInput) -> Result<()> {
Expand Down Expand Up @@ -357,7 +353,8 @@ mod tests {
None,
"test_storage_path".to_string(),
state,
);
)
.unwrap();

// Run the bridge in background
let bridge_handle = tokio::spawn(async move { bridge.run().await });
Expand Down Expand Up @@ -388,7 +385,8 @@ mod tests {
None,
"test_storage_path".to_string(),
state,
);
)
.unwrap();

// Run bridge in background
let bridge_handle = tokio::spawn(async move { bridge.run().await });
Expand Down Expand Up @@ -421,7 +419,8 @@ mod tests {
None,
"test_storage_path".to_string(),
state,
);
)
.unwrap();

let bridge_handle = tokio::spawn(async move { bridge.run().await });

Expand Down Expand Up @@ -468,7 +467,8 @@ mod tests {
None,
"test_storage_path".to_string(),
state,
);
)
.unwrap();

let bridge_handle = tokio::spawn(async move { bridge.run().await });

Expand Down Expand Up @@ -515,7 +515,8 @@ mod tests {
None,
"test_storage_path".to_string(),
state,
);
)
.unwrap();

let bridge_handle = tokio::spawn(async move { bridge.run().await });

Expand Down
6 changes: 4 additions & 2 deletions examples/python/taskbridge_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import os
import threading
import platform
from pathlib import Path

def get_default_socket_path():
"""Returns the default socket path based on the operating system."""
return "/tmp/com.prime.worker/metrics.sock" if platform.system() == "Darwin" else "/var/run/com.prime.worker/metrics.sock"
home = Path.home()
return str(home) + "/prime-worker/com.prime.worker/metrics.sock"

def send_message(metrics, task_id=None):
"""Sends a message to the socket."""
Expand Down Expand Up @@ -88,4 +90,4 @@ def send_file_info():
for thread in threads:
thread.join()

print("All messages sent!")
print("All messages sent!")