Skip to content

Commit e152487

Browse files
Eric Pricegimballock
authored andcommitted
feat(monitoring): add byte metrics with aggregate Prometheus gauges
Add bytes_received/bytes_sent tracking across all three components (pool, jd-client, translator) for SV2 channels and SV1 clients. SV2 byte tracking: - Per-channel (bytes_received, bytes_sent) counters in pool, jd-client, and translator, keyed by channel_id in bytes_by_channel maps - Shared mining_message_channel_id() helper in protocol_message_type.rs to extract channel_id from parsed Mining messages - Raw frame payload guard (len >= 4) for channel_id extraction from unparsed upstream frames in jd-client and translator - Cleanup in all CloseChannel handlers and ChannelManagerData::reset() - JDC record_upstream_sent_bytes() eliminates double-lock on sends SV1 byte tracking (translator only): - Arc<AtomicU64> counters in ConnectionSV1, shared with DownstreamData - Reader counts line.len()+1 (newline stripped by LinesCodec) - Writer counts serialized JSON bytes before write_all Prometheus metrics (fixed cardinality): - sv2_server_bytes_{received,sent}_total — aggregate scalar Gauges - sv2_client_bytes_{received,sent}_total — aggregate scalar Gauges - sv1_client_bytes_{received,sent}_total — per-client GaugeVec (1:1 with TCP connections, appropriate granularity) Per-channel byte detail remains available via the JSON REST API (/api/v1/server/channels, /api/v1/clients/{id}/channels, /api/v1/sv1/clients) for drill-down without inflating Prometheus time series cardinality. Aggregate byte totals added to ServerSummary and Sv2ClientsSummary so /api/v1/global includes byte totals alongside hashrate and channel counts.
1 parent 72cc650 commit e152487

26 files changed

Lines changed: 531 additions & 39 deletions

File tree

miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use stratum_apps::{
3030
use tracing::{debug, error, info, warn};
3131

3232
use crate::{
33-
channel_manager::{ChannelManager, ChannelManagerChannel, FULL_EXTRANONCE_SIZE},
33+
channel_manager::{ChannelManager, FULL_EXTRANONCE_SIZE},
3434
error::{self, JDCError, JDCErrorKind},
3535
jd_mode::{get_jd_mode, JdMode},
3636
utils::create_close_channel_msg,
@@ -97,10 +97,8 @@ impl RouteMessageTo<'_> {
9797
/// - [`RouteMessageTo::JobDeclarator`] → Sends the job declaration message to the JDS.
9898
/// - [`RouteMessageTo::TemplateProvider`] → Sends the template distribution message to the
9999
/// template provider.
100-
pub async fn forward(
101-
self,
102-
channel_manager_channel: &ChannelManagerChannel,
103-
) -> Result<(), JDCErrorKind> {
100+
pub async fn forward(self, channel_manager: &ChannelManager) -> Result<(), JDCErrorKind> {
101+
let channel_manager_channel = &channel_manager.channel_manager_channel;
104102
match self {
105103
RouteMessageTo::Downstream((downstream_id, message)) => {
106104
_ = channel_manager_channel.downstream_sender.send((
@@ -113,10 +111,12 @@ impl RouteMessageTo<'_> {
113111
if get_jd_mode() != JdMode::SoloMining {
114112
let message_static = message.into_static();
115113
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
114+
let sent_bytes = sv2_frame.encoded_length() as u64;
116115
_ = channel_manager_channel
117116
.upstream_sender
118117
.send(sv2_frame)
119118
.await;
119+
channel_manager.record_upstream_sent_bytes(sent_bytes);
120120
}
121121
}
122122
RouteMessageTo::JobDeclarator(message) => {
@@ -201,6 +201,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
201201
downstream.downstream_data.super_safe_lock(|data| {
202202
data.extended_channels.remove(&msg.channel_id);
203203
data.standard_channels.remove(&msg.channel_id);
204+
data.bytes_by_channel.remove(&msg.channel_id);
204205
});
205206
channel_manager_data
206207
.vardiff
@@ -453,7 +454,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
453454
})?;
454455

455456
for messages in messages {
456-
let _ = messages.forward(&self.channel_manager_channel).await;
457+
let _ = messages.forward(self).await;
457458
}
458459
Ok(())
459460
}
@@ -716,7 +717,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
716717
})?;
717718

718719
for messages in messages {
719-
let _ = messages.forward(&self.channel_manager_channel).await;
720+
let _ = messages.forward(self).await;
720721
}
721722

722723
Ok(())
@@ -911,7 +912,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
911912
});
912913

913914
for messages in messages {
914-
let _ = messages.forward(&self.channel_manager_channel).await;
915+
let _ = messages.forward(self).await;
915916
}
916917

917918
Ok(())
@@ -1111,7 +1112,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
11111112
})?;
11121113

11131114
for messages in messages {
1114-
let _ = messages.forward(&self.channel_manager_channel).await;
1115+
let _ = messages.forward(self).await;
11151116
}
11161117

11171118
Ok(())
@@ -1334,7 +1335,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
13341335
})?;
13351336

13361337
for messages in messages {
1337-
_ = messages.forward(&self.channel_manager_channel).await;
1338+
_ = messages.forward(self).await;
13381339
}
13391340

13401341
Ok(())

miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
144144
AnyMessage::Extensions(new_require_extensions.into_static().into())
145145
.try_into()
146146
.map_err(JDCError::shutdown)?;
147+
let sent_bytes = sv2_frame.encoded_length() as u64;
147148

148149
self.channel_manager_channel
149150
.upstream_sender
@@ -153,6 +154,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
153154
error!("Failed to send message to upstream: {:?}", e);
154155
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
155156
})?;
157+
self.record_upstream_sent_bytes(sent_bytes);
156158
}
157159

158160
Ok(())

miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,13 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
222222
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
223223
.try_into()
224224
.map_err(JDCError::shutdown)?;
225+
let sent_bytes = sv2_frame.encoded_length() as u64;
225226
self.channel_manager_channel
226227
.upstream_sender
227228
.send(sv2_frame)
228229
.await
229230
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
231+
self.record_upstream_sent(channel_id, sent_bytes);
230232

231233
info!("Successfully sent SetCustomMiningJob to the upstream with channel_id: {channel_id}");
232234
Ok(())

miner-apps/jd-client/src/lib/channel_manager/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ pub struct ChannelManagerData {
165165
supported_extensions: Vec<u16>,
166166
/// Extensions that the JDC requires
167167
required_extensions: Vec<u16>,
168+
/// Per-channel byte counters for the upstream server channel: (bytes_received, bytes_sent)
169+
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
168170
}
169171

170172
impl ChannelManagerData {
@@ -180,6 +182,7 @@ impl ChannelManagerData {
180182
self.template_id_to_upstream_job_id.clear();
181183
self.downstream_channel_id_and_job_id_to_template_id.clear();
182184
self.pending_downstream_requests.clear();
185+
self.bytes_by_channel.clear();
183186

184187
self.downstream_id_factory = AtomicUsize::new(0);
185188
self.request_id_factory = AtomicU32::new(0);
@@ -321,6 +324,7 @@ impl ChannelManager {
321324
negotiated_extensions: vec![],
322325
supported_extensions,
323326
required_extensions,
327+
bytes_by_channel: HashMap::new(),
324328
}));
325329

326330
let channel_manager_channel = ChannelManagerChannel {
@@ -692,6 +696,28 @@ impl ChannelManager {
692696
Ok(())
693697
}
694698

699+
/// Records bytes sent upstream for the given channel_id.
700+
pub fn record_upstream_sent(&self, channel_id: ChannelId, bytes: u64) {
701+
self.channel_manager_data.super_safe_lock(|data| {
702+
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
703+
entry.1 += bytes;
704+
});
705+
}
706+
707+
/// Records bytes sent upstream, resolving the channel_id from current upstream channel.
708+
/// Single lock acquisition instead of separate upstream_channel_id() + record_upstream_sent().
709+
pub fn record_upstream_sent_bytes(&self, bytes: u64) {
710+
self.channel_manager_data.super_safe_lock(|data| {
711+
let ch_id = data
712+
.upstream_channel
713+
.as_ref()
714+
.map(|ch| ch.get_channel_id())
715+
.unwrap_or(0);
716+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
717+
entry.1 += bytes;
718+
});
719+
}
720+
695721
/// Handles messages received from the Upstream subsystem.
696722
///
697723
/// This method listens for incoming frames on the `upstream_receiver` channel.
@@ -700,10 +726,24 @@ impl ChannelManager {
700726
/// - If the frame contains any unsupported message type, an error is returned.
701727
async fn handle_pool_message_frame(&mut self) -> JDCResult<(), error::ChannelManager> {
702728
if let Ok(mut sv2_frame) = self.channel_manager_channel.upstream_receiver.recv().await {
729+
let frame_bytes = sv2_frame.encoded_length() as u64;
703730
let header = sv2_frame.get_header().ok_or_else(|| {
704731
error!("SV2 frame missing header");
705732
JDCError::fallback(framing_sv2::Error::MissingHeader)
706733
})?;
734+
// Count received bytes keyed by channel_id
735+
if header.channel_msg() {
736+
let payload = sv2_frame.payload();
737+
if payload.len() >= 4 {
738+
let channel_id = u32::from_le_bytes(
739+
payload[..4].try_into().expect("slice is exactly 4 bytes"),
740+
);
741+
self.channel_manager_data.super_safe_lock(|data| {
742+
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
743+
entry.0 += frame_bytes;
744+
});
745+
}
746+
}
707747
let message_type = header.msg_type();
708748
let extension_type = header.ext_type();
709749
let payload = sv2_frame.payload();
@@ -812,13 +852,15 @@ impl ChannelManager {
812852
let sv2_frame: Sv2Frame = AnyMessage::Mining(upstream_message)
813853
.try_into()
814854
.map_err(JDCError::shutdown)?;
855+
let sent_bytes = sv2_frame.encoded_length() as u64;
815856
self.channel_manager_channel
816857
.upstream_sender
817858
.send(sv2_frame)
818859
.await
819860
.map_err(|_| {
820861
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
821862
})?;
863+
self.record_upstream_sent_bytes(sent_bytes);
822864
}
823865
}
824866
UpstreamState::Pending => {
@@ -873,13 +915,15 @@ impl ChannelManager {
873915
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
874916
.try_into()
875917
.map_err(JDCError::shutdown)?;
918+
let sent_bytes = sv2_frame.encoded_length() as u64;
876919
self.channel_manager_channel
877920
.upstream_sender
878921
.send(sv2_frame)
879922
.await
880923
.map_err(|_| {
881924
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
882925
})?;
926+
self.record_upstream_sent_bytes(sent_bytes);
883927
}
884928
}
885929
UpstreamState::Pending => {
@@ -1175,7 +1219,7 @@ impl ChannelManager {
11751219
});
11761220

11771221
for message in messages {
1178-
let _ = message.forward(&self.channel_manager_channel).await;
1222+
let _ = message.forward(self).await;
11791223
}
11801224

11811225
info!("Vardiff update cycle complete");

miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
223223
}
224224

225225
for message in messages {
226-
let _ = message.forward(&self.channel_manager_channel).await;
226+
let _ = message.forward(self).await;
227227
}
228228

229229
Ok(())
@@ -586,7 +586,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
586586
}
587587

588588
for message in messages {
589-
let _ = message.forward(&self.channel_manager_channel).await;
589+
let _ = message.forward(self).await;
590590
}
591591

592592
Ok(())

miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
255255
let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job)
256256
.try_into()
257257
.map_err(JDCError::shutdown)?;
258+
let sent_bytes = sv2_frame.encoded_length() as u64;
258259
self.channel_manager_channel
259260
.upstream_sender
260261
.send(sv2_frame)
261262
.await
262263
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
264+
self.record_upstream_sent_bytes(sent_bytes);
263265
_ = self.allocate_tokens(1).await;
264266
}
265267
}
@@ -284,11 +286,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
284286
let sv2_frame: Sv2Frame = AnyMessage::Mining(close_channel)
285287
.try_into()
286288
.map_err(JDCError::shutdown)?;
289+
let sent_bytes = sv2_frame.encoded_length() as u64;
287290
self.channel_manager_channel
288291
.upstream_sender
289292
.send(sv2_frame)
290293
.await
291294
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
295+
self.record_upstream_sent_bytes(sent_bytes);
292296
_ = self.allocate_tokens(1).await;
293297
}
294298

@@ -336,6 +340,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
336340
info!("Received: {}", msg);
337341

338342
self.channel_manager_data.super_safe_lock(|data| {
343+
data.bytes_by_channel.remove(&msg.channel_id);
339344
data.upstream_channel = None;
340345
});
341346
Err(JDCError::fallback(JDCErrorKind::CloseChannel))
@@ -476,7 +481,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
476481
})?;
477482

478483
for message in messages_results.into_iter().flatten() {
479-
let _ = message.forward(&self.channel_manager_channel).await;
484+
let _ = message.forward(self).await;
480485
}
481486
Ok(())
482487
}

miner-apps/jd-client/src/lib/downstream/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ use stratum_apps::{
2020
parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv},
2121
},
2222
task_manager::TaskManager,
23-
utils::types::{DownstreamId, Message, Sv2Frame},
23+
utils::{
24+
protocol_message_type::mining_message_channel_id,
25+
types::{ChannelId, DownstreamId, Message, Sv2Frame},
26+
},
2427
};
2528

2629
use bitcoin_core_sv2::CancellationToken;
@@ -33,8 +36,6 @@ use crate::{
3336
status::{handle_error, Status, StatusSender},
3437
};
3538

36-
use stratum_apps::utils::types::ChannelId;
37-
3839
mod common_message_handler;
3940
mod extensions_message_handler;
4041

@@ -45,6 +46,7 @@ mod extensions_message_handler;
4546
/// - An optional [`GroupChannel`] if group channeling is used.
4647
/// - Active [`ExtendedChannel`]s keyed by channel ID.
4748
/// - Active [`StandardChannel`]s keyed by channel ID.
49+
/// - Per-channel byte counters (bytes_received, bytes_sent)
4850
pub struct DownstreamData {
4951
pub require_std_job: bool,
5052
pub group_channel: GroupChannel<'static, DefaultJobStore<ExtendedJob<'static>>>,
@@ -59,6 +61,8 @@ pub struct DownstreamData {
5961
pub supported_extensions: Vec<u16>,
6062
/// Extensions that the JDC requires
6163
pub required_extensions: Vec<u16>,
64+
/// Per-channel byte counters: (bytes_received, bytes_sent)
65+
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
6266
}
6367

6468
/// Communication layer for a downstream connection.
@@ -144,6 +148,7 @@ impl Downstream {
144148
negotiated_extensions: vec![],
145149
supported_extensions,
146150
required_extensions,
151+
bytes_by_channel: HashMap::new(),
147152
}));
148153

149154
Downstream {
@@ -270,8 +275,10 @@ impl Downstream {
270275
return Ok(());
271276
}
272277

278+
let channel_id = mining_message_channel_id(&message);
273279
let message = AnyMessage::Mining(message);
274280
let sv2_frame: Sv2Frame = message.try_into().map_err(JDCError::shutdown)?;
281+
let frame_bytes = sv2_frame.encoded_length() as u64;
275282

276283
self.downstream_channel
277284
.downstream_sender
@@ -282,6 +289,13 @@ impl Downstream {
282289
JDCError::disconnect(JDCErrorKind::ChannelErrorSender, self.downstream_id)
283290
})?;
284291

292+
if let Some(ch_id) = channel_id {
293+
self.downstream_data.super_safe_lock(|data| {
294+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
295+
entry.1 += frame_bytes;
296+
});
297+
}
298+
285299
Ok(())
286300
}
287301

@@ -293,6 +307,7 @@ impl Downstream {
293307
.recv()
294308
.await
295309
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
310+
let frame_bytes = sv2_frame.encoded_length() as u64;
296311
let header = sv2_frame
297312
.get_header()
298313
.expect("frame header must be present");
@@ -305,6 +320,12 @@ impl Downstream {
305320
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
306321
match any_message {
307322
AnyMessage::Mining(message) => {
323+
if let Some(ch_id) = mining_message_channel_id(&message) {
324+
self.downstream_data.super_safe_lock(|data| {
325+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
326+
entry.0 += frame_bytes;
327+
});
328+
}
308329
self.downstream_channel
309330
.channel_manager_sender
310331
.send((self.downstream_id, message, tlv_fields))

0 commit comments

Comments
 (0)