Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 21 additions & 38 deletions integration-tests/lib/prometheus_metrics_assertions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio
if line.starts_with('#') {
continue;
}
// For labeled metrics, match the prefix up to the closing brace
if let Some(rest) = line.strip_prefix(metric_name) {
// The value follows a space after the metric name (or after the closing brace)
let value_str = rest.trim();
// If there are labels and the name didn't include them, skip
if value_str.starts_with('{') {
let rest = rest.trim();
if rest.is_empty() {
continue;
}
return value_str.parse::<f64>().ok();
// Bare metric (no labels): value follows directly after the name
if rest.starts_with(|c: char| c.is_ascii_digit() || c == '-') {
return rest.parse::<f64>().ok();
}
// Labeled metric: skip past the closing brace to get the value
if rest.starts_with('{') {
if let Some(brace_end) = rest.find('}') {
let value_str = rest[brace_end + 1..].trim();
return value_str.parse::<f64>().ok();
}
}
}
}
None
Expand Down Expand Up @@ -135,15 +142,11 @@ pub fn assert_metric_present(metrics_text: &str, metric_name: &str) {
);
}

/// Poll the `/metrics` endpoint until any line matching `metric_name` (with any labels) has a
/// value >= `min`, then return the full metrics text. Panics if the condition is not met within
/// `timeout`.
/// Poll `/metrics` until `metric_name` is present with a value >= `min`, or panic after
/// `timeout`. Polls every 100ms to react quickly while tolerating cache refresh jitter.
///
/// Use this instead of a fixed `sleep` for `GaugeVec` metrics (per-channel shares, blocks found)
/// that only appear in Prometheus output after the monitoring snapshot cache has refreshed with
/// observed label combinations. The handler calls `.reset()` on every `/metrics` request before
/// repopulating from the cached snapshot, so a label combination is only present when the
/// snapshot contains a non-default value for it.
/// Returns the full metrics text from the successful scrape so callers can make additional
/// assertions without a second fetch.
pub async fn poll_until_metric_gte(
monitoring_addr: SocketAddr,
metric_name: &str,
Expand All @@ -153,38 +156,18 @@ pub async fn poll_until_metric_gte(
let deadline = tokio::time::Instant::now() + timeout;
loop {
let metrics = fetch_metrics(monitoring_addr).await;
let satisfied = metrics.lines().any(|line| {
if line.starts_with('#') {
return false;
}
if let Some(rest) = line.strip_prefix(metric_name) {
// Match bare name followed by space, or labeled name followed by '{'
let value_str = if rest.starts_with(' ') {
rest.trim()
} else if rest.starts_with('{') {
// Skip past the closing brace to get the value
rest.find('}')
.and_then(|i| rest.get(i + 1..))
.map(|s| s.trim())
.unwrap_or("")
} else {
return false;
};
value_str.parse::<f64>().map(|v| v >= min).unwrap_or(false)
} else {
false
if let Some(v) = parse_metric_value(&metrics, metric_name) {
if v >= min {
return metrics;
}
});
if satisfied {
return metrics;
}
if tokio::time::Instant::now() >= deadline {
panic!(
"Metric '{}' never reached >= {} within {:?}. Last /metrics response:\n{}",
metric_name, min, timeout, metrics
);
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}

Expand Down
27 changes: 14 additions & 13 deletions integration-tests/tests/monitoring_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use integration_tests_sv2::{
};
use stratum_apps::stratum_core::mining_sv2::*;

/// Timeout for polling metric assertions. Generous enough for slow CI.
const METRIC_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

// ---------------------------------------------------------------------------
// 1. Pool + SV2 Mining Device (standard channel) Pool role exposes: client metrics (connections,
// channels, shares, hashrate) Pool has NO upstream, so server metrics should be absent.
Expand Down Expand Up @@ -41,12 +44,12 @@ async fn pool_monitoring_with_sv2_mining_device() {
// Health API
assert_api_health(pool_mon).await;

// Poll until per-channel share metric is populated in the monitoring cache
// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down Expand Up @@ -86,11 +89,13 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
// -- Pool metrics --
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
assert_api_health(pool_mon).await;

// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand All @@ -101,13 +106,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
// -- tProxy metrics --
let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled");
assert_api_health(tproxy_mon).await;
let tproxy_metrics = poll_until_metric_gte(
tproxy_mon,
"sv2_server_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
)
.await;
let tproxy_metrics = fetch_metrics(tproxy_mon).await;
assert_uptime(&tproxy_metrics);
// tProxy has 1 upstream extended channel
assert_metric_eq(
Expand Down Expand Up @@ -166,11 +165,13 @@ async fn jd_aggregated_topology_monitoring() {
// -- Pool metrics: sees 1 SV2 client (JDC), shares accepted --
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
assert_api_health(pool_mon).await;

// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down Expand Up @@ -223,13 +224,13 @@ async fn block_found_detected_in_pool_metrics() {
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SUBMIT_SOLUTION)
.await;

// Poll until block found metric appears in monitoring cache
// Poll until the monitoring cache has refreshed with the block found data
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_blocks_found_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down
Loading
Loading