Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ spec:
value: "{{ .Values.env.LOG_LEVEL }}"
- name: HOURLY_S3_UPLOAD_LIMIT
value: "{{ .Values.env.HOURLY_S3_UPLOAD_LIMIT }}"
{{- if .Values.env.WEBHOOK_URLS }}
- name: WEBHOOK_URLS
value: "{{ .Values.env.WEBHOOK_URLS }}"
{{- end }}
2 changes: 2 additions & 0 deletions orchestrator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ENV S3_CREDENTIALS=""
ENV BUCKET_NAME=""
ENV LOG_LEVEL=""
ENV HOURLY_S3_UPLOAD_LIMIT="2"
ENV WEBHOOK_URLS=""

RUN echo '#!/bin/sh\n\
exec /usr/local/bin/orchestrator \
Expand All @@ -40,6 +41,7 @@ $([ ! -z "$S3_CREDENTIALS" ] && echo "--s3-credentials $S3_CREDENTIALS") \
$([ ! -z "$BUCKET_NAME" ] && echo "--bucket-name $BUCKET_NAME") \
$([ ! -z "$LOG_LEVEL" ] && echo "--log-level $LOG_LEVEL") \
$([ ! -z "$HOURLY_S3_UPLOAD_LIMIT" ] && echo "--hourly-s3-upload-limit $HOURLY_S3_UPLOAD_LIMIT") \
$([ ! -z "$WEBHOOK_URLS" ] && echo "--webhook-urls $WEBHOOK_URLS") \
"$@"' > /entrypoint.sh && \
chmod +x /entrypoint.sh

Expand Down
12 changes: 12 additions & 0 deletions orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ struct Args {
/// Log level
#[arg(short = 'l', long, default_value = "info")]
log_level: String,

/// Webhook urls (comma-separated string)
#[arg(long, default_value = "")]
webhook_urls: Option<String>,
}

#[tokio::main]
Expand Down Expand Up @@ -174,6 +178,13 @@ async fn main() -> Result<()> {
let status_update_store_context = store_context.clone();
let status_update_heartbeats = heartbeats.clone();
let status_update_contracts = contracts.clone();
// Parse the comma-separated webhook list into individual URLs.
// `"".split(',')` yields a single empty string, so blank entries (unset or
// empty option, stray or trailing commas, surrounding whitespace) are trimmed
// and dropped; otherwise the updater would receive `vec![""]` and try to
// POST to an empty URL on every status change.
let webhook_urls: Vec<String> = args
    .webhook_urls
    .as_deref()
    .unwrap_or_default()
    .split(',')
    .map(str::trim)
    .filter(|url| !url.is_empty())
    .map(str::to_string)
    .collect();
tasks.spawn(async move {
let status_updater = NodeStatusUpdater::new(
status_update_store_context.clone(),
Expand All @@ -183,6 +194,7 @@ async fn main() -> Result<()> {
compute_pool_id,
args.disable_ejection,
status_update_heartbeats.clone(),
webhook_urls,
);
status_updater.run().await
});
Expand Down
9 changes: 7 additions & 2 deletions orchestrator/src/models/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use shared::models::node::DiscoveryNode;
use shared::models::task::TaskState;
use std::fmt;
use std::fmt::{self, Display};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratorNode {
Expand Down Expand Up @@ -57,7 +57,6 @@ impl fmt::Display for OrchestratorNode {
write!(f, "{}", serde_json::to_string(self).unwrap())
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeStatus {
Discovered,
Expand All @@ -68,3 +67,9 @@ pub enum NodeStatus {
Ejected,
Banned,
}

/// User-facing rendering of a [`NodeStatus`].
///
/// The text is exactly the `Debug` representation of the value, so each
/// variant is displayed as its variant name.
impl Display for NodeStatus {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Reuse the `Debug` output rather than maintaining a second name table.
        formatter.write_fmt(format_args!("{:?}", self))
    }
}
122 changes: 96 additions & 26 deletions orchestrator/src/node/status_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::store::core::StoreContext;
use crate::utils::loop_heartbeats::LoopHeartbeats;
use anyhow::Ok;
use log::{debug, error, info};
use reqwest::Client;
use serde_json::json;
use shared::web3::contracts::core::builder::Contracts;
use std::result::Result;
use std::sync::Arc;
Expand All @@ -17,9 +19,12 @@ pub struct NodeStatusUpdater {
pool_id: u32,
disable_ejection: bool,
heartbeats: Arc<LoopHeartbeats>,
webhooks: Vec<String>,
http_client: Client,
}

impl NodeStatusUpdater {
#[allow(clippy::too_many_arguments)]
pub fn new(
store_context: Arc<StoreContext>,
update_interval: u64,
Expand All @@ -28,6 +33,7 @@ impl NodeStatusUpdater {
pool_id: u32,
disable_ejection: bool,
heartbeats: Arc<LoopHeartbeats>,
webhooks: Vec<String>,
) -> Self {
Self {
store_context,
Expand All @@ -37,6 +43,8 @@ impl NodeStatusUpdater {
pool_id,
disable_ejection,
heartbeats,
webhooks,
http_client: Client::new(),
}
}

Expand Down Expand Up @@ -108,6 +116,55 @@ impl NodeStatusUpdater {
Ok(())
}

/// Notifies every configured webhook that `node` moved from `old_status` to
/// its current status.
///
/// Nothing is sent when the status did not change, when the new status is
/// `Unhealthy` or `Discovered`, or when no usable (non-blank) webhook URL is
/// configured.
///
/// The requests are issued one after another from a detached background
/// task. Delivery failures, including HTTP error responses, are only logged
/// and never surfaced to the caller, so this function always returns `Ok(())`.
async fn trigger_webhooks(
    &self,
    node: &OrchestratorNode,
    old_status: NodeStatus,
) -> Result<(), anyhow::Error> {
    if old_status == node.status
        || node.status == NodeStatus::Unhealthy
        || node.status == NodeStatus::Discovered
    {
        return Ok(());
    }

    // Drop blank entries: splitting an empty or sloppy comma-separated list
    // produces empty strings, which would otherwise defeat the "nothing
    // configured" check below and cause a failing request per status change.
    let webhooks: Vec<String> = self
        .webhooks
        .iter()
        .map(|url| url.trim())
        .filter(|url| !url.is_empty())
        .map(str::to_owned)
        .collect();

    // If no webhooks configured, return early
    if webhooks.is_empty() {
        return Ok(());
    }

    let payload = json!({
        "node_address": node.address.to_string(),
        "ip_address": node.ip_address,
        "port": node.port,
        "old_status": old_status.to_string(),
        "new_status": node.status.to_string(),
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });

    let client = self.http_client.clone();
    tokio::spawn(async move {
        for webhook_url in webhooks {
            // `send()` only fails on transport-level errors; convert 4xx/5xx
            // responses into errors too so a rejected delivery is not logged
            // as a success.
            let outcome = client
                .post(&webhook_url)
                .json(&payload)
                .timeout(Duration::from_secs(5)) // Add timeout to prevent hanging
                .send()
                .await
                .and_then(|response| response.error_for_status());

            if let Err(e) = outcome {
                error!("Failed to send webhook to {}: {}", webhook_url, e);
            } else {
                debug!("Webhook to {} triggered successfully", webhook_url);
            }
        }
    });

    // NOTE(review): the reason for this short pause is not evident from this
    // function; presumably it gives the spawned task a chance to start before
    // the caller continues. Confirm before changing or removing it.
    tokio::time::sleep(Duration::from_millis(50)).await;

    Ok(())
}

pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> {
let nodes = self.store_context.node_store.get_nodes();
for node in nodes {
Expand All @@ -134,7 +191,8 @@ impl NodeStatusUpdater {
pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
let nodes = self.store_context.node_store.get_nodes();
for node in nodes {
let mut node = node.clone();
let node = node.clone();
let old_status = node.status.clone();
let heartbeat = self
.store_context
.heartbeat_store
Expand All @@ -145,6 +203,9 @@ impl NodeStatusUpdater {
.get_unhealthy_counter(&node.address);

let is_node_in_pool = self.is_node_in_pool(&node).await;
let mut status_changed = false;
let mut new_status = node.status.clone();

match heartbeat {
Some(beat) => {
// Update version if necessary
Expand All @@ -164,29 +225,23 @@ impl NodeStatusUpdater {
|| node.status == NodeStatus::WaitingForHeartbeat
{
if is_node_in_pool {
node.status = NodeStatus::Healthy;
new_status = NodeStatus::Healthy;
} else {
// Reset to discovered to init re-invite to pool
node.status = NodeStatus::Discovered;
new_status = NodeStatus::Discovered;
}
let _: () = self
.store_context
.node_store
.update_node_status(&node.address, node.status);
status_changed = true;
}
// If node is Discovered or Dead:
else if node.status == NodeStatus::Discovered
|| node.status == NodeStatus::Dead
{
if is_node_in_pool {
node.status = NodeStatus::Healthy;
new_status = NodeStatus::Healthy;
} else {
node.status = NodeStatus::Discovered;
new_status = NodeStatus::Discovered;
}
let _: () = self
.store_context
.node_store
.update_node_status(&node.address, node.status);
status_changed = true;
}

// Clear unhealthy counter on heartbeat receipt
Expand All @@ -203,15 +258,13 @@ impl NodeStatusUpdater {

match node.status {
NodeStatus::Healthy => {
self.store_context
.node_store
.update_node_status(&node.address, NodeStatus::Unhealthy);
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
NodeStatus::Unhealthy => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
self.store_context
.node_store
.update_node_status(&node.address, NodeStatus::Dead);
new_status = NodeStatus::Dead;
status_changed = true;
}
}
NodeStatus::Discovered => {
Expand All @@ -220,24 +273,33 @@ impl NodeStatusUpdater {
// The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator?
// Node invites fail now since the node cannot be in pool again.
// We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected.
self.store_context
.node_store
.update_node_status(&node.address, NodeStatus::Unhealthy);
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
}
NodeStatus::WaitingForHeartbeat => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
// Unhealthy counter is reset when node is invited
// usually it starts directly with heartbeat
self.store_context
.node_store
.update_node_status(&node.address, NodeStatus::Unhealthy);
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
}
_ => (),
}
}
}

if status_changed {
let _: () = self
.store_context
.node_store
.update_node_status(&node.address, new_status);

if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) {
let _ = self.trigger_webhooks(&updated_node, old_status).await;
}
}
}
Ok(())
}
Expand Down Expand Up @@ -269,6 +331,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
let node = OrchestratorNode {
address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
Expand Down Expand Up @@ -342,6 +405,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -386,6 +450,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -440,6 +505,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -498,6 +564,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -564,6 +631,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -629,6 +697,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -681,7 +750,6 @@ mod tests {
version: None,
last_status_change: None,
};
println!("Node: {:?}", node);

let _: () = app_state.store_context.node_store.add_node(node.clone());
let updater = NodeStatusUpdater::new(
Expand All @@ -692,6 +760,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down Expand Up @@ -748,6 +817,7 @@ mod tests {
0,
false,
Arc::new(LoopHeartbeats::new()),
vec![],
);
tokio::spawn(async move {
updater
Expand Down