From 392e0566d3735b6b2ce102eace90e24e3d53c219 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Fri, 18 Apr 2025 20:22:14 +0200 Subject: [PATCH 1/4] add basic webhook support --- orchestrator/Dockerfile | 2 + orchestrator/src/main.rs | 12 +++ orchestrator/src/models/node.rs | 9 +- orchestrator/src/node/status_update.rs | 119 +++++++++++++++++++------ 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/orchestrator/Dockerfile b/orchestrator/Dockerfile index 87945232..bd006d52 100644 --- a/orchestrator/Dockerfile +++ b/orchestrator/Dockerfile @@ -21,6 +21,7 @@ ENV S3_CREDENTIALS="" ENV BUCKET_NAME="" ENV LOG_LEVEL="" ENV HOURLY_S3_UPLOAD_LIMIT="2" +ENV WEBHOOK_URLS="" RUN echo '#!/bin/sh\n\ exec /usr/local/bin/orchestrator \ @@ -40,6 +41,7 @@ $([ ! -z "$S3_CREDENTIALS" ] && echo "--s3-credentials $S3_CREDENTIALS") \ $([ ! -z "$BUCKET_NAME" ] && echo "--bucket-name $BUCKET_NAME") \ $([ ! -z "$LOG_LEVEL" ] && echo "--log-level $LOG_LEVEL") \ $([ ! -z "$HOURLY_S3_UPLOAD_LIMIT" ] && echo "--hourly-s3-upload-limit $HOURLY_S3_UPLOAD_LIMIT") \ +$([ ! -z "$WEBHOOK_URLS" ] && echo "--webhook-urls $WEBHOOK_URLS") \ "$@"' > /entrypoint.sh && \ chmod +x /entrypoint.sh diff --git a/orchestrator/src/main.rs b/orchestrator/src/main.rs index 2e13569e..5b6056c3 100644 --- a/orchestrator/src/main.rs +++ b/orchestrator/src/main.rs @@ -86,6 +86,10 @@ struct Args { /// Log level #[arg(short = 'l', long, default_value = "info")] log_level: String, + + /// Webhook urls (comma-separated string) + #[arg(long, default_value = "")] + webhook_urls: Option, } #[tokio::main] @@ -174,6 +178,13 @@ async fn main() -> Result<()> { let status_update_store_context = store_context.clone(); let status_update_heartbeats = heartbeats.clone(); let status_update_contracts = contracts.clone(); + let webhook_urls = args + .webhook_urls + .clone() + .unwrap_or_default() + .split(',') + .map(|s| s.to_string()) + .collect(); tasks.spawn(async move { let status_updater = NodeStatusUpdater::new( status_update_store_context.clone(), @@ -183,6 +194,7 @@ async fn main() -> Result<()> { compute_pool_id, args.disable_ejection, status_update_heartbeats.clone(), + webhook_urls, ); status_updater.run().await }); diff --git a/orchestrator/src/models/node.rs b/orchestrator/src/models/node.rs index b408a9c8..b2129424 100644 --- a/orchestrator/src/models/node.rs +++ b/orchestrator/src/models/node.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use shared::models::node::DiscoveryNode; use shared::models::task::TaskState; -use std::fmt; +use std::fmt::{self, Display}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrchestratorNode { @@ -57,7 +57,6 @@ impl fmt::Display for OrchestratorNode { write!(f, "{}", serde_json::to_string(self).unwrap()) } } - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum NodeStatus { Discovered, @@ -68,3 +67,9 @@ pub enum NodeStatus { Ejected, Banned, } + +impl Display for NodeStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index b0110b20..e1e6ba3b 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -3,6 +3,8 @@ use crate::store::core::StoreContext; use crate::utils::loop_heartbeats::LoopHeartbeats; use anyhow::Ok; use log::{debug, error, info}; +use reqwest::Client; +use serde_json::json; use shared::web3::contracts::core::builder::Contracts; use std::result::Result; use std::sync::Arc; @@ -17,9 +19,12 @@ pub struct NodeStatusUpdater { pool_id: u32, disable_ejection: bool, heartbeats: Arc, + webhooks: Vec, + http_client: Client, } impl NodeStatusUpdater { + #[allow(clippy::too_many_arguments)] pub fn new( store_context: Arc, update_interval: u64, @@ -28,7 +33,9 @@ impl NodeStatusUpdater { pool_id: u32, disable_ejection: bool, heartbeats: Arc, + webhooks: Vec, ) -> Self { + println!("webhooks: {:?}", webhooks); Self { store_context, update_interval, @@ -37,6 +44,8 @@ impl NodeStatusUpdater { pool_id, disable_ejection, heartbeats, + webhooks, + http_client: Client::new(), } } @@ -108,6 +117,52 @@ impl NodeStatusUpdater { Ok(()) } + async fn trigger_webhooks( + &self, + node: &OrchestratorNode, + old_status: NodeStatus, + ) -> Result<(), anyhow::Error> { + if old_status == node.status || node.status == NodeStatus::Unhealthy { + return Ok(()); + } + + // If no webhooks configured, return early + if self.webhooks.is_empty() { + return Ok(()); + } + + let payload = json!({ + "node_address": node.address.to_string(), + "ip_address": node.ip_address, + "port": node.port, + "old_status": old_status.to_string(), + "new_status": node.status.to_string(), + "timestamp": chrono::Utc::now().to_rfc3339(), + }); + + let webhooks = self.webhooks.clone(); + let client = self.http_client.clone(); + tokio::spawn(async move { + for webhook_url in webhooks { + if let Err(e) = client + .post(&webhook_url) + .json(&payload) + .timeout(Duration::from_secs(5)) // Add timeout to prevent hanging + .send() + .await + { + error!("Failed to send webhook to {}: {}", webhook_url, e); + } else { + debug!("Webhook to {} triggered successfully", webhook_url); + } + } + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + Ok(()) + } + pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); for node in nodes { @@ -134,7 +189,8 @@ impl NodeStatusUpdater { pub async fn process_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); for node in nodes { - let mut node = node.clone(); + let node = node.clone(); + let old_status = node.status.clone(); let heartbeat = self .store_context .heartbeat_store @@ -145,6 +201,9 @@ impl NodeStatusUpdater { .get_unhealthy_counter(&node.address); let is_node_in_pool = self.is_node_in_pool(&node).await; + let mut status_changed = false; + let mut new_status = node.status.clone(); + match heartbeat { Some(beat) => { // Update version if necessary @@ -164,29 +223,23 @@ impl NodeStatusUpdater { || node.status == NodeStatus::WaitingForHeartbeat { if is_node_in_pool { - node.status = NodeStatus::Healthy; + new_status = NodeStatus::Healthy; } else { // Reset to discovered to init re-invite to pool - node.status = NodeStatus::Discovered; + new_status = NodeStatus::Discovered; } - let _: () = self - .store_context - .node_store - .update_node_status(&node.address, node.status); + status_changed = true; } // If node is Discovered or Dead: else if node.status == NodeStatus::Discovered || node.status == NodeStatus::Dead { if is_node_in_pool { - node.status = NodeStatus::Healthy; + new_status = NodeStatus::Healthy; } else { - node.status = NodeStatus::Discovered; + new_status = NodeStatus::Discovered; } - let _: () = self - .store_context - .node_store - .update_node_status(&node.address, node.status); + status_changed = true; } // Clear unhealthy counter on heartbeat receipt @@ -203,15 +256,13 @@ impl NodeStatusUpdater { match node.status { NodeStatus::Healthy => { - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } NodeStatus::Unhealthy => { if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Dead); + new_status = NodeStatus::Dead; + status_changed = true; } } NodeStatus::Discovered => { @@ -220,24 +271,33 @@ impl NodeStatusUpdater { // The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator? // Node invites fail now since the node cannot be in pool again. // We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected. - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } } NodeStatus::WaitingForHeartbeat => { if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { // Unhealthy counter is reset when node is invited // usually it starts directly with heartbeat - self.store_context - .node_store - .update_node_status(&node.address, NodeStatus::Unhealthy); + new_status = NodeStatus::Unhealthy; + status_changed = true; } } _ => (), } } } + + if status_changed { + let _: () = self + .store_context + .node_store + .update_node_status(&node.address, new_status); + + if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) { + let _ = self.trigger_webhooks(&updated_node, old_status).await; + } + } } Ok(()) } @@ -269,6 +329,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); let node = OrchestratorNode { address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), @@ -342,6 +403,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -386,6 +448,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -440,6 +503,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -498,6 +562,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -564,6 +629,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -629,6 +695,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -692,6 +759,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater @@ -748,6 +816,7 @@ mod tests { 0, false, Arc::new(LoopHeartbeats::new()), + vec![], ); tokio::spawn(async move { updater From d4055a4cd3b282a59cc3355477dab69d398e6820 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Fri, 18 Apr 2025 20:24:29 +0200 Subject: [PATCH 2/4] update helm chart --- .../orchestrator-chart/templates/orchestrator-deployment.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml b/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml index 7cb85299..a2ada6f0 100644 --- a/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml +++ b/deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml @@ -73,3 +73,7 @@ spec: value: "{{ .Values.env.LOG_LEVEL }}" - name: HOURLY_S3_UPLOAD_LIMIT value: "{{ .Values.env.HOURLY_S3_UPLOAD_LIMIT }}" + {{- if .Values.env.WEBHOOK_URLS }} + - name: WEBHOOK_URLS + value: "{{ .Values.env.WEBHOOK_URLS }}" + {{- end }} From 56d79d06574a5198e01ad4b52a9037fe684cdaad Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Fri, 18 Apr 2025 20:26:01 +0200 Subject: [PATCH 3/4] remove print --- orchestrator/src/node/status_update.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index e1e6ba3b..c05a8367 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -35,7 +35,6 @@ impl NodeStatusUpdater { heartbeats: Arc, webhooks: Vec, ) -> Self { - println!("webhooks: {:?}", webhooks); Self { store_context, update_interval, @@ -122,7 +121,7 @@ impl NodeStatusUpdater { node: &OrchestratorNode, old_status: NodeStatus, ) -> Result<(), anyhow::Error> { - if old_status == node.status || node.status == NodeStatus::Unhealthy { + if old_status == node.status || node.status == NodeStatus::Unhealthy || node.status == NodeStatus::Discovered{ return Ok(()); } @@ -748,7 +747,6 @@ mod tests { version: None, last_status_change: None, }; - println!("Node: {:?}", node); let _: () = app_state.store_context.node_store.add_node(node.clone()); let updater = NodeStatusUpdater::new( From c294b1f3e2e1b8b5e0d6541a7973352b58502311 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Fri, 18 Apr 2025 20:27:41 +0200 Subject: [PATCH 4/4] fmt --- orchestrator/src/node/status_update.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index c05a8367..dc3ee636 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -121,7 +121,10 @@ impl NodeStatusUpdater { node: &OrchestratorNode, old_status: NodeStatus, ) -> Result<(), anyhow::Error> { - if old_status == node.status || node.status == NodeStatus::Unhealthy || node.status == NodeStatus::Discovered{ + if old_status == node.status + || node.status == NodeStatus::Unhealthy + || node.status == NodeStatus::Discovered + { return Ok(()); }