Skip to content

Commit 8333bc8

Browse files
committed
remove broadcast channel from tproxy
1 parent e7a8cd7 commit 8333bc8

8 files changed

Lines changed: 148 additions & 146 deletions

File tree

miner-apps/jd-client/src/lib/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use stratum_apps::{
1616
tp_type::TemplateProviderType,
1717
utils::types::{Sv2Frame, GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS},
1818
};
19+
use tokio::sync::Notify;
1920
use tracing::{debug, error, info, warn};
2021

2122
use crate::{

miner-apps/translator/src/lib/error.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use stratum_apps::{
2929
MessageType,
3030
},
3131
};
32-
use tokio::sync::broadcast;
3332

3433
pub type TproxyResult<T, Owner> = Result<T, TproxyError<Owner>>;
3534

@@ -158,10 +157,6 @@ pub enum TproxyErrorKind {
158157
ChannelErrorReceiver(async_channel::RecvError),
159158
/// Channel sender error
160159
ChannelErrorSender,
161-
/// Broadcast channel receiver error
162-
BroadcastChannelErrorReceiver(broadcast::error::RecvError),
163-
/// Tokio channel receiver error
164-
TokioChannelErrorRecv(tokio::sync::broadcast::error::RecvError),
165160
/// Error converting SetDifficulty to Message
166161
SetDifficultyToMessage(SetDifficulty),
167162
/// Received an unexpected message type
@@ -223,11 +218,7 @@ impl fmt::Display for TproxyErrorKind {
223218
ParseInt(ref e) => write!(f, "Bad convert from `String` to `int`: `{e:?}`"),
224219
PoisonLock => write!(f, "Poison Lock error"),
225220
ChannelErrorReceiver(ref e) => write!(f, "Channel receive error: `{e:?}`"),
226-
BroadcastChannelErrorReceiver(ref e) => {
227-
write!(f, "Broadcast channel receive error: {e:?}")
228-
}
229221
ChannelErrorSender => write!(f, "Sender error"),
230-
TokioChannelErrorRecv(ref e) => write!(f, "Channel receive error: `{e:?}`"),
231222
SetDifficultyToMessage(ref e) => {
232223
write!(f, "Error converting SetDifficulty to Message: `{e:?}`")
233224
}
@@ -333,12 +324,6 @@ impl From<async_channel::RecvError> for TproxyErrorKind {
333324
}
334325
}
335326

336-
impl From<tokio::sync::broadcast::error::RecvError> for TproxyErrorKind {
337-
fn from(e: tokio::sync::broadcast::error::RecvError) -> Self {
338-
TproxyErrorKind::TokioChannelErrorRecv(e)
339-
}
340-
}
341-
342327
//*** LOCK ERRORS ***
343328
impl<T> From<PoisonError<T>> for TproxyErrorKind {
344329
fn from(_e: PoisonError<T>) -> Self {

miner-apps/translator/src/lib/sv1/downstream/channel.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
use async_channel::{Receiver, Sender};
2-
use stratum_apps::{
3-
stratum_core::sv1_api::json_rpc,
4-
utils::types::{ChannelId, DownstreamId},
5-
};
6-
use tokio::sync::broadcast;
2+
use stratum_apps::{stratum_core::sv1_api::json_rpc, utils::types::DownstreamId};
73
use tokio_util::sync::CancellationToken;
84
use tracing::debug;
95

@@ -12,8 +8,7 @@ pub struct DownstreamChannelState {
128
pub downstream_sv1_sender: Sender<json_rpc::Message>,
139
pub downstream_sv1_receiver: Receiver<json_rpc::Message>,
1410
pub sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
15-
pub sv1_server_broadcast:
16-
broadcast::Sender<(ChannelId, Option<DownstreamId>, json_rpc::Message)>, /* channel_id, optional downstream_id, message */
11+
pub sv1_server_receiver: Receiver<json_rpc::Message>,
1712
/// Per-connection cancellation token (child of the global token).
1813
/// Cancelled when this downstream's task loop exits, causing
1914
/// the associated SV1 I/O task to shut down.
@@ -26,17 +21,13 @@ impl DownstreamChannelState {
2621
downstream_sv1_sender: Sender<json_rpc::Message>,
2722
downstream_sv1_receiver: Receiver<json_rpc::Message>,
2823
sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
29-
sv1_server_broadcast: broadcast::Sender<(
30-
ChannelId,
31-
Option<DownstreamId>,
32-
json_rpc::Message,
33-
)>,
24+
sv1_server_receiver: Receiver<json_rpc::Message>,
3425
connection_token: CancellationToken,
3526
) -> Self {
3627
Self {
3728
downstream_sv1_receiver,
3829
downstream_sv1_sender,
39-
sv1_server_broadcast,
30+
sv1_server_receiver,
4031
sv1_server_sender,
4132
connection_token,
4233
}

miner-apps/translator/src/lib/sv1/downstream/downstream.rs

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::{
22
error::{self, TproxyError, TproxyErrorKind, TproxyResult},
33
status::{handle_error, StatusSender},
44
sv1::downstream::{channel::DownstreamChannelState, data::DownstreamData},
5-
utils::AGGREGATED_CHANNEL_ID,
65
};
76
use async_channel::{Receiver, Sender};
87
use std::{
@@ -23,9 +22,8 @@ use stratum_apps::{
2322
},
2423
},
2524
task_manager::TaskManager,
26-
utils::types::{ChannelId, DownstreamId, Hashrate},
25+
utils::types::{DownstreamId, Hashrate},
2726
};
28-
use tokio::sync::broadcast;
2927
use tokio_util::sync::CancellationToken;
3028
use tracing::{debug, error, info, warn};
3129

@@ -62,11 +60,7 @@ impl Downstream {
6260
downstream_sv1_sender: Sender<json_rpc::Message>,
6361
downstream_sv1_receiver: Receiver<json_rpc::Message>,
6462
sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
65-
sv1_server_broadcast: broadcast::Sender<(
66-
ChannelId,
67-
Option<DownstreamId>,
68-
json_rpc::Message,
69-
)>,
63+
sv1_server_receiver: Receiver<json_rpc::Message>,
7064
target: Target,
7165
hashrate: Option<Hashrate>,
7266
connection_token: CancellationToken,
@@ -76,7 +70,7 @@ impl Downstream {
7670
downstream_sv1_sender,
7771
downstream_sv1_receiver,
7872
sv1_server_sender,
79-
sv1_server_broadcast,
73+
sv1_server_receiver,
8074
connection_token,
8175
);
8276
Self {
@@ -106,10 +100,6 @@ impl Downstream {
106100
status_sender: StatusSender,
107101
task_manager: Arc<TaskManager>,
108102
) {
109-
let mut sv1_server_receiver = self
110-
.downstream_channel_state
111-
.sv1_server_broadcast
112-
.subscribe();
113103
let downstream_id = self.downstream_id;
114104
task_manager.spawn(async move {
115105
// we just spawned a new task that's relevant to fallback coordination
@@ -141,7 +131,7 @@ impl Downstream {
141131
}
142132

143133
// Handle server -> downstream message
144-
res = self.handle_sv1_server_message(&mut sv1_server_receiver) => {
134+
res = self.handle_sv1_server_message() => {
145135
if let Err(e) = res {
146136
error!("Downstream {downstream_id}: error in server message handler: {e:?}");
147137
if handle_error(&status_sender, e).await {
@@ -181,25 +171,16 @@ impl Downstream {
181171
/// complete
182172
/// - On handshake completion: sends cached messages in correct order (set_difficulty first,
183173
/// then notify)
184-
pub async fn handle_sv1_server_message(
185-
&self,
186-
sv1_server_receiver: &mut broadcast::Receiver<(
187-
ChannelId,
188-
Option<DownstreamId>,
189-
json_rpc::Message,
190-
)>,
191-
) -> TproxyResult<(), error::Downstream> {
192-
match sv1_server_receiver.recv().await {
193-
Ok((channel_id, downstream_id, message)) => {
194-
let my_channel_id = self.downstream_data.super_safe_lock(|d| d.channel_id);
195-
let my_downstream_id = self.downstream_id;
174+
pub async fn handle_sv1_server_message(&self) -> TproxyResult<(), error::Downstream> {
175+
match self
176+
.downstream_channel_state
177+
.sv1_server_receiver
178+
.recv()
179+
.await
180+
{
181+
Ok(message) => {
182+
let downstream_id = self.downstream_id;
196183
let handshake_complete = self.sv1_handshake_complete.load(Ordering::SeqCst);
197-
let id_matches = (my_channel_id == Some(channel_id)
198-
|| channel_id == AGGREGATED_CHANNEL_ID)
199-
&& (downstream_id.is_none() || downstream_id == Some(my_downstream_id));
200-
if !id_matches {
201-
return Ok(()); // Message not intended for this downstream
202-
}
203184

204185
// Check if this is a queued message response
205186
let is_queued_sv1_handshake_response = self
@@ -267,7 +248,7 @@ impl Downstream {
267248
"Down: Failed to send mining.set_difficulty to downstream: {:?}",
268249
e
269250
);
270-
TproxyError::disconnect(TproxyErrorKind::ChannelErrorSender, downstream_id.unwrap_or(0))
251+
TproxyError::disconnect(TproxyErrorKind::ChannelErrorSender, downstream_id)
271252
})?;
272253
}
273254

@@ -279,7 +260,7 @@ impl Downstream {
279260
.await
280261
.map_err(|e| {
281262
error!("Down: Failed to send mining.notify to downstream: {:?}", e);
282-
TproxyError::disconnect(TproxyErrorKind::ChannelErrorSender, downstream_id.unwrap_or(0))
263+
TproxyError::disconnect(TproxyErrorKind::ChannelErrorSender, downstream_id)
283264
})?;
284265
}
285266
return Ok(());
@@ -297,7 +278,7 @@ impl Downstream {
297278
);
298279
TproxyError::disconnect(
299280
TproxyErrorKind::ChannelErrorSender,
300-
downstream_id.unwrap_or(0),
281+
downstream_id,
301282
)
302283
})?;
303284
}
@@ -338,7 +319,7 @@ impl Downstream {
338319
error!("Down: Failed to send queued message to downstream: {:?}", e);
339320
TproxyError::disconnect(
340321
TproxyErrorKind::ChannelErrorSender,
341-
downstream_id.unwrap_or(0),
322+
downstream_id,
342323
)
343324
})?;
344325
} else {
@@ -348,12 +329,11 @@ impl Downstream {
348329
}
349330
}
350331
Err(e) => {
351-
let downstream_id = self.downstream_id;
352332
error!(
353333
"Sv1 message handler error for downstream {}: {:?}",
354-
downstream_id, e
334+
self.downstream_id, e
355335
);
356-
return Err(TproxyError::disconnect(e, downstream_id));
336+
return Err(TproxyError::disconnect(e, self.downstream_id));
357337
}
358338
}
359339

miner-apps/translator/src/lib/sv1/sv1_server/channel.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
use async_channel::{unbounded, Receiver, Sender};
2+
use dashmap::DashMap;
3+
use std::sync::Arc;
24
use stratum_apps::stratum_core::parsers_sv2::{Mining, Tlv};
35

4-
use stratum_apps::{
5-
stratum_core::sv1_api::json_rpc,
6-
utils::types::{ChannelId, DownstreamId},
7-
};
8-
use tokio::sync::broadcast;
6+
use stratum_apps::{stratum_core::sv1_api::json_rpc, utils::types::DownstreamId};
97

108
#[derive(Clone)]
119
pub struct Sv1ServerChannelState {
12-
pub sv1_server_to_downstream_sender:
13-
broadcast::Sender<(ChannelId, Option<DownstreamId>, json_rpc::Message)>,
10+
pub sv1_server_to_downstream_sender: Arc<DashMap<DownstreamId, Sender<json_rpc::Message>>>,
1411
pub downstream_to_sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
1512
pub downstream_to_sv1_server_receiver: Receiver<(DownstreamId, json_rpc::Message)>,
1613
pub channel_manager_receiver: Receiver<(Mining<'static>, Option<Vec<Tlv>>)>,
@@ -23,11 +20,10 @@ impl Sv1ServerChannelState {
2320
channel_manager_receiver: Receiver<(Mining<'static>, Option<Vec<Tlv>>)>,
2421
channel_manager_sender: Sender<(Mining<'static>, Option<Vec<Tlv>>)>,
2522
) -> Self {
26-
let (sv1_server_to_downstream_sender, _) = broadcast::channel(1000);
2723
let (downstream_to_sv1_server_sender, downstream_to_sv1_server_receiver) = unbounded();
2824

2925
Self {
30-
sv1_server_to_downstream_sender,
26+
sv1_server_to_downstream_sender: Arc::new(DashMap::new()),
3127
downstream_to_sv1_server_receiver,
3228
downstream_to_sv1_server_sender,
3329
channel_manager_receiver,

miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -150,24 +150,26 @@ impl Sv1Server {
150150
}
151151

152152
// Process immediate set_difficulty updates (for new_target >= upstream_target)
153-
for (channel_id, downstream_id, target) in immediate_updates {
153+
for (_channel_id, downstream_id, target) in immediate_updates {
154154
// Send set_difficulty message immediately
155155
if let Ok(set_difficulty_msg) = build_sv1_set_difficulty_from_sv2_target(target) {
156-
if let Err(e) = self
156+
let ds_id = downstream_id.unwrap_or(0);
157+
if let Some(sender) = self
157158
.sv1_server_channel_state
158159
.sv1_server_to_downstream_sender
159-
.send((channel_id, downstream_id, set_difficulty_msg))
160+
.get(&ds_id)
160161
{
161-
error!(
162-
"Failed to send immediate SetDifficulty message to downstream {}: {:?}",
163-
downstream_id.unwrap_or(0),
164-
e
165-
);
166-
} else {
167-
trace!(
168-
"Sent immediate SetDifficulty to downstream {} (new_target >= upstream_target)",
169-
downstream_id.unwrap_or(0)
170-
);
162+
if let Err(e) = sender.value().try_send(set_difficulty_msg) {
163+
error!(
164+
"Failed to send immediate SetDifficulty message to downstream {}: {:?}",
165+
ds_id, e
166+
);
167+
} else {
168+
trace!(
169+
"Sent immediate SetDifficulty to downstream {} (new_target >= upstream_target)",
170+
ds_id
171+
);
172+
}
171173
}
172174
}
173175
}
@@ -426,7 +428,7 @@ impl Sv1Server {
426428
.get(&update.downstream_id)
427429
.and_then(|ds| ds.downstream_data.super_safe_lock(|d| d.channel_id));
428430

429-
let Some(channel_id) = channel_id else {
431+
let Some(_channel_id) = channel_id else {
430432
trace!(
431433
"Skipping SetDifficulty for downstream {}: no channel_id yet",
432434
update.downstream_id
@@ -446,17 +448,19 @@ impl Sv1Server {
446448
}
447449
};
448450

449-
if let Err(e) = self
451+
if let Some(sender) = self
450452
.sv1_server_channel_state
451453
.sv1_server_to_downstream_sender
452-
.send((channel_id, Some(update.downstream_id), set_difficulty_msg))
454+
.get(&update.downstream_id)
453455
{
454-
error!(
455-
"Failed to send SetDifficulty to downstream {}: {:?}",
456-
update.downstream_id, e
457-
);
458-
} else {
459-
trace!("Sent SetDifficulty to downstream {}", update.downstream_id);
456+
if let Err(e) = sender.value().try_send(set_difficulty_msg) {
457+
error!(
458+
"Failed to send SetDifficulty to downstream {}: {:?}",
459+
update.downstream_id, e
460+
);
461+
} else {
462+
trace!("Sent SetDifficulty to downstream {}", update.downstream_id);
463+
}
460464
}
461465
}
462466
}

0 commit comments

Comments
 (0)