Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use stratum_apps::{
use tracing::{debug, error, info, warn};

use crate::{
channel_manager::{ChannelManager, ChannelManagerChannel, FULL_EXTRANONCE_SIZE},
channel_manager::{ChannelManager, FULL_EXTRANONCE_SIZE},
error::{self, JDCError, JDCErrorKind},
jd_mode::{get_jd_mode, JdMode},
utils::create_close_channel_msg,
Expand Down Expand Up @@ -97,10 +97,8 @@ impl RouteMessageTo<'_> {
/// - [`RouteMessageTo::JobDeclarator`] → Sends the job declaration message to the JDS.
/// - [`RouteMessageTo::TemplateProvider`] → Sends the template distribution message to the
/// template provider.
pub async fn forward(
self,
channel_manager_channel: &ChannelManagerChannel,
) -> Result<(), JDCErrorKind> {
pub async fn forward(self, channel_manager: &ChannelManager) -> Result<(), JDCErrorKind> {
let channel_manager_channel = &channel_manager.channel_manager_channel;
match self {
RouteMessageTo::Downstream((downstream_id, message)) => {
_ = channel_manager_channel.downstream_sender.send((
Expand All @@ -113,10 +111,12 @@ impl RouteMessageTo<'_> {
if get_jd_mode() != JdMode::SoloMining {
let message_static = message.into_static();
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
let sent_bytes = sv2_frame.encoded_length() as u64;
_ = channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await;
channel_manager.record_upstream_sent_bytes(sent_bytes);
}
}
RouteMessageTo::JobDeclarator(message) => {
Expand Down Expand Up @@ -201,6 +201,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
downstream.downstream_data.super_safe_lock(|data| {
data.extended_channels.remove(&msg.channel_id);
data.standard_channels.remove(&msg.channel_id);
data.bytes_by_channel.remove(&msg.channel_id);
});
channel_manager_data
.vardiff
Expand Down Expand Up @@ -453,7 +454,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
let _ = messages.forward(self).await;
}
Ok(())
}
Expand Down Expand Up @@ -716,7 +717,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
let _ = messages.forward(self).await;
}

Ok(())
Expand Down Expand Up @@ -911,7 +912,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
});

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
let _ = messages.forward(self).await;
}

Ok(())
Expand Down Expand Up @@ -1111,7 +1112,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
let _ = messages.forward(self).await;
}

Ok(())
Expand Down Expand Up @@ -1334,7 +1335,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})?;

for messages in messages {
_ = messages.forward(&self.channel_manager_channel).await;
_ = messages.forward(self).await;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
AnyMessage::Extensions(new_require_extensions.into_static().into())
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;

self.channel_manager_channel
.upstream_sender
Expand All @@ -153,6 +154,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
error!("Failed to send message to upstream: {:?}", e);
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;
self.record_upstream_sent_bytes(sent_bytes);
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,13 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
self.record_upstream_sent(channel_id, sent_bytes);

info!("Successfully sent SetCustomMiningJob to the upstream with channel_id: {channel_id}");
Ok(())
Expand Down
46 changes: 45 additions & 1 deletion miner-apps/jd-client/src/lib/channel_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub struct ChannelManagerData {
supported_extensions: Vec<u16>,
/// Extensions that the JDC requires
required_extensions: Vec<u16>,
/// Per-channel byte counters for the upstream server channel: (bytes_received, bytes_sent)
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
}

impl ChannelManagerData {
Expand All @@ -180,6 +182,7 @@ impl ChannelManagerData {
self.template_id_to_upstream_job_id.clear();
self.downstream_channel_id_and_job_id_to_template_id.clear();
self.pending_downstream_requests.clear();
self.bytes_by_channel.clear();

self.downstream_id_factory = AtomicUsize::new(0);
self.request_id_factory = AtomicU32::new(0);
Expand Down Expand Up @@ -321,6 +324,7 @@ impl ChannelManager {
negotiated_extensions: vec![],
supported_extensions,
required_extensions,
bytes_by_channel: HashMap::new(),
}));

let channel_manager_channel = ChannelManagerChannel {
Expand Down Expand Up @@ -692,6 +696,28 @@ impl ChannelManager {
Ok(())
}

/// Records bytes sent upstream for the given channel_id.
pub fn record_upstream_sent(&self, channel_id: ChannelId, bytes: u64) {
self.channel_manager_data.super_safe_lock(|data| {
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
entry.1 += bytes;
});
}

/// Records bytes sent upstream, resolving the channel_id from current upstream channel.
/// Single lock acquisition instead of separate upstream_channel_id() + record_upstream_sent().
pub fn record_upstream_sent_bytes(&self, bytes: u64) {
self.channel_manager_data.super_safe_lock(|data| {
let ch_id = data
.upstream_channel
.as_ref()
.map(|ch| ch.get_channel_id())
.unwrap_or(0);
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
entry.1 += bytes;
});
}

/// Handles messages received from the Upstream subsystem.
///
/// This method listens for incoming frames on the `upstream_receiver` channel.
Expand All @@ -700,10 +726,24 @@ impl ChannelManager {
/// - If the frame contains any unsupported message type, an error is returned.
async fn handle_pool_message_frame(&mut self) -> JDCResult<(), error::ChannelManager> {
if let Ok(mut sv2_frame) = self.channel_manager_channel.upstream_receiver.recv().await {
let frame_bytes = sv2_frame.encoded_length() as u64;
let header = sv2_frame.get_header().ok_or_else(|| {
error!("SV2 frame missing header");
JDCError::fallback(framing_sv2::Error::MissingHeader)
})?;
// Count received bytes keyed by channel_id
if header.channel_msg() {
let payload = sv2_frame.payload();
if payload.len() >= 4 {
let channel_id = u32::from_le_bytes(
payload[..4].try_into().expect("slice is exactly 4 bytes"),
);
self.channel_manager_data.super_safe_lock(|data| {
let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0));
entry.0 += frame_bytes;
});
}
}
let message_type = header.msg_type();
let extension_type = header.ext_type();
let payload = sv2_frame.payload();
Expand Down Expand Up @@ -812,13 +852,15 @@ impl ChannelManager {
let sv2_frame: Sv2Frame = AnyMessage::Mining(upstream_message)
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_| {
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;
self.record_upstream_sent_bytes(sent_bytes);
}
}
UpstreamState::Pending => {
Expand Down Expand Up @@ -873,13 +915,15 @@ impl ChannelManager {
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_| {
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;
self.record_upstream_sent_bytes(sent_bytes);
}
}
UpstreamState::Pending => {
Expand Down Expand Up @@ -1175,7 +1219,7 @@ impl ChannelManager {
});

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
let _ = message.forward(self).await;
}

info!("Vardiff update cycle complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
}

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
let _ = message.forward(self).await;
}

Ok(())
Expand Down Expand Up @@ -586,7 +586,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
}

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
let _ = message.forward(self).await;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job)
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
self.record_upstream_sent_bytes(sent_bytes);
_ = self.allocate_tokens(1).await;
}
}
Expand All @@ -284,11 +286,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
let sv2_frame: Sv2Frame = AnyMessage::Mining(close_channel)
.try_into()
.map_err(JDCError::shutdown)?;
let sent_bytes = sv2_frame.encoded_length() as u64;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;
self.record_upstream_sent_bytes(sent_bytes);
_ = self.allocate_tokens(1).await;
}

Expand Down Expand Up @@ -336,6 +340,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
info!("Received: {}", msg);

self.channel_manager_data.super_safe_lock(|data| {
data.bytes_by_channel.remove(&msg.channel_id);
data.upstream_channel = None;
});
Err(JDCError::fallback(JDCErrorKind::CloseChannel))
Expand Down Expand Up @@ -476,7 +481,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
})?;

for message in messages_results.into_iter().flatten() {
let _ = message.forward(&self.channel_manager_channel).await;
let _ = message.forward(self).await;
}
Ok(())
}
Expand Down
27 changes: 24 additions & 3 deletions miner-apps/jd-client/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use stratum_apps::{
parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv},
},
task_manager::TaskManager,
utils::types::{DownstreamId, Message, Sv2Frame},
utils::{
protocol_message_type::mining_message_channel_id,
types::{ChannelId, DownstreamId, Message, Sv2Frame},
},
};

use bitcoin_core_sv2::CancellationToken;
Expand All @@ -33,8 +36,6 @@ use crate::{
status::{handle_error, Status, StatusSender},
};

use stratum_apps::utils::types::ChannelId;

mod common_message_handler;
mod extensions_message_handler;

Expand All @@ -45,6 +46,7 @@ mod extensions_message_handler;
/// - An optional [`GroupChannel`] if group channeling is used.
/// - Active [`ExtendedChannel`]s keyed by channel ID.
/// - Active [`StandardChannel`]s keyed by channel ID.
/// - Per-channel byte counters (bytes_received, bytes_sent)
pub struct DownstreamData {
pub require_std_job: bool,
pub group_channel: GroupChannel<'static, DefaultJobStore<ExtendedJob<'static>>>,
Expand All @@ -59,6 +61,8 @@ pub struct DownstreamData {
pub supported_extensions: Vec<u16>,
/// Extensions that the JDC requires
pub required_extensions: Vec<u16>,
/// Per-channel byte counters: (bytes_received, bytes_sent)
pub bytes_by_channel: HashMap<ChannelId, (u64, u64)>,
}

/// Communication layer for a downstream connection.
Expand Down Expand Up @@ -144,6 +148,7 @@ impl Downstream {
negotiated_extensions: vec![],
supported_extensions,
required_extensions,
bytes_by_channel: HashMap::new(),
}));

Downstream {
Expand Down Expand Up @@ -270,8 +275,10 @@ impl Downstream {
return Ok(());
}

let channel_id = mining_message_channel_id(&message);
let message = AnyMessage::Mining(message);
let sv2_frame: Sv2Frame = message.try_into().map_err(JDCError::shutdown)?;
let frame_bytes = sv2_frame.encoded_length() as u64;

self.downstream_channel
.downstream_sender
Expand All @@ -282,6 +289,13 @@ impl Downstream {
JDCError::disconnect(JDCErrorKind::ChannelErrorSender, self.downstream_id)
})?;

if let Some(ch_id) = channel_id {
self.downstream_data.super_safe_lock(|data| {
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
entry.1 += frame_bytes;
});
}

Ok(())
}

Expand All @@ -293,6 +307,7 @@ impl Downstream {
.recv()
.await
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
let frame_bytes = sv2_frame.encoded_length() as u64;
let header = sv2_frame
.get_header()
.expect("frame header must be present");
Expand All @@ -305,6 +320,12 @@ impl Downstream {
.map_err(|error| JDCError::disconnect(error, self.downstream_id))?;
match any_message {
AnyMessage::Mining(message) => {
if let Some(ch_id) = mining_message_channel_id(&message) {
self.downstream_data.super_safe_lock(|data| {
let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0));
entry.0 += frame_bytes;
});
}
self.downstream_channel
.channel_manager_sender
.send((self.downstream_id, message, tlv_fields))
Expand Down
Loading
Loading