Skip to content
3 changes: 2 additions & 1 deletion rust/hermes-ipfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ connexa = { version = "0.4.1", features = ["full"] }
minicbor = { version = "0.25.1", features = ["alloc"], optional = true }
ed25519-dalek = { version = "2.1.1", optional = true}
catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true }
tracing = "0.1.43"
rand = "0.9.2"

[dev-dependencies]
# Dependencies used by examples
Expand All @@ -38,4 +40,3 @@ dirs = "6.0.0"
lipsum = "0.9.1"
rustyline-async = "0.4.5"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
rand = "0.9.0"
1 change: 1 addition & 0 deletions rust/hermes-ipfs/src/doc_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod envelope;

pub mod payload;
pub mod timers;

pub use envelope::{Envelope, EnvelopePayload};

Expand Down
95 changes: 95 additions & 0 deletions rust/hermes-ipfs/src/doc_sync/timers/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//! Timer configuration definitions (jitter ranges, quiet-period bounds).
//! The configuration is derived from: <https://github.com/input-output-hk/hermes/blob/main/docs/src/architecture/08_concepts/document_sync/protocol_spec.md#timers-and-retries>

use std::{convert::TryFrom, ops::Range, time::Duration};

use rand::Rng;

/// Minimum backoff/jitter before sending .syn: uniform random in ms.
const T_SYN_MIN_MS: u64 = 200;
/// Maximum backoff/jitter before sending .syn: uniform random in ms.
const T_SYN_MAX_MS: u64 = 800;

/// Default range backoff/jitter before sending .syn.
/// Spec: [200ms, 800ms].
const T_SYN_RANGE: Range<Duration> =
Duration::from_millis(T_SYN_MIN_MS)..Duration::from_millis(T_SYN_MAX_MS);

/// Minimum responder jitter before publishing .dif (and .prv) in ms.
const R_MIN_MS: u64 = 50;
/// Maximum responder jitter before publishing .dif (and .prv) in ms.
const R_MAX_MS: u64 = 250;

/// Default responder jitter range.
/// Spec: [50ms, 250ms].
const RESPONDER_RANGE: Range<Duration> =
Duration::from_millis(R_MIN_MS)..Duration::from_millis(R_MAX_MS);

/// Minimum quiet period in seconds for .new re-announce.
const T_QUIET_MIN_SEC: u64 = 20;
/// Maximum quiet period in seconds for .new re-announce.
const T_QUIET_MAX_SEC: u64 = 60;

/// Default quiet period range.
/// Spec: [20s, 60s].
const T_QUIET_RANGE: Range<Duration> =
Duration::from_secs(T_QUIET_MIN_SEC)..Duration::from_secs(T_QUIET_MAX_SEC);

/// Configuration for all Document Sync timers per topic channel.
#[derive(Debug, Clone)]
pub struct SyncTimersConfig {
/// Jitter range for sending .syn
pub syn_jitter: Range<Duration>,
/// Jitter range for responding with .dif (or .prv)
pub responder_jitter: Range<Duration>,
/// Quiet period re-announcement range for .new
pub quiet_period: Range<Duration>,
}

impl Default for SyncTimersConfig {
fn default() -> Self {
Self {
syn_jitter: T_SYN_RANGE,
responder_jitter: RESPONDER_RANGE,
quiet_period: T_QUIET_RANGE,
}
}
}

impl SyncTimersConfig {
/// Generates backoff/jitter for .syn
#[must_use]
pub fn random_syn_jitter(&self) -> Duration {
Self::random_duration(&self.syn_jitter)
}

/// Generates responder jitter for .dif (and .prv)
#[must_use]
pub fn random_responder_jitter(&self) -> Duration {
Self::random_duration(&self.responder_jitter)
}

/// Generates quiet period
#[must_use]
pub fn random_quiet(&self) -> Duration {
Self::random_duration(&self.quiet_period)
}

/// Helper to generate a random duration within a specific Range.
fn random_duration(range: &Range<Duration>) -> Duration {
let min_ms = Self::duration_millis(range.start);
let max_ms = Self::duration_millis(range.end);

if min_ms >= max_ms {
return range.start;
}

let millis = rand::rng().random_range(min_ms..max_ms);
Duration::from_millis(millis)
}

/// Duration converter to `u64` milliseconds.
fn duration_millis(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
}
4 changes: 4 additions & 0 deletions rust/hermes-ipfs/src/doc_sync/timers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Timers for document synchronization.

pub mod config;
pub mod state;
172 changes: 172 additions & 0 deletions rust/hermes-ipfs/src/doc_sync/timers/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//! Timer states - quiet keepalive and jitter scheduling.

use std::{sync::Arc, thread::JoinHandle};

use tokio::sync::{Mutex, Notify};

use super::config::SyncTimersConfig;

/// Keepalive callback type
type KeepaliveCallback = Arc<dyn Fn() -> Result<(), anyhow::Error> + Send + Sync>;

/// State managing timers (Quiet period background task + helpers for one-off jitters).
pub struct SyncTimersState {
/// Timer configuration
cfg: SyncTimersConfig,
/// Callback to invoke when a keepalive is sent.
send_new_keepalive: KeepaliveCallback,
/// Handle for the background quiet period keepalive task.
keepalive_task: Mutex<Option<JoinHandle<()>>>,
/// Notification for resetting the quiet timer.
reset_new_notify: Notify,
}

impl SyncTimersState {
/// Create a timer state.
pub fn new(
cfg: SyncTimersConfig,
send_new_keepalive: KeepaliveCallback,
) -> Arc<Self> {
Arc::new(Self {
cfg,
send_new_keepalive,
keepalive_task: Mutex::new(None),
reset_new_notify: Notify::new(),
})
}

/// Async sleep for the .syn jitter duration.
/// Call before sending .syn.
pub async fn wait_syn_backoff(&self) {
let dur = self.cfg.random_syn_jitter();
tokio::time::sleep(dur).await;
}

/// Async sleep for the .dif (or .prv) responder jitter duration.
/// Call before publishing .dif/.prv.
pub async fn wait_responder_jitter(&self) {
let dur = self.cfg.random_responder_jitter();
tokio::time::sleep(dur).await;
}

/// Start the quiet period keepalive timer for .new topic
pub fn start_quiet_timer(self: &Arc<Self>) {
// Check whether the task is already running
let mut task_guard = self.keepalive_task.blocking_lock();
if task_guard.is_some() {
tracing::trace!("Quiet timer already running");
return;
}

let this = self.clone();
let handle = std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
{
Ok(rt) => rt,
Err(e) => {
tracing::error!("Failed to build Tokio runtime for quiet timer: {e}");
return;
},
};
rt.block_on(async move {
this.run_quiet_timer().await;
});
});
*task_guard = Some(handle);
}

/// Background loop that waits for the quiet period, emits keepalive, and listen to
/// reset notifications.
async fn run_quiet_timer(&self) {
loop {
// Pick a random duration for THIS cycle
let sleep_dur = self.cfg.random_quiet();

tokio::select! {
// Case A: Timer expired (send keepalive)
() = tokio::time::sleep(sleep_dur) => {
// Using spawn_blocking because the callback may perform blocking operations
let result = tokio::task::spawn_blocking({
let callback = self.send_new_keepalive.clone();
move || callback()
}).await;

match result {
Ok(Ok(())) => tracing::debug!("Keepalive sent"),
Ok(Err(e)) => tracing::warn!("Keepalive failed: {e:?}"),
Err(e) => tracing::error!("Keepalive task failed: {e:?}"),
}
// Loop restarts -> New random duration picked
}
// Case B: Timer reset triggered (.new observed externally)
() = self.reset_new_notify.notified() => {
tracing::trace!("Quiet timer reset");
// Loop restarts immediately -> New random duration picked
}
}
}
}

/// Reset quiet period timer (call on every received or posted .new)
pub fn reset_quiet_timer(&self) {
tracing::trace!("Resetting quiet timer");
// Notify the background task to reset its sleep loop
self.reset_new_notify.notify_waiters();
}

/// Stop the quiet period background task, if running.
pub fn stop_quiet_timer(&self) {
self.keepalive_task.blocking_lock().take();
}
}

#[cfg(test)]
mod tests {
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};

use super::*;

fn timer_config() -> SyncTimersConfig {
SyncTimersConfig {
quiet_period: Duration::from_millis(100)..Duration::from_millis(100),
..Default::default()
}
}
#[tokio::test]
async fn test_start_quiet_timer_count_keepalive() {
// Track how many times the callback is called
let callback_count = Arc::new(AtomicU32::new(0)).clone();
let callback: KeepaliveCallback = Arc::new({
let counter = callback_count.clone();
move || {
counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
});

let state = SyncTimersState::new(timer_config(), callback);
let state_clone = state.clone();
tokio::task::spawn_blocking(move || {
state_clone.start_quiet_timer();
})
.await
.unwrap();

// Trigger 2 keepalive
tokio::time::sleep(Duration::from_millis(250)).await;
state.reset_quiet_timer();
// Trigger 1 keepalive
tokio::time::sleep(Duration::from_millis(120)).await;

let count = callback_count.load(Ordering::Relaxed);
assert!(
count == 3,
"Expected callback to be called 3 times, got {count}"
);
}
}