Skip to content

Commit 4db15b0

Browse files
author
Eric Price
committed
feat(monitoring): add per-channel and per-client byte metrics
Add bytes_received/bytes_sent tracking across all three components (pool, jd-client, translator) for SV2 channels and SV1 clients. SV2 byte tracking: - Per-channel (bytes_received, bytes_sent) counters in pool, jd-client, and translator, keyed by channel_id in bytes_by_channel maps - Shared mining_message_channel_id() helper in protocol_message_type.rs to extract channel_id from parsed Mining messages - Raw frame payload guard (len >= 4) for channel_id extraction from unparsed upstream frames in jd-client and translator - Cleanup in all CloseChannel handlers and ChannelManagerData::reset() - JDC record_upstream_sent_bytes() eliminates double-lock on sends SV1 byte tracking (translator only): - Arc<AtomicU64> counters in ConnectionSV1, shared with DownstreamData - Reader counts line.len()+1 (newline stripped by LinesCodec) - Writer counts serialized JSON bytes before write_all Prometheus metrics: - sv2_server_channel_bytes_{received,sent}_total - sv2_client_channel_bytes_{received,sent}_total - sv1_client_bytes_{received,sent}_total - All use gauge-with-reset pattern for stale label cleanup Also updates README with new metric names and design notes on capacity planning, anomaly detection, cost attribution, and reflection attack detection use cases.
1 parent 6ddd6a3 commit 4db15b0

26 files changed

Lines changed: 539 additions & 39 deletions

File tree

miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use stratum_apps::{
3030
use tracing::{debug, error, info, warn};
3131

3232
use crate::{
33-
channel_manager::{ChannelManager, ChannelManagerChannel, FULL_EXTRANONCE_SIZE},
33+
channel_manager::{ChannelManager, FULL_EXTRANONCE_SIZE},
3434
error::{self, JDCError, JDCErrorKind},
3535
jd_mode::{get_jd_mode, JdMode},
3636
utils::create_close_channel_msg,
@@ -97,10 +97,8 @@ impl RouteMessageTo<'_> {
9797
/// - [`RouteMessageTo::JobDeclarator`] → Sends the job declaration message to the JDS.
9898
/// - [`RouteMessageTo::TemplateProvider`] → Sends the template distribution message to the
9999
/// template provider.
100-
pub async fn forward(
101-
self,
102-
channel_manager_channel: &ChannelManagerChannel,
103-
) -> Result<(), JDCErrorKind> {
100+
pub async fn forward(self, channel_manager: &ChannelManager) -> Result<(), JDCErrorKind> {
101+
let channel_manager_channel = &channel_manager.channel_manager_channel;
104102
match self {
105103
RouteMessageTo::Downstream((downstream_id, message)) => {
106104
_ = channel_manager_channel.downstream_sender.send((
@@ -113,10 +111,12 @@ impl RouteMessageTo<'_> {
113111
if get_jd_mode() != JdMode::SoloMining {
114112
let message_static = message.into_static();
115113
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
114+
let sent_bytes = sv2_frame.encoded_length() as u64;
116115
_ = channel_manager_channel
117116
.upstream_sender
118117
.send(sv2_frame)
119118
.await;
119+
channel_manager.record_upstream_sent_bytes(sent_bytes);
120120
}
121121
}
122122
RouteMessageTo::JobDeclarator(message) => {
@@ -201,6 +201,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
201201
downstream.downstream_data.super_safe_lock(|data| {
202202
data.extended_channels.remove(&msg.channel_id);
203203
data.standard_channels.remove(&msg.channel_id);
204+
data.bytes_by_channel.remove(&msg.channel_id);
204205
});
205206
channel_manager_data
206207
.vardiff
@@ -453,7 +454,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
453454
})?;
454455

455456
for messages in messages {
456-
let _ = messages.forward(&self.channel_manager_channel).await;
457+
let _ = messages.forward(self).await;
457458
}
458459
Ok(())
459460
}
@@ -716,7 +717,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
716717
})?;
717718

718719
for messages in messages {
719-
let _ = messages.forward(&self.channel_manager_channel).await;
720+
let _ = messages.forward(self).await;
720721
}
721722

722723
Ok(())
@@ -911,7 +912,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
911912
});
912913

913914
for messages in messages {
914-
let _ = messages.forward(&self.channel_manager_channel).await;
915+
let _ = messages.forward(self).await;
915916
}
916917

917918
Ok(())
@@ -1111,7 +1112,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
11111112
})?;
11121113

11131114
for messages in messages {
1114-
let _ = messages.forward(&self.channel_manager_channel).await;
1115+
let _ = messages.forward(self).await;
11151116
}
11161117

11171118
Ok(())
@@ -1334,7 +1335,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
13341335
})?;
13351336

13361337
for messages in messages {
1337-
_ = messages.forward(&self.channel_manager_channel).await;
1338+
_ = messages.forward(self).await;
13381339
}
13391340

13401341
Ok(())

miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
144144
AnyMessage::Extensions(new_require_extensions.into_static().into())
145145
.try_into()
146146
.map_err(JDCError::shutdown)?;
147+
let sent_bytes = sv2_frame.encoded_length() as u64;
147148

148149
self.channel_manager_channel
149150
.upstream_sender
@@ -153,6 +154,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
153154
error!("Failed to send message to upstream: {:?}", e);
154155
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
155156
})?;
157+
self.record_upstream_sent_bytes(sent_bytes);
156158
}
157159

158160
Ok(())

miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,13 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
222222
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
223223
.try_into()
224224
.map_err(JDCError::shutdown)?;
225+
let sent_bytes = sv2_frame.encoded_length() as u64;
225226
self.channel_manager_channel
226227
.upstream_sender
227228
.send(sv2_frame)
228229
.await
229230
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
231+
self.record_upstream_sent(channel_id, sent_bytes);
230232

231233
info!("Successfully sent SetCustomMiningJob to the upstream with channel_id: {channel_id}");
232234
Ok(())

miner-apps/jd-client/src/lib/channel_manager/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ pub struct ChannelManagerData {
158158
supported_extensions: Vec<u16>,
159159
/// Extensions that the JDC requires
160160
required_extensions: Vec<u16>,
161+
/// Per-channel byte counters for the upstream server channel: (bytes_received, bytes_sent)
162+
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
161163
}
162164

163165
impl ChannelManagerData {
@@ -173,6 +175,7 @@ impl ChannelManagerData {
173175
self.template_id_to_upstream_job_id.clear();
174176
self.downstream_channel_id_and_job_id_to_template_id.clear();
175177
self.pending_downstream_requests.clear();
178+
self.bytes_by_channel.clear();
176179

177180
self.downstream_id_factory = AtomicUsize::new(0);
178181
self.request_id_factory = AtomicU32::new(0);
@@ -314,6 +317,7 @@ impl ChannelManager {
314317
negotiated_extensions: vec![],
315318
supported_extensions,
316319
required_extensions,
320+
bytes_by_channel: HashMap::new(),
317321
}));
318322

319323
let channel_manager_channel = ChannelManagerChannel {
@@ -685,6 +689,28 @@ impl ChannelManager {
685689
Ok(())
686690
}
687691

692+
/// Records bytes sent upstream for the given channel_id.
693+
pub fn record_upstream_sent(&self, channel_id: ChannelId, bytes: u64) {
694+
self.channel_manager_data.super_safe_lock(|data| {
695+
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
696+
entry.1 += bytes;
697+
});
698+
}
699+
700+
/// Records bytes sent upstream, resolving the channel_id from current upstream channel.
701+
/// Single lock acquisition instead of separate upstream_channel_id() + record_upstream_sent().
702+
pub fn record_upstream_sent_bytes(&self, bytes: u64) {
703+
self.channel_manager_data.super_safe_lock(|data| {
704+
let ch_id = data
705+
.upstream_channel
706+
.as_ref()
707+
.map(|ch| ch.get_channel_id())
708+
.unwrap_or(0);
709+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
710+
entry.1 += bytes;
711+
});
712+
}
713+
688714
/// Handles messages received from the Upstream subsystem.
689715
///
690716
/// This method listens for incoming frames on the `upstream_receiver` channel.
@@ -693,10 +719,24 @@ impl ChannelManager {
693719
/// - If the frame contains any unsupported message type, an error is returned.
694720
async fn handle_pool_message_frame(&mut self) -> JDCResult<(), error::ChannelManager> {
695721
if let Ok(mut sv2_frame) = self.channel_manager_channel.upstream_receiver.recv().await {
722+
let frame_bytes = sv2_frame.encoded_length() as u64;
696723
let header = sv2_frame.get_header().ok_or_else(|| {
697724
error!("SV2 frame missing header");
698725
JDCError::fallback(framing_sv2::Error::MissingHeader)
699726
})?;
727+
// Count received bytes keyed by channel_id
728+
if header.channel_msg() {
729+
let payload = sv2_frame.payload();
730+
if payload.len() >= 4 {
731+
let channel_id = u32::from_le_bytes(
732+
payload[..4].try_into().expect("slice is exactly 4 bytes"),
733+
);
734+
self.channel_manager_data.super_safe_lock(|data| {
735+
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
736+
entry.0 += frame_bytes;
737+
});
738+
}
739+
}
700740
let message_type = header.msg_type();
701741
let extension_type = header.ext_type();
702742
let payload = sv2_frame.payload();
@@ -806,13 +846,15 @@ impl ChannelManager {
806846
let sv2_frame: Sv2Frame = AnyMessage::Mining(upstream_message)
807847
.try_into()
808848
.map_err(JDCError::shutdown)?;
849+
let sent_bytes = sv2_frame.encoded_length() as u64;
809850
self.channel_manager_channel
810851
.upstream_sender
811852
.send(sv2_frame)
812853
.await
813854
.map_err(|_| {
814855
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
815856
})?;
857+
self.record_upstream_sent_bytes(sent_bytes);
816858
}
817859
}
818860
UpstreamState::Pending => {
@@ -867,13 +909,15 @@ impl ChannelManager {
867909
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
868910
.try_into()
869911
.map_err(JDCError::shutdown)?;
912+
let sent_bytes = sv2_frame.encoded_length() as u64;
870913
self.channel_manager_channel
871914
.upstream_sender
872915
.send(sv2_frame)
873916
.await
874917
.map_err(|_| {
875918
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
876919
})?;
920+
self.record_upstream_sent_bytes(sent_bytes);
877921
}
878922
}
879923
UpstreamState::Pending => {
@@ -1169,7 +1213,7 @@ impl ChannelManager {
11691213
});
11701214

11711215
for message in messages {
1172-
let _ = message.forward(&self.channel_manager_channel).await;
1216+
let _ = message.forward(self).await;
11731217
}
11741218

11751219
info!("Vardiff update cycle complete");

miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
223223
}
224224

225225
for message in messages {
226-
let _ = message.forward(&self.channel_manager_channel).await;
226+
let _ = message.forward(self).await;
227227
}
228228

229229
Ok(())
@@ -586,7 +586,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
586586
}
587587

588588
for message in messages {
589-
let _ = message.forward(&self.channel_manager_channel).await;
589+
let _ = message.forward(self).await;
590590
}
591591

592592
Ok(())

miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
247247
let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job)
248248
.try_into()
249249
.map_err(JDCError::shutdown)?;
250+
let sent_bytes = sv2_frame.encoded_length() as u64;
250251
self.channel_manager_channel
251252
.upstream_sender
252253
.send(sv2_frame)
253254
.await
254255
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
256+
self.record_upstream_sent_bytes(sent_bytes);
255257
_ = self.allocate_tokens(1).await;
256258
}
257259
}
@@ -276,11 +278,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
276278
let sv2_frame: Sv2Frame = AnyMessage::Mining(close_channel)
277279
.try_into()
278280
.map_err(JDCError::shutdown)?;
281+
let sent_bytes = sv2_frame.encoded_length() as u64;
279282
self.channel_manager_channel
280283
.upstream_sender
281284
.send(sv2_frame)
282285
.await
283286
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
287+
self.record_upstream_sent_bytes(sent_bytes);
284288
_ = self.allocate_tokens(1).await;
285289
}
286290

@@ -328,6 +332,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
328332
info!("Received: {}", msg);
329333

330334
self.channel_manager_data.super_safe_lock(|data| {
335+
data.bytes_by_channel.remove(&msg.channel_id);
331336
data.upstream_channel = None;
332337
});
333338
Err(JDCError::fallback(JDCErrorKind::CloseChannel))
@@ -459,7 +464,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
459464
})?;
460465

461466
for message in messages_results.into_iter().flatten() {
462-
let _ = message.forward(&self.channel_manager_channel).await;
467+
let _ = message.forward(self).await;
463468
}
464469
Ok(())
465470
}

miner-apps/jd-client/src/lib/downstream/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ use stratum_apps::{
2020
parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv},
2121
},
2222
task_manager::TaskManager,
23-
utils::types::{DownstreamId, Message, Sv2Frame},
23+
utils::{
24+
protocol_message_type::mining_message_channel_id,
25+
types::{ChannelId, DownstreamId, Message, Sv2Frame},
26+
},
2427
};
2528

2629
use bitcoin_core_sv2::CancellationToken;
@@ -33,8 +36,6 @@ use crate::{
3336
status::{handle_error, Status, StatusSender},
3437
};
3538

36-
use stratum_apps::utils::types::ChannelId;
37-
3839
mod common_message_handler;
3940
mod extensions_message_handler;
4041

@@ -45,6 +46,7 @@ mod extensions_message_handler;
4546
/// - An optional [`GroupChannel`] if group channeling is used.
4647
/// - Active [`ExtendedChannel`]s keyed by channel ID.
4748
/// - Active [`StandardChannel`]s keyed by channel ID.
49+
/// - Per-channel byte counters (bytes_received, bytes_sent)
4850
pub struct DownstreamData {
4951
pub require_std_job: bool,
5052
pub group_channel: GroupChannel<'static, DefaultJobStore<ExtendedJob<'static>>>,
@@ -59,6 +61,8 @@ pub struct DownstreamData {
5961
pub supported_extensions: Vec<u16>,
6062
/// Extensions that the JDC requires
6163
pub required_extensions: Vec<u16>,
64+
/// Per-channel byte counters: (bytes_received, bytes_sent)
65+
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
6266
}
6367

6468
/// Communication layer for a downstream connection.
@@ -144,6 +148,7 @@ impl Downstream {
144148
negotiated_extensions: vec![],
145149
supported_extensions,
146150
required_extensions,
151+
bytes_by_channel: HashMap::new(),
147152
}));
148153

149154
Downstream {
@@ -270,8 +275,10 @@ impl Downstream {
270275
return Ok(());
271276
}
272277

278+
let channel_id = mining_message_channel_id(&message);
273279
let message = AnyMessage::Mining(message);
274280
let sv2_frame: Sv2Frame = message.try_into().map_err(JDCError::shutdown)?;
281+
let frame_bytes = sv2_frame.encoded_length() as u64;
275282

276283
self.downstream_channel
277284
.downstream_sender
@@ -282,6 +289,13 @@ impl Downstream {
282289
JDCError::disconnect(JDCErrorKind::ChannelErrorSender, self.downstream_id)
283290
})?;
284291

292+
if let Some(ch_id) = channel_id {
293+
self.downstream_data.super_safe_lock(|data| {
294+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
295+
entry.1 += frame_bytes;
296+
});
297+
}
298+
285299
Ok(())
286300
}
287301

@@ -293,6 +307,7 @@ impl Downstream {
293307
.recv()
294308
.await
295309
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
310+
let frame_bytes = sv2_frame.encoded_length() as u64;
296311
let header = sv2_frame
297312
.get_header()
298313
.expect("frame header must be present");
@@ -305,6 +320,12 @@ impl Downstream {
305320
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
306321
match any_message {
307322
AnyMessage::Mining(message) => {
323+
if let Some(ch_id) = mining_message_channel_id(&message) {
324+
self.downstream_data.super_safe_lock(|data| {
325+
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
326+
entry.0 += frame_bytes;
327+
});
328+
}
308329
self.downstream_channel
309330
.channel_manager_sender
310331
.send((self.downstream_id, message, tlv_fields))

0 commit comments

Comments
 (0)