Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions mujina-miner/src/asic/bm13xx/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,13 @@ pub struct Frequency {

impl Frequency {
/// Minimum supported frequency in MHz
#[allow(dead_code)]
pub const MIN_MHZ: f32 = 50.0;
/// Maximum supported frequency in MHz
#[allow(dead_code)]
pub const MAX_MHZ: f32 = 800.0;
/// Base crystal frequency in MHz
const CRYSTAL_MHZ: f32 = 25.0;
pub const CRYSTAL_MHZ: f32 = 25.0;

/// Create frequency from MHz value with validation
#[allow(dead_code)]
pub fn from_mhz(mhz: f32) -> Result<Self, ProtocolError> {
if !(Self::MIN_MHZ..=Self::MAX_MHZ).contains(&mhz) {
return Err(ProtocolError::InvalidFrequency { mhz: mhz as u32 });
Expand All @@ -62,7 +59,6 @@ impl Frequency {
}

/// Get frequency in MHz
#[allow(dead_code)]
pub fn mhz(&self) -> f32 {
self.mhz
}
Expand Down
179 changes: 156 additions & 23 deletions mujina-miner/src/asic/bm13xx/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
//! The thread is implemented as an actor task that monitors the serial bus for
//! chip responses, filters shares, and manages work assignment.

use std::sync::{Arc, RwLock};
use std::{
sync::{Arc, RwLock},
time::Duration,
};

use async_trait::async_trait;
use bitcoin::block::Header as BlockHeader;
use futures::{SinkExt, sink::Sink, stream::Stream};
use futures::{SinkExt, future, sink::Sink, stream::Stream};
use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::StreamExt;

Expand All @@ -21,10 +24,16 @@ use crate::{
BoardPeripherals, HashTask, HashThread, HashThreadCapabilities, HashThreadError,
HashThreadEvent, HashThreadStatus, Share, ThreadRemovalSignal,
},
thermal::FrequencyCommand,
tracing::prelude::*,
types::{Difficulty, HashRate},
};

/// 1/4 of runtime step size: smaller steps during init prevent PLL
/// instability on cold chips.
const INITIALIZATION_RAMP_STEP_SIZE_MHZ: f32 = protocol::Frequency::CRYSTAL_MHZ / 4f32;
const FREQUENCY_WRITE_TIMEOUT: Duration = Duration::from_millis(750);

/// Tracks tasks sent to chip hardware, indexed by chip_job_id.
///
/// BM13xx chips use 4-bit job IDs. This tracker maintains snapshots of
Expand Down Expand Up @@ -86,6 +95,16 @@ enum ThreadCommand {
Shutdown,
}

/// Actor context for BM13xx thread.
///
/// Groups channels and shared state to reduce function parameter count.
struct ThreadActorContext {
cmd_rx: mpsc::Receiver<ThreadCommand>,
_evt_tx: mpsc::Sender<HashThreadEvent>,
removal_rx: watch::Receiver<ThreadRemovalSignal>,
status: Arc<RwLock<HashThreadStatus>>,
}

/// BM13xx HashThread implementation.
///
/// Represents a chain of BM13xx chips as a schedulable worker. The thread
Expand Down Expand Up @@ -120,12 +139,16 @@ impl BM13xxThread {
/// * `chip_commands` - Sink for sending encoded commands to chips
/// * `peripherals` - Hardware interfaces from board (enable, regulator, etc.)
/// * `removal_rx` - Watch channel for board-triggered removal
/// * `frequency_command_rx` - Optional receiver for frequency adjustment commands
/// * `operating_frequency_mhz` - Target operating frequency for ASIC chips (MHz)
pub fn new<R, W>(
name: String,
chip_responses: R,
chip_commands: W,
peripherals: BoardPeripherals,
removal_rx: watch::Receiver<ThreadRemovalSignal>,
frequency_command_rx: Option<mpsc::Receiver<FrequencyCommand>>,
operating_frequency_mhz: f32,
) -> Self
where
R: Stream<Item = Result<protocol::Response, std::io::Error>> + Unpin + Send + 'static,
Expand All @@ -138,16 +161,21 @@ impl BM13xxThread {
let status = Arc::new(RwLock::new(HashThreadStatus::default()));
let status_clone = Arc::clone(&status);

// Spawn the actor task
let ctx = ThreadActorContext {
cmd_rx,
_evt_tx: evt_tx,
removal_rx,
status: status_clone,
};

tokio::spawn(async move {
bm13xx_thread_actor(
cmd_rx,
evt_tx,
removal_rx,
status_clone,
ctx,
chip_responses,
chip_commands,
peripherals,
frequency_command_rx,
operating_frequency_mhz,
)
.await;
});
Expand Down Expand Up @@ -240,6 +268,7 @@ impl HashThread for BM13xxThread {
async fn initialize_chip<W>(
chip_commands: &mut W,
peripherals: &mut BoardPeripherals,
operating_frequency_mhz: f32,
) -> Result<(), HashThreadError>
where
W: Sink<protocol::Command> + Unpin,
Expand Down Expand Up @@ -498,9 +527,16 @@ where
HashThreadError::InitializationFailed(format!("Core final send failed: {:?}", e))
})?;

// Frequency ramping (56.25 MHz -> 525 MHz)
debug!("Ramping frequency from 56.25 MHz to 525 MHz");
let frequency_steps = generate_frequency_ramp_steps(56.25, 525.0, 6.25);
// Ramp frequency gradually to prevent PLL instability during initialization
debug!(
"Ramping frequency from 56.25 MHz to {} MHz",
operating_frequency_mhz
);
let frequency_steps = generate_frequency_ramp_steps(
56.25,
operating_frequency_mhz,
INITIALIZATION_RAMP_STEP_SIZE_MHZ,
);

for (i, pll_config) in frequency_steps.iter().enumerate() {
chip_commands
Expand Down Expand Up @@ -678,6 +714,18 @@ fn calculate_pll_for_frequency(target_freq: f32) -> Option<protocol::PllConfig>
))
}

/// Receives a frequency command when thermal management is active and the chip is initialized.
/// Otherwise never completes (uses pending future).
async fn recv_frequency_command_if_ready(
frequency_command_rx: &mut Option<mpsc::Receiver<FrequencyCommand>>,
chip_initialized: bool,
) -> Option<FrequencyCommand> {
match (chip_initialized, frequency_command_rx) {
(true, Some(rx)) => rx.recv().await,
_ => future::pending().await,
}
}

/// Internal actor task for BM13xxThread.
///
/// This runs as an independent Tokio task and handles:
Expand All @@ -690,13 +738,12 @@ fn calculate_pll_for_frequency(target_freq: f32) -> Option<protocol::PllConfig>
/// Chip is disabled on startup to establish known state. Chip is enabled and
/// configured when scheduler assigns first work.
async fn bm13xx_thread_actor<R, W>(
mut cmd_rx: mpsc::Receiver<ThreadCommand>,
_evt_tx: mpsc::Sender<HashThreadEvent>,
mut removal_rx: watch::Receiver<ThreadRemovalSignal>,
status: Arc<RwLock<HashThreadStatus>>,
mut ctx: ThreadActorContext,
mut chip_responses: R,
mut chip_commands: W,
mut peripherals: BoardPeripherals,
mut frequency_command_rx: Option<mpsc::Receiver<FrequencyCommand>>,
operating_frequency_mhz: f32,
) where
R: Stream<Item = Result<protocol::Response, std::io::Error>> + Unpin,
W: Sink<protocol::Command> + Unpin,
Expand All @@ -715,19 +762,21 @@ async fn bm13xx_thread_actor<R, W>(
let mut ntime_ticker = tokio::time::interval(tokio::time::Duration::from_secs(1));
ntime_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut current_frequency_mhz: f32 = operating_frequency_mhz;

loop {
tokio::select! {
// Removal signal (highest priority)
_ = removal_rx.changed() => {
let signal = removal_rx.borrow().clone(); // Clone to avoid holding borrow across await
_ = ctx.removal_rx.changed() => {
let signal = ctx.removal_rx.borrow().clone(); // Clone to avoid holding borrow across await
match signal {
ThreadRemovalSignal::Running => {
// False alarm - still running
}
_reason => {
// Update status
{
let mut s = status.write().unwrap();
let mut s = ctx.status.write().unwrap();
s.is_active = false;
}

Expand All @@ -738,7 +787,7 @@ async fn bm13xx_thread_actor<R, W>(
}

// Commands from scheduler
Some(cmd) = cmd_rx.recv() => {
Some(cmd) = ctx.cmd_rx.recv() => {
match cmd {
ThreadCommand::UpdateTask { new_task, response_tx } => {
if let Some(ref old) = current_task {
Expand All @@ -753,7 +802,13 @@ async fn bm13xx_thread_actor<R, W>(

if !chip_initialized {
trace!("Initializing chip on first assignment.");
if let Err(e) = initialize_chip(&mut chip_commands, &mut peripherals).await {
if let Err(e) = initialize_chip(
&mut chip_commands,
&mut peripherals,
operating_frequency_mhz,
)
.await
{
error!(error = %e, "Chip initialization failed");
response_tx.send(Err(e)).ok();
continue;
Expand Down Expand Up @@ -784,7 +839,7 @@ async fn bm13xx_thread_actor<R, W>(
}

{
let mut s = status.write().unwrap();
let mut s = ctx.status.write().unwrap();
s.is_active = true;
}

Expand All @@ -804,7 +859,13 @@ async fn bm13xx_thread_actor<R, W>(

if !chip_initialized {
trace!("Initializing chip on first assignment.");
if let Err(e) = initialize_chip(&mut chip_commands, &mut peripherals).await {
if let Err(e) = initialize_chip(
&mut chip_commands,
&mut peripherals,
operating_frequency_mhz,
)
.await
{
error!(error = %e, "Chip initialization failed");
response_tx.send(Err(e)).ok();
continue;
Expand Down Expand Up @@ -838,7 +899,7 @@ async fn bm13xx_thread_actor<R, W>(
}

{
let mut s = status.write().unwrap();
let mut s = ctx.status.write().unwrap();
s.is_active = true;
}

Expand All @@ -851,7 +912,7 @@ async fn bm13xx_thread_actor<R, W>(
let old_task = current_task.take();

{
let mut s = status.write().unwrap();
let mut s = ctx.status.write().unwrap();
s.is_active = false;
}

Expand Down Expand Up @@ -983,6 +1044,78 @@ async fn bm13xx_thread_actor<R, W>(
}
}
}

// Handle thermal frequency adjustment commands
Some(cmd) = recv_frequency_command_if_ready(&mut frequency_command_rx, chip_initialized) => {
use protocol::{Command, Frequency, Register};

let old_frequency_mhz = current_frequency_mhz;

debug!(
command = ?cmd,
current_freq_mhz = %current_frequency_mhz,
"Thermal frequency command received"
);

let new_frequency_mhz = match cmd {
FrequencyCommand::BumpUp => current_frequency_mhz + protocol::Frequency::CRYSTAL_MHZ,
FrequencyCommand::BumpDown => current_frequency_mhz - protocol::Frequency::CRYSTAL_MHZ,
};

let clamped_frequency_mhz = new_frequency_mhz
.clamp(protocol::Frequency::MIN_MHZ, protocol::Frequency::MAX_MHZ);

match Frequency::from_mhz(clamped_frequency_mhz) {
Err(e) => {
warn!(
error = %e,
requested_freq = clamped_frequency_mhz,
"Invalid frequency requested"
);
}
Ok(frequency) => {
let pll_config = frequency.calculate_pll();

trace!(
freq_mhz = %clamped_frequency_mhz,
pll = ?pll_config,
"Writing PLL divider register"
);

let timeout_result = tokio::time::timeout(
FREQUENCY_WRITE_TIMEOUT,
chip_commands.send(Command::WriteRegister {
broadcast: true,
chip_address: 0x00,
register: Register::PllDivider(pll_config),
}),
)
.await;

match timeout_result {
Ok(write_result) => match write_result {
Ok(()) => {
current_frequency_mhz = clamped_frequency_mhz;
debug!(
old_freq = old_frequency_mhz,
new_freq = current_frequency_mhz,
"Frequency adjusted"
);
}
Err(e) => {
error!(error = ?e, "Failed to send PllDivider register write");
}
},
Err(_) => {
error!(
timeout_ms = %FREQUENCY_WRITE_TIMEOUT.as_millis(),
"Frequency register write timed out"
);
}
}
}
}
}
}
}

Expand Down
Loading