Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ iroh = "0.34.1"
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
[workspace.package]
version = "0.3.6"
version = "0.3.7"
edition = "2021"

[workspace.features]
Expand Down
53 changes: 40 additions & 13 deletions crates/orchestrator/src/discovery/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,23 +309,40 @@ impl DiscoveryMonitor {
if !discovery_node.is_active && existing_node.status == NodeStatus::Healthy {
// Node is active False but we have it in store and it is healthy
// This means that the node likely got kicked by e.g. the validator
// We simply remove it from the store now and will rediscover it later?
info!(
"Node {} is no longer active on chain, marking as ejected",
node_address
);
if !discovery_node.is_provider_whitelisted {
if let Err(e) = self
.update_node_status(&node_address, NodeStatus::Ejected)
// Add a grace period check to avoid immediately marking nodes that just became healthy
let should_mark_inactive =
if let Some(last_status_change) = existing_node.last_status_change {
let grace_period = chrono::Duration::minutes(5); // 5 minute grace period
Comment thread
JannikSt marked this conversation as resolved.
let now = chrono::Utc::now();
now.signed_duration_since(last_status_change) > grace_period
} else {
// If no last_status_change, assume it's been healthy for a while
true
};

if should_mark_inactive {
info!(
"Node {} is no longer active on chain, marking as ejected",
node_address
);
if !discovery_node.is_provider_whitelisted {
if let Err(e) = self
.update_node_status(&node_address, NodeStatus::Ejected)
.await
{
error!("Error updating node status: {}", e);
}
} else if let Err(e) = self
.update_node_status(&node_address, NodeStatus::Dead)
.await
{
error!("Error updating node status: {}", e);
}
} else if let Err(e) = self
.update_node_status(&node_address, NodeStatus::Dead)
.await
{
error!("Error updating node status: {}", e);
} else {
info!(
"Node {} is no longer active on chain but recently became healthy, waiting before marking inactive",
node_address
);
}
}

Expand Down Expand Up @@ -359,6 +376,16 @@ impl DiscoveryMonitor {
) {
if last_change < last_updated {
info!("Node {} is dead but has been updated on discovery, marking as discovered", node_address);

if existing_node.compute_specs != discovery_node.compute_specs {
info!(
"Node {} compute specs changed, marking as discovered",
node_address
);
let mut node = existing_node.clone();
node.compute_specs = discovery_node.compute_specs.clone();
let _ = self.store_context.node_store.add_node(node.clone()).await;
}
if let Err(e) = self
.update_node_status(&node_address, NodeStatus::Discovered)
.await
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ async fn main() -> Result<()> {

let status_update_store_context = store_context.clone();
let status_update_heartbeats = heartbeats.clone();
let status_update_metrics = metrics_context.clone();
tasks.spawn({
let contracts = contracts.clone();
async move {
Expand All @@ -393,6 +394,7 @@ async fn main() -> Result<()> {
args.disable_ejection,
status_update_heartbeats.clone(),
status_updater_plugins,
status_update_metrics,
);
status_updater.run().await
}
Expand Down
25 changes: 23 additions & 2 deletions crates/orchestrator/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use prometheus::{CounterVec, GaugeVec, Opts, Registry, TextEncoder};
use prometheus::{CounterVec, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder};
pub mod sync_service;
pub mod webhook_sender;

Expand All @@ -14,6 +14,7 @@ pub struct MetricsContext {
pub heartbeat_requests_total: CounterVec,
pub nodes_per_task: GaugeVec,
pub task_state: GaugeVec,
pub status_update_execution_time: HistogramVec,
}

impl MetricsContext {
Expand Down Expand Up @@ -90,7 +91,6 @@ impl MetricsContext {
&["task_id", "task_name", "pool_id"],
)
.unwrap();

let task_state = GaugeVec::new(
Opts::new(
"orchestrator_task_state",
Expand All @@ -100,6 +100,19 @@ impl MetricsContext {
)
.unwrap();

let status_update_execution_time = HistogramVec::new(
HistogramOpts::new(
"orchestrator_status_update_execution_time_seconds",
"Duration of status update execution",
)
.buckets(vec![
0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 30.0, 45.0, 60.0, 90.0,
120.0,
]),
&["node_address", "pool_id"],
)
.unwrap();

let registry = Registry::new();
let _ = registry.register(Box::new(compute_task_gauges.clone()));
let _ = registry.register(Box::new(task_info.clone()));
Expand All @@ -110,6 +123,7 @@ impl MetricsContext {
let _ = registry.register(Box::new(heartbeat_requests_total.clone()));
let _ = registry.register(Box::new(nodes_per_task.clone()));
let _ = registry.register(Box::new(task_state.clone()));
let _ = registry.register(Box::new(status_update_execution_time.clone()));

Self {
compute_task_gauges,
Expand All @@ -123,6 +137,7 @@ impl MetricsContext {
heartbeat_requests_total,
nodes_per_task,
task_state,
status_update_execution_time,
}
}

Expand Down Expand Up @@ -218,6 +233,12 @@ impl MetricsContext {
self.compute_task_gauges.reset();
}

pub fn record_status_update_execution_time(&self, node_address: &str, duration: f64) {
self.status_update_execution_time
.with_label_values(&[node_address, &self.pool_id])
.observe(duration);
}

/// Clear all orchestrator statistics metrics
pub fn clear_orchestrator_statistics(&self) {
self.nodes_total.reset();
Expand Down
22 changes: 21 additions & 1 deletion crates/orchestrator/src/status_update/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::metrics::MetricsContext;
use crate::models::node::{NodeStatus, OrchestratorNode};
use crate::plugins::StatusUpdatePlugin;
use crate::store::core::StoreContext;
Expand All @@ -7,7 +8,7 @@ use shared::web3::contracts::core::builder::Contracts;
use shared::web3::wallet::WalletProvider;
use std::result::Result;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::time::interval;

pub struct NodeStatusUpdater {
Expand All @@ -19,6 +20,7 @@ pub struct NodeStatusUpdater {
disable_ejection: bool,
heartbeats: Arc<LoopHeartbeats>,
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
metrics: Arc<MetricsContext>,
}

impl NodeStatusUpdater {
Expand All @@ -32,6 +34,7 @@ impl NodeStatusUpdater {
disable_ejection: bool,
heartbeats: Arc<LoopHeartbeats>,
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
metrics: Arc<MetricsContext>,
) -> Self {
Self {
store_context,
Expand All @@ -42,6 +45,7 @@ impl NodeStatusUpdater {
disable_ejection,
heartbeats,
plugins,
metrics,
}
}

Expand Down Expand Up @@ -133,6 +137,7 @@ impl NodeStatusUpdater {
pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
let nodes = self.store_context.node_store.get_nodes().await?;
for node in nodes {
let start_time = Instant::now();
let node = node.clone();
let old_status = node.status.clone();
let heartbeat = self
Expand Down Expand Up @@ -313,6 +318,12 @@ impl NodeStatusUpdater {
}
}
}
// Record status update execution time
let duration = start_time.elapsed();
self.metrics.record_status_update_execution_time(
&node.address.to_string(),
duration.as_secs_f64(),
);
}
Ok(())
}
Expand Down Expand Up @@ -346,6 +357,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
let node = OrchestratorNode {
address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
Expand Down Expand Up @@ -450,6 +462,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -502,6 +515,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -570,6 +584,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -648,6 +663,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -734,6 +750,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -817,6 +834,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -894,6 +912,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -962,6 +981,7 @@ mod tests {
false,
Arc::new(LoopHeartbeats::new(&mode)),
vec![],
app_state.metrics.clone(),
);
tokio::spawn(async move {
updater
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/store/domains/heartbeat_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl HeartbeatStore {
con.set_options::<_, _, ()>(
&key,
payload_string,
redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(60)),
redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(90)),
Comment thread
JannikSt marked this conversation as resolved.
)
.await
.map_err(|_| anyhow!("Failed to set options"))?;
Expand Down
3 changes: 2 additions & 1 deletion crates/shared/src/security/auth_signature_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const MAX_NONCE_LENGTH: usize = 64;
const MIN_NONCE_LENGTH: usize = 16;
const RATE_LIMIT_WINDOW_SECS: u64 = 60;
const MAX_REQUESTS_PER_WINDOW: usize = 100;
const REQUEST_EXPIRY_SECS: u64 = 300;

type SyncAddressValidator = Arc<dyn Fn(&Address) -> bool + Send + Sync>;
type AsyncAddressValidator = Arc<dyn Fn(&Address) -> LocalBoxFuture<'static, bool> + Send + Sync>;
Expand Down Expand Up @@ -445,7 +446,7 @@ where
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if current_time - timestamp > 10 {
if current_time - timestamp > REQUEST_EXPIRY_SECS {
return Err(ErrorBadRequest(json!({
"error": "Request expired",
"code": "REQUEST_EXPIRED",
Expand Down
Loading