diff --git a/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml b/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml index 7cb85299..a2ada6f0 100644 --- a/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml +++ b/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml @@ -73,3 +73,7 @@ spec: value: "{{ .Values.env.LOG_LEVEL }}" - name: HOURLY_S3_UPLOAD_LIMIT value: "{{ .Values.env.HOURLY_S3_UPLOAD_LIMIT }}" + {{- if .Values.env.WEBHOOK_URLS }} + - name: WEBHOOK_URLS + value: "{{ .Values.env.WEBHOOK_URLS }}" + {{- end }} diff --git a/orchestrator/Dockerfile b/orchestrator/Dockerfile index 87945232..bd006d52 100644 --- a/orchestrator/Dockerfile +++ b/orchestrator/Dockerfile @@ -21,6 +21,7 @@ ENV S3_CREDENTIALS="" ENV BUCKET_NAME="" ENV LOG_LEVEL="" ENV HOURLY_S3_UPLOAD_LIMIT="2" +ENV WEBHOOK_URLS="" RUN echo '#!/bin/sh\n\ exec /usr/local/bin/orchestrator \ @@ -40,6 +41,7 @@ $([ ! -z "$S3_CREDENTIALS" ] && echo "--s3-credentials $S3_CREDENTIALS") \ $([ ! -z "$BUCKET_NAME" ] && echo "--bucket-name $BUCKET_NAME") \ $([ ! -z "$LOG_LEVEL" ] && echo "--log-level $LOG_LEVEL") \ $([ ! -z "$HOURLY_S3_UPLOAD_LIMIT" ] && echo "--hourly-s3-upload-limit $HOURLY_S3_UPLOAD_LIMIT") \ +$([ ! -z "$WEBHOOK_URLS" ] && echo "--webhook-urls $WEBHOOK_URLS") \ "$@"' > /entrypoint.sh && \ chmod +x /entrypoint.sh diff --git a/orchestrator/src/main.rs b/orchestrator/src/main.rs index 2e13569e..5b6056c3 100644 --- a/orchestrator/src/main.rs +++ b/orchestrator/src/main.rs @@ -86,6 +86,10 @@ struct Args { /// Log level #[arg(short = 'l', long, default_value = "info")] log_level: String, + + /// Webhook urls (comma-separated string) + #[arg(long, default_value = "")] + webhook_urls: Option, } #[tokio::main] @@ -174,6 +178,13 @@ async fn main() -> Result<()> { let status_update_store_context = store_context.clone(); let status_update_heartbeats = heartbeats.clone(); let status_update_contracts = contracts.clone(); + let webhook_urls = args + .webhook_urls + .clone() + .unwrap_or_default() + .split(',') + .map(|s| s.to_string()) + .collect(); tasks.spawn(async move { let status_updater = NodeStatusUpdater::new( status_update_store_context.clone(), @@ -183,6 +194,7 @@ async fn main() -> Result<()> { compute_pool_id, args.disable_ejection, status_update_heartbeats.clone(), + webhook_urls, ); status_updater.run().await }); diff --git a/orchestrator/src/models/node.rs b/orchestrator/src/models/node.rs index b408a9c8..b2129424 100644 --- a/orchestrator/src/models/node.rs +++ b/orchestrator/src/models/node.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use shared::models::node::DiscoveryNode; use shared::models::task::TaskState; -use std::fmt; +use std::fmt::{self, Display}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrchestratorNode { @@ -57,7 +57,6 @@ impl fmt::Display for OrchestratorNode { write!(f, "{}", serde_json::to_string(self).unwrap()) } } - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum NodeStatus { Discovered, @@ -68,3 +67,9 @@ pub enum NodeStatus { Ejected, Banned, } + +impl Display for NodeStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index b0110b20..dc3ee636 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -3,6 +3,8 @@ use crate::store::core::StoreContext; use crate::utils::loop_heartbeats::LoopHeartbeats; use anyhow::Ok; use log::{debug, error, info}; +use reqwest::Client; +use serde_json::json; use shared::web3::contracts::core::builder::Contracts; use std::result::Result; use std::sync::Arc; @@ -17,9 +19,12 @@ pub struct NodeStatusUpdater { pool_id: u32, disable_ejection: bool, heartbeats: Arc, + webhooks: Vec, + http_client: Client, } impl NodeStatusUpdater { + #[allow(clippy::too_many_arguments)] pub fn new( store_context: Arc, update_interval: u64, @@ -28,6 +33,7 @@ impl NodeStatusUpdater { pool_id: u32, disable_ejection: bool, heartbeats: Arc, + webhooks: Vec, ) -> Self { Self { store_context, @@ -37,6 +43,8 @@ impl NodeStatusUpdater { pool_id, disable_ejection, heartbeats, + webhooks, + http_client: Client::new(), } } @@ -108,6 +116,55 @@ impl NodeStatusUpdater { Ok(()) } + async fn trigger_webhooks( + &self, + node: &OrchestratorNode, + old_status: NodeStatus, + ) -> Result<(), anyhow::Error> { + if old_status == node.status + || node.status == NodeStatus::Unhealthy + || node.status == NodeStatus::Discovered + { + return Ok(()); + } + + // If no webhooks configured, return early + if self.webhooks.is_empty() { + return Ok(()); + } + + let payload = json!({ + "node_address": node.address.to_string(), + "ip_address": node.ip_address, + "port": node.port, + "old_status": old_status.to_string(), + "new_status": node.status.to_string(), + "timestamp": chrono::Utc::now().to_rfc3339(), + }); + + let webhooks = self.webhooks.clone(); + let client = self.http_client.clone(); + tokio::spawn(async move { + for webhook_url in webhooks { + if let Err(e) = client + .post(&webhook_url) + .json(&payload) + .timeout(Duration::from_secs(5)) // Add timeout to prevent hanging + .send() + .await + { + error!("Failed to send webhook to {}: {}", webhook_url, e); + } else { + debug!("Webhook to {} triggered successfully", webhook_url); + } + } + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + Ok(()) + } + pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); for node in nodes { @@ -134,7 +191,8 @@ impl NodeStatusUpdater { pub async fn process_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); for node in nodes { - let mut node = node.clone(); + let node = node.clone(); + let old_status = node.status.clone(); let heartbeat = self .store_context .heartbeat_store @@ -145,6 +203,9 @@ impl NodeStatusUpdater { .get_unhealthy_counter(&node.address); let is_node_in_pool = self.is_node_in_pool(&node).await; + let mut status_changed = false; + let mut new_status = node.status.clone(); + match heartbeat { Some(beat) => { // Update version if necessary @@ -164,29 +225,23 @@ impl NodeStatusUpdater { || node.status == NodeStatus::WaitingForHeartbeat { if is_node_in_pool { - node.status = NodeStatus::Healthy; + new_status = NodeStatus::Healthy; } else { // Reset to discovered to init re-invite to pool - node.status = NodeStatus::Discovered; + new_status = NodeStatus::Discovered; } - let _: () = self - .store_context - .node_store - .update_node_status(&node.address, node.status); + status_changed = true; } // If node is Discovered or Dead: else if node.status == NodeStatus::Discovered || node.status == NodeStatus::Dead { if is_node_in_pool { - node.status = NodeStatus::Healthy; + new_status = NodeStatus::Healthy; } else { - node.status = NodeStatus::Discovered; + new_status = NodeStatus::Discovered; } - let _: () = self - .store_context - .node_store - .update_node_status(&node.address, node.status); + status_changed = true; } // Clear unhealthy counter on heartbeat receipt @@ -203,15 +258,13 @@ impl NodeStatusUpdater { match node.status { NodeStatus::Healthy => { - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } NodeStatus::Unhealthy => { if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Dead); + new_status = NodeStatus::Dead; + status_changed = true; } } NodeStatus::Discovered => { @@ -220,24 +273,33 @@ impl NodeStatusUpdater { // The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator? // Node invites fail now since the node cannot be in pool again. // We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected. - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } } NodeStatus::WaitingForHeartbeat => { if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { // Unhealthy counter is reset when node is invited // usually it starts directly with heartbeat - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } } _ => (), } } } + + if status_changed { + let _: () = self + .store_context + .node_store + .update_node_status(&node.address, new_status); + + if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) { + let _ = self.trigger_webhooks(&updated_node, old_status).await; + } + } } Ok(()) } @@ -269,6 +331,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); let node = OrchestratorNode { address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), @@ -342,6 +405,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -386,6 +450,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -440,6 +505,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -498,6 +564,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -564,6 +631,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -629,6 +697,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -681,7 +750,6 @@ mod tests { version: None, last_status_change: None, }; - println!("Node: {:?}", node); let _: () = app_state.store_context.node_store.add_node(node.clone()); let updater = NodeStatusUpdater::new( @@ -692,6 +760,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -748,6 +817,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater