diff --git a/integration-tests/lib/prometheus_metrics_assertions.rs b/integration-tests/lib/prometheus_metrics_assertions.rs index bfeb5364d..0625760b5 100644 --- a/integration-tests/lib/prometheus_metrics_assertions.rs +++ b/integration-tests/lib/prometheus_metrics_assertions.rs @@ -38,15 +38,22 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio if line.starts_with('#') { continue; } - // For labeled metrics, match the prefix up to the closing brace if let Some(rest) = line.strip_prefix(metric_name) { - // The value follows a space after the metric name (or after the closing brace) - let value_str = rest.trim(); - // If there are labels and the name didn't include them, skip - if value_str.starts_with('{') { + let rest = rest.trim(); + if rest.is_empty() { continue; } - return value_str.parse::().ok(); + // Bare metric (no labels): value follows directly after the name + if rest.starts_with(|c: char| c.is_ascii_digit() || c == '-') { + return rest.parse::().ok(); + } + // Labeled metric: skip past the closing brace to get the value + if rest.starts_with('{') { + if let Some(brace_end) = rest.find('}') { + let value_str = rest[brace_end + 1..].trim(); + return value_str.parse::().ok(); + } + } } } None @@ -135,15 +142,11 @@ pub fn assert_metric_present(metrics_text: &str, metric_name: &str) { ); } -/// Poll the `/metrics` endpoint until any line matching `metric_name` (with any labels) has a -/// value >= `min`, then return the full metrics text. Panics if the condition is not met within -/// `timeout`. +/// Poll `/metrics` until `metric_name` is present with a value >= `min`, or panic after +/// `timeout`. Polls every 100ms to react quickly while tolerating cache refresh jitter. /// -/// Use this instead of a fixed `sleep` for `GaugeVec` metrics (per-channel shares, blocks found) -/// that only appear in Prometheus output after the monitoring snapshot cache has refreshed with -/// observed label combinations. The handler calls `.reset()` on every `/metrics` request before -/// repopulating from the cached snapshot, so a label combination is only present when the -/// snapshot contains a non-default value for it. +/// Returns the full metrics text from the successful scrape so callers can make additional +/// assertions without a second fetch. pub async fn poll_until_metric_gte( monitoring_addr: SocketAddr, metric_name: &str, @@ -153,30 +156,10 @@ pub async fn poll_until_metric_gte( let deadline = tokio::time::Instant::now() + timeout; loop { let metrics = fetch_metrics(monitoring_addr).await; - let satisfied = metrics.lines().any(|line| { - if line.starts_with('#') { - return false; - } - if let Some(rest) = line.strip_prefix(metric_name) { - // Match bare name followed by space, or labeled name followed by '{' - let value_str = if rest.starts_with(' ') { - rest.trim() - } else if rest.starts_with('{') { - // Skip past the closing brace to get the value - rest.find('}') - .and_then(|i| rest.get(i + 1..)) - .map(|s| s.trim()) - .unwrap_or("") - } else { - return false; - }; - value_str.parse::().map(|v| v >= min).unwrap_or(false) - } else { - false + if let Some(v) = parse_metric_value(&metrics, metric_name) { + if v >= min { + return metrics; } - }); - if satisfied { - return metrics; } if tokio::time::Instant::now() >= deadline { panic!( @@ -184,7 +167,7 @@ pub async fn poll_until_metric_gte( metric_name, min, timeout, metrics ); } - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } diff --git a/integration-tests/tests/monitoring_integration.rs b/integration-tests/tests/monitoring_integration.rs index d362168ae..89973ae02 100644 --- a/integration-tests/tests/monitoring_integration.rs +++ b/integration-tests/tests/monitoring_integration.rs @@ -9,6 +9,9 @@ use integration_tests_sv2::{ }; use stratum_apps::stratum_core::mining_sv2::*; +/// Timeout for polling metric assertions. Generous enough for slow CI. +const METRIC_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + // --------------------------------------------------------------------------- // 1. Pool + SV2 Mining Device (standard channel) Pool role exposes: client metrics (connections, // channels, shares, hashrate) Pool has NO upstream, so server metrics should be absent. @@ -41,12 +44,12 @@ async fn pool_monitoring_with_sv2_mining_device() { // Health API assert_api_health(pool_mon).await; - // Poll until per-channel share metric is populated in the monitoring cache + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -86,11 +89,13 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() { // -- Pool metrics -- let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); assert_api_health(pool_mon).await; + + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -101,13 +106,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() { // -- tProxy metrics -- let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled"); assert_api_health(tproxy_mon).await; - let tproxy_metrics = poll_until_metric_gte( - tproxy_mon, - "sv2_server_shares_accepted_total", - 1.0, - std::time::Duration::from_secs(10), - ) - .await; + let tproxy_metrics = fetch_metrics(tproxy_mon).await; assert_uptime(&tproxy_metrics); // tProxy has 1 upstream extended channel assert_metric_eq( @@ -166,11 +165,13 @@ async fn jd_aggregated_topology_monitoring() { // -- Pool metrics: sees 1 SV2 client (JDC), shares accepted -- let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); assert_api_health(pool_mon).await; + + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -223,13 +224,13 @@ async fn block_found_detected_in_pool_metrics() { .wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SUBMIT_SOLUTION) .await; - // Poll until block found metric appears in monitoring cache + // Poll until the monitoring cache has refreshed with the block found data let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_blocks_found_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs index 2e65bc097..4d0935938 100644 --- a/stratum-apps/src/monitoring/http_server.rs +++ b/stratum-apps/src/monitoring/http_server.rs @@ -161,17 +161,17 @@ impl MonitoringServer { let has_server = server_monitoring.is_some(); let has_sv2_clients = sv2_clients_monitoring.is_some(); - // Create the snapshot cache - let cache = Arc::new(SnapshotCache::new( - refresh_interval, - server_monitoring, - sv2_clients_monitoring, - )); + let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, false)?; - // Do initial refresh - cache.refresh(); + // Create the snapshot cache with metrics attached so refresh() + // updates Prometheus gauges atomically alongside the snapshot data. + let cache = Arc::new( + SnapshotCache::new(refresh_interval, server_monitoring, sv2_clients_monitoring) + .with_metrics(metrics.clone()), + ); - let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, false)?; + // Do initial refresh (populates both snapshot and Prometheus gauges) + cache.refresh(); Ok(Self { bind_address, @@ -196,18 +196,21 @@ impl MonitoringServer { let has_server = snapshot.server_info.is_some(); let has_sv2_clients = snapshot.sv2_clients_summary.is_some(); - // Add Sv1 clients source to the cache + // Re-create metrics with SV1 enabled + let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, true)?; + + // Add Sv1 clients source and attach new metrics to the cache let cache = Arc::new( Arc::try_unwrap(self.state.cache) .unwrap_or_else(|arc| (*arc).clone()) - .with_sv1_clients_source(sv1_monitoring), + .with_sv1_clients_source(sv1_monitoring) + .with_metrics(metrics.clone()), ); - // Refresh cache with new SV1 data + // Refresh cache with new SV1 data (also updates Prometheus gauges) cache.refresh(); - // Re-create metrics with SV1 enabled - self.state.metrics = PrometheusMetrics::new(has_server, has_sv2_clients, true)?; + self.state.metrics = metrics; self.state.cache = cache; Ok(self) @@ -733,10 +736,18 @@ async fn handle_sv1_client_by_id( } } -/// Handler for Prometheus metrics endpoint +/// Handler for Prometheus metrics endpoint. +/// +/// All GaugeVec metric values are updated atomically by the background cache refresh +/// task in `SnapshotCache::refresh()`. This handler only needs to: +/// 1. Set the uptime gauge (requires wall-clock time at scrape time) +/// 2. Gather and encode all registered metrics +/// +/// Because metric values are always kept in sync with the snapshot data, there is +/// never a gap where label series momentarily disappear. Tests can assert on metrics +/// directly after a cache refresh without polling for transient states. async fn handle_prometheus_metrics(State(state): State) -> Response { - let snapshot = state.cache.get_snapshot(); - + // Uptime is the only metric set at scrape time (needs current wall clock) let uptime_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -744,163 +755,7 @@ async fn handle_prometheus_metrics(State(state): State) -> Response - state.start_time; state.metrics.sv2_uptime_seconds.set(uptime_secs as f64); - // Reset per-channel metrics before repopulating - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_server_channel_hashrate { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric.reset(); - } - - // Collect server metrics - if let Some(ref summary) = snapshot.server_summary { - if let Some(ref metric) = state.metrics.sv2_server_channels { - metric - .with_label_values(&["extended"]) - .set(summary.extended_channels as f64); - metric - .with_label_values(&["standard"]) - .set(summary.standard_channels as f64); - } - if let Some(ref metric) = state.metrics.sv2_server_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - } - - if let Some(ref server) = snapshot.server_info { - for channel in &server.extended_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric - .with_label_values(&[&channel_id, user]) - .set(channel.shares_acknowledged as f64); - } - if let (Some(ref metric), Some(hashrate)) = ( - &state.metrics.sv2_server_channel_hashrate, - channel.nominal_hashrate, - ) { - metric - .with_label_values(&[&channel_id, user]) - .set(hashrate as f64); - } - } - - for channel in &server.standard_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric - .with_label_values(&[&channel_id, user]) - .set(channel.shares_accepted as f64); - } - if let (Some(ref metric), Some(hashrate)) = ( - &state.metrics.sv2_server_channel_hashrate, - channel.nominal_hashrate, - ) { - metric - .with_label_values(&[&channel_id, user]) - .set(hashrate as f64); - } - } - - if let Some(ref metric) = state.metrics.sv2_server_blocks_found_total { - let total: u64 = server - .extended_channels - .iter() - .map(|c| c.blocks_found as u64) - .chain( - server - .standard_channels - .iter() - .map(|c| c.blocks_found as u64), - ) - .sum(); - metric.set(total as f64); - } - } - - // Collect Sv2 clients metrics - if let Some(ref summary) = snapshot.sv2_clients_summary { - if let Some(ref metric) = state.metrics.sv2_clients_total { - metric.set(summary.total_clients as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channels { - metric - .with_label_values(&["extended"]) - .set(summary.extended_channels as f64); - metric - .with_label_values(&["standard"]) - .set(summary.standard_channels as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - - let mut client_blocks_total: u64 = 0; - - for client in snapshot.sv2_clients.as_deref().unwrap_or(&[]) { - let client_id = client.client_id.to_string(); - - for channel in &client.extended_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.shares_accepted as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.nominal_hashrate as f64); - } - client_blocks_total += channel.blocks_found as u64; - } - - for channel in &client.standard_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.shares_accepted as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.nominal_hashrate as f64); - } - client_blocks_total += channel.blocks_found as u64; - } - } - - if let Some(ref metric) = state.metrics.sv2_client_blocks_found_total { - metric.set(client_blocks_total as f64); - } - } - - // Collect SV1 client metrics - if let Some(ref summary) = snapshot.sv1_clients_summary { - if let Some(ref metric) = state.metrics.sv1_clients_total { - metric.set(summary.total_clients as f64); - } - if let Some(ref metric) = state.metrics.sv1_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - } - - // Encode and return metrics + // Gather and encode — all other metrics were set by the last cache refresh let encoder = TextEncoder::new(); let metric_families = state.metrics.registry.gather(); let mut buffer = Vec::new(); @@ -931,6 +786,7 @@ mod tests { use super::*; use axum::body::Body; use http_body_util::BodyExt; + use std::sync::Mutex; use tower::ServiceExt; // ── helpers ────────────────────────────────────────────────────── @@ -1065,7 +921,16 @@ mod tests { clients: Option>, sv1: Option>, ) -> Router { - let cache = Arc::new(SnapshotCache::new(Duration::from_secs(60), server, clients)); + let has_server = server.is_some(); + let has_clients = clients.is_some(); + let has_sv1 = sv1.is_some(); + + let metrics = PrometheusMetrics::new(has_server, has_clients, has_sv1).unwrap(); + + let cache = Arc::new( + SnapshotCache::new(Duration::from_secs(60), server, clients) + .with_metrics(metrics.clone()), + ); let cache = if let Some(sv1_source) = sv1 { Arc::new( @@ -1079,12 +944,6 @@ mod tests { cache.refresh(); - let has_server = cache.get_snapshot().server_info.is_some(); - let has_clients = cache.get_snapshot().sv2_clients_summary.is_some(); - let has_sv1 = cache.get_snapshot().sv1_clients.is_some(); - - let metrics = PrometheusMetrics::new(has_server, has_clients, has_sv1).unwrap(); - let start_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -1549,4 +1408,90 @@ mod tests { assert!(!body.contains("sv2_server_channels")); assert!(!body.contains("sv2_clients_total")); } + + // Mutable mock that allows changing data between requests + struct MutableMockClients(Mutex>); + impl super::super::client::Sv2ClientsMonitoring for MutableMockClients { + fn get_sv2_clients(&self) -> Vec { + self.0.lock().unwrap().clone() + } + } + + /// Verify that stale channel labels are removed without a reset gap. + /// + /// Scenario: First scrape has client with channel 1 and channel 2. + /// Second scrape: channel 2 is gone. The test verifies that: + /// - Channel 1 metrics are still present (no gap) + /// - Channel 2 metrics are removed (stale cleanup) + #[tokio::test] + async fn metrics_stale_labels_removed_without_reset_gap() { + let initial_clients = vec![Sv2ClientInfo { + client_id: 1, + extended_channels: vec![ + create_extended_channel_info(1, 100.0), + create_extended_channel_info(2, 200.0), + ], + standard_channels: vec![], + }]; + + let mock_clients = Arc::new(MutableMockClients(Mutex::new(initial_clients))); + let metrics = PrometheusMetrics::new(false, true, false).unwrap(); + let cache = Arc::new( + SnapshotCache::new( + Duration::from_secs(60), + None, + Some(mock_clients.clone() + as Arc), + ) + .with_metrics(metrics.clone()), + ); + cache.refresh(); + + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let state = ServerState { + cache: cache.clone(), + start_time, + metrics, + }; + + let app = Router::new() + .route("/metrics", get(handle_prometheus_metrics)) + .with_state(state); + + // First scrape — both channels present + let response = app.clone().oneshot(make_request("/metrics")).await.unwrap(); + let body = get_body(response).await; + // Prometheus sorts label keys alphabetically: channel_id, client_id, user_identity + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"1\",client_id=\"1\""), + "Channel 1 should be present on first scrape" + ); + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"2\",client_id=\"1\""), + "Channel 2 should be present on first scrape" + ); + + // Remove channel 2 from mock data and refresh cache + { + let mut clients = mock_clients.0.lock().unwrap(); + clients[0].extended_channels.retain(|c| c.channel_id == 1); + } + cache.refresh(); + + // Second scrape — channel 2 should be removed, channel 1 still present + let response = app.clone().oneshot(make_request("/metrics")).await.unwrap(); + let body = get_body(response).await; + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"1\",client_id=\"1\""), + "Channel 1 should still be present after stale removal" + ); + assert!( + !body.contains("sv2_client_shares_accepted_total{channel_id=\"2\",client_id=\"1\""), + "Channel 2 should be removed as stale" + ); + } } diff --git a/stratum-apps/src/monitoring/snapshot_cache.rs b/stratum-apps/src/monitoring/snapshot_cache.rs index 0c9b4ad69..51225f1b0 100644 --- a/stratum-apps/src/monitoring/snapshot_cache.rs +++ b/stratum-apps/src/monitoring/snapshot_cache.rs @@ -37,16 +37,31 @@ //! ``` use std::{ - sync::{Arc, RwLock}, + collections::HashSet, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }; +use tracing::debug; + use super::{ client::{Sv2ClientInfo, Sv2ClientsMonitoring, Sv2ClientsSummary}, + prometheus_metrics::PrometheusMetrics, server::{ServerInfo, ServerMonitoring, ServerSummary}, sv1::{Sv1ClientInfo, Sv1ClientsMonitoring, Sv1ClientsSummary}, }; +/// Tracks which label combinations were set on the previous refresh so we can +/// remove only stale series instead of calling `.reset()` (which would create a +/// gap where all label series momentarily disappear). +#[derive(Default)] +struct PreviousLabelSets { + /// Labels for server per-channel GaugeVecs: [channel_id, user_identity] + server_channel_labels: HashSet<[String; 2]>, + /// Labels for client per-channel GaugeVecs: [client_id, channel_id, user_identity] + client_channel_labels: HashSet<[String; 3]>, +} + /// Cached snapshot of monitoring data. /// /// This struct holds a point-in-time copy of all monitoring data, @@ -63,24 +78,42 @@ pub struct MonitoringSnapshot { } /// A cache that holds monitoring snapshots and refreshes them periodically. +/// +/// When `PrometheusMetrics` are attached, the cache also updates Prometheus +/// gauges during each refresh, keeping metric values in lockstep with the +/// snapshot data. This means the `/metrics` handler never needs to compute +/// values — it only gathers and encodes. pub struct SnapshotCache { snapshot: RwLock, refresh_interval: Duration, server_source: Option>, sv2_clients_source: Option>, sv1_clients_source: Option>, + metrics: Option, + previous_labels: Mutex, } impl Clone for SnapshotCache { fn clone(&self) -> Self { - // Clone creates a new cache with the same sources and current snapshot + // Clone creates a new cache with the same sources and current snapshot. + // previous_labels is cloned so the new cache can correctly detect stale + // label combinations on its first refresh. let current_snapshot = self.snapshot.read().unwrap().clone(); + let prev = self + .previous_labels + .lock() + .unwrap_or_else(|e| e.into_inner()); Self { snapshot: RwLock::new(current_snapshot), refresh_interval: self.refresh_interval, server_source: self.server_source.clone(), sv2_clients_source: self.sv2_clients_source.clone(), sv1_clients_source: self.sv1_clients_source.clone(), + metrics: self.metrics.clone(), + previous_labels: Mutex::new(PreviousLabelSets { + server_channel_labels: prev.server_channel_labels.clone(), + client_channel_labels: prev.client_channel_labels.clone(), + }), } } } @@ -104,6 +137,8 @@ impl SnapshotCache { server_source, sv2_clients_source, sv1_clients_source: None, + metrics: None, + previous_labels: Mutex::new(PreviousLabelSets::default()), } } @@ -116,6 +151,15 @@ impl SnapshotCache { self } + /// Attach (or replace) Prometheus metrics so they are updated during each `refresh()`. + /// + /// This is called once in `MonitoringServer::new` and may be called again in + /// `with_sv1_monitoring` which re-creates the metrics with SV1 gauges enabled. + pub fn with_metrics(mut self, metrics: PrometheusMetrics) -> Self { + self.metrics = Some(metrics); + self + } + /// Get the current snapshot. /// /// This is a fast read that does NOT acquire any business logic locks. @@ -128,6 +172,10 @@ impl SnapshotCache { /// /// This method DOES acquire the business logic locks (via the trait methods), /// but it's only called periodically by a background task, not on every request. + /// + /// When Prometheus metrics are attached, they are updated atomically alongside + /// the snapshot — eliminating any gap where metrics could be missing or stale + /// relative to the snapshot data. pub fn refresh(&self) { let mut new_snapshot = MonitoringSnapshot { timestamp: Some(Instant::now()), @@ -152,10 +200,203 @@ impl SnapshotCache { new_snapshot.sv1_clients_summary = Some(source.get_sv1_clients_summary()); } + // Update Prometheus gauges from the new snapshot data + if let Some(ref metrics) = self.metrics { + self.update_metrics(metrics, &new_snapshot); + } + // Update the cache *self.snapshot.write().unwrap() = new_snapshot; } + /// Update all Prometheus gauges from the given snapshot, then remove stale + /// label combinations that are no longer present. + fn update_metrics(&self, metrics: &PrometheusMetrics, snapshot: &MonitoringSnapshot) { + let mut current_server_labels: HashSet<[String; 2]> = HashSet::new(); + let mut current_client_labels: HashSet<[String; 3]> = HashSet::new(); + + // Server metrics + if let Some(ref summary) = snapshot.server_summary { + if let Some(ref m) = metrics.sv2_server_channels { + m.with_label_values(&["extended"]) + .set(summary.extended_channels as f64); + m.with_label_values(&["standard"]) + .set(summary.standard_channels as f64); + } + if let Some(ref m) = metrics.sv2_server_hashrate_total { + m.set(summary.total_hashrate as f64); + } + } + + if let Some(ref server) = snapshot.server_info { + for channel in &server.extended_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_acknowledged as f64); + } + if let (Some(ref m), Some(hashrate)) = ( + &metrics.sv2_server_channel_hashrate, + channel.nominal_hashrate, + ) { + m.with_label_values(&[&channel_id, user]) + .set(hashrate as f64); + } + current_server_labels.insert(labels); + } + + for channel in &server.standard_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_accepted as f64); + } + if let (Some(ref m), Some(hashrate)) = ( + &metrics.sv2_server_channel_hashrate, + channel.nominal_hashrate, + ) { + m.with_label_values(&[&channel_id, user]) + .set(hashrate as f64); + } + current_server_labels.insert(labels); + } + + if let Some(ref m) = metrics.sv2_server_blocks_found_total { + let total: u64 = server + .extended_channels + .iter() + .map(|c| c.blocks_found as u64) + .chain( + server + .standard_channels + .iter() + .map(|c| c.blocks_found as u64), + ) + .sum(); + m.set(total as f64); + } + } + + // Sv2 clients metrics + if let Some(ref summary) = snapshot.sv2_clients_summary { + if let Some(ref m) = metrics.sv2_clients_total { + m.set(summary.total_clients as f64); + } + if let Some(ref m) = metrics.sv2_client_channels { + m.with_label_values(&["extended"]) + .set(summary.extended_channels as f64); + m.with_label_values(&["standard"]) + .set(summary.standard_channels as f64); + } + if let Some(ref m) = metrics.sv2_client_hashrate_total { + m.set(summary.total_hashrate as f64); + } + + let mut client_blocks_total: u64 = 0; + + for client in snapshot.sv2_clients.as_deref().unwrap_or(&[]) { + let client_id = client.client_id.to_string(); + + for channel in &client.extended_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [client_id.clone(), channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.shares_accepted as f64); + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.nominal_hashrate as f64); + } + current_client_labels.insert(labels); + client_blocks_total += channel.blocks_found as u64; + } + + for channel in &client.standard_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [client_id.clone(), channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.shares_accepted as f64); + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.nominal_hashrate as f64); + } + current_client_labels.insert(labels); + client_blocks_total += channel.blocks_found as u64; + } + } + + if let Some(ref m) = metrics.sv2_client_blocks_found_total { + m.set(client_blocks_total as f64); + } + } + + // SV1 client metrics + if let Some(ref summary) = snapshot.sv1_clients_summary { + if let Some(ref m) = metrics.sv1_clients_total { + m.set(summary.total_clients as f64); + } + if let Some(ref m) = metrics.sv1_hashrate_total { + m.set(summary.total_hashrate as f64); + } + } + + // Remove stale label combinations that are no longer in the snapshot + let mut prev = self + .previous_labels + .lock() + .unwrap_or_else(|e| e.into_inner()); + + for stale in prev + .server_channel_labels + .difference(¤t_server_labels) + { + let label_refs: Vec<&str> = stale.iter().map(|s| s.as_str()).collect(); + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale server shares label"); + } + } + if let Some(ref m) = metrics.sv2_server_channel_hashrate { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale server hashrate label"); + } + } + } + + for stale in prev + .client_channel_labels + .difference(¤t_client_labels) + { + let label_refs: Vec<&str> = stale.iter().map(|s| s.as_str()).collect(); + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale client shares label"); + } + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale client hashrate label"); + } + } + } + + prev.server_channel_labels = current_server_labels; + prev.client_channel_labels = current_client_labels; + } + /// Get the refresh interval pub fn refresh_interval(&self) -> Duration { self.refresh_interval