diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml
index 7a2e2d062a..35880ac6e3 100644
--- a/rust/hermes-ipfs/Cargo.toml
+++ b/rust/hermes-ipfs/Cargo.toml
@@ -30,6 +30,8 @@ connexa = { version = "0.4.1", features = ["full"] }
 minicbor = { version = "0.25.1", features = ["alloc", "derive", "half"], optional = true }
 ed25519-dalek = { version = "2.1.1", optional = true}
 catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true }
+tracing = "0.1.43"
+rand = "0.9.2"
 blake3 = { version = "1.8.2", optional = true }
 
 [dev-dependencies]
@@ -39,4 +41,3 @@ dirs = "6.0.0"
 lipsum = "0.9.1"
 rustyline-async = "0.4.5"
 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
-rand = "0.9.0"
diff --git a/rust/hermes-ipfs/src/doc_sync/mod.rs b/rust/hermes-ipfs/src/doc_sync/mod.rs
index 96ea5e903c..b50c247bc1 100644
--- a/rust/hermes-ipfs/src/doc_sync/mod.rs
+++ b/rust/hermes-ipfs/src/doc_sync/mod.rs
@@ -4,6 +4,7 @@ mod envelope;
 mod state_machine;
 
 pub mod payload;
+pub mod timers;
 
 use ed25519_dalek::VerifyingKey;
 pub use envelope::{Envelope, EnvelopePayload};
diff --git a/rust/hermes-ipfs/src/doc_sync/timers/config.rs b/rust/hermes-ipfs/src/doc_sync/timers/config.rs
new file mode 100644
index 0000000000..cbcad41346
--- /dev/null
+++ b/rust/hermes-ipfs/src/doc_sync/timers/config.rs
@@ -0,0 +1,102 @@
+//! Timer configuration definitions (jitter ranges, quiet-period bounds).
+//! The configuration is derived from the Document Sync specification (see the `Spec:`
+//! notes on the defaults below).
+//! TODO(review): restore the reference link that originally followed "derived from:".
+
+use std::{convert::TryFrom, ops::Range, time::Duration};
+
+use rand::Rng;
+
+/// Minimum backoff/jitter before sending .syn: uniform random in ms.
+const T_SYN_MIN_MS: u64 = 200;
+/// Maximum backoff/jitter before sending .syn: uniform random in ms.
+const T_SYN_MAX_MS: u64 = 800;
+
+/// Default range backoff/jitter before sending .syn.
+/// Spec: [200ms, 800ms].
+const T_SYN_RANGE: Range<Duration> =
+    Duration::from_millis(T_SYN_MIN_MS)..Duration::from_millis(T_SYN_MAX_MS);
+
+/// Minimum responder jitter before publishing .dif (and .prv) in ms.
+const R_MIN_MS: u64 = 50;
+/// Maximum responder jitter before publishing .dif (and .prv) in ms.
+const R_MAX_MS: u64 = 250;
+
+/// Default responder jitter range.
+/// Spec: [50ms, 250ms].
+const RESPONDER_RANGE: Range<Duration> =
+    Duration::from_millis(R_MIN_MS)..Duration::from_millis(R_MAX_MS);
+
+/// Minimum quiet period in seconds for .new re-announce.
+const T_QUIET_MIN_SEC: u64 = 20;
+/// Maximum quiet period in seconds for .new re-announce.
+const T_QUIET_MAX_SEC: u64 = 60;
+
+/// Default quiet period range.
+/// Spec: [20s, 60s].
+const T_QUIET_RANGE: Range<Duration> =
+    Duration::from_secs(T_QUIET_MIN_SEC)..Duration::from_secs(T_QUIET_MAX_SEC);
+
+/// Configuration for all Document Sync timers per topic channel.
+#[derive(Debug, Clone)]
+pub struct SyncTimersConfig {
+    /// Jitter range for sending .syn
+    pub syn_jitter: Range<Duration>,
+    /// Jitter range for responding with .dif (or .prv)
+    pub responder_jitter: Range<Duration>,
+    /// Quiet period re-announcement range for .new
+    pub quiet_period: Range<Duration>,
+}
+
+impl Default for SyncTimersConfig {
+    fn default() -> Self {
+        Self {
+            syn_jitter: T_SYN_RANGE,
+            responder_jitter: RESPONDER_RANGE,
+            quiet_period: T_QUIET_RANGE,
+        }
+    }
+}
+
+impl SyncTimersConfig {
+    /// Generates backoff/jitter for .syn
+    #[must_use]
+    pub fn random_syn_jitter(&self) -> Duration {
+        Self::random_duration(&self.syn_jitter)
+    }
+
+    /// Generates responder jitter for .dif (and .prv)
+    #[must_use]
+    pub fn random_responder_jitter(&self) -> Duration {
+        Self::random_duration(&self.responder_jitter)
+    }
+
+    /// Generates quiet period
+    #[must_use]
+    pub fn random_quiet(&self) -> Duration {
+        Self::random_duration(&self.quiet_period)
+    }
+
+    /// Helper to generate a random duration within a specific Range.
+    ///
+    /// Both bounds are converted to whole milliseconds and the value is drawn from the
+    /// half-open interval `start..end`, so `end` itself is never produced. When the
+    /// range is empty or inverted, `range.start` is returned without sampling.
+    fn random_duration(range: &Range<Duration>) -> Duration {
+        let min_ms = Self::duration_millis(range.start);
+        let max_ms = Self::duration_millis(range.end);
+
+        if min_ms >= max_ms {
+            return range.start;
+        }
+
+        let millis = rand::rng().random_range(min_ms..max_ms);
+        Duration::from_millis(millis)
+    }
+
+    /// Duration converter to `u64` milliseconds.
+    /// Saturates at `u64::MAX` when the millisecond count does not fit in a `u64`.
+    fn duration_millis(duration: Duration) -> u64 {
+        u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
+    }
+}
diff --git a/rust/hermes-ipfs/src/doc_sync/timers/mod.rs b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs
new file mode 100644
index 0000000000..73cd417fa9
--- /dev/null
+++ b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs
@@ -0,0 +1,4 @@
+//! Timers for document synchronization.
+
+pub mod config;
+pub mod state;
diff --git a/rust/hermes-ipfs/src/doc_sync/timers/state.rs b/rust/hermes-ipfs/src/doc_sync/timers/state.rs
new file mode 100644
index 0000000000..f4fe811de5
--- /dev/null
+++ b/rust/hermes-ipfs/src/doc_sync/timers/state.rs
@@ -0,0 +1,289 @@
+//! Timer states - quiet keepalive and jitter scheduling.
+
+use std::{sync::Arc, thread::JoinHandle};
+
+use tokio::sync::{Mutex, Notify};
+
+use super::config::SyncTimersConfig;
+
+/// Keepalive callback type
+type KeepaliveCallback = Arc<dyn Fn() -> Result<(), anyhow::Error> + Send + Sync>;
+
+/// State managing timers (Quiet period background task + helpers for one-off jitters).
+pub struct SyncTimersState {
+    /// Timer configuration
+    cfg: SyncTimersConfig,
+    /// Callback to invoke when a keepalive is sent.
+    send_new_keepalive: KeepaliveCallback,
+    /// Handle for the background quiet period keepalive task.
+    keepalive_task: Mutex<Option<JoinHandle<()>>>,
+    /// Notification for resetting the quiet timer.
+    reset_new_notify: Notify,
+    /// Notification for shutting down the quiet timer.
+    shutdown_notify: Notify,
+}
+
+impl SyncTimersState {
+    /// Create a timer state.
+    ///
+    /// No background work is started here; call `start_quiet_timer` to launch the
+    /// quiet-period keepalive thread.
+    pub fn new(
+        cfg: SyncTimersConfig,
+        send_new_keepalive: KeepaliveCallback,
+    ) -> Arc<Self> {
+        Arc::new(Self {
+            cfg,
+            send_new_keepalive,
+            keepalive_task: Mutex::new(None),
+            reset_new_notify: Notify::new(),
+            shutdown_notify: Notify::new(),
+        })
+    }
+
+    /// Async sleep for the .syn jitter duration.
+    /// Call before sending .syn.
+    pub async fn wait_syn_backoff(&self) {
+        let dur = self.cfg.random_syn_jitter();
+        tokio::time::sleep(dur).await;
+    }
+
+    /// Async sleep for the .dif (or .prv) responder jitter duration.
+    /// Call before publishing .dif/.prv.
+    pub async fn wait_responder_jitter(&self) {
+        let dur = self.cfg.random_responder_jitter();
+        tokio::time::sleep(dur).await;
+    }
+
+    /// Start the quiet period keepalive timer for .new topic
+    ///
+    /// Spawns a dedicated OS thread that drives `run_quiet_timer` on its own
+    /// current-thread Tokio runtime. Calling this while a handle is already stored is
+    /// a no-op.
+    ///
+    /// # Panics
+    ///
+    /// Takes the task mutex with `blocking_lock`, which panics when called from within
+    /// an asynchronous execution context.
+    pub fn start_quiet_timer(self: &Arc<Self>) {
+        // Check whether the task is already running
+        let mut task_guard = self.keepalive_task.blocking_lock();
+        if task_guard.is_some() {
+            tracing::trace!("Quiet timer already running");
+            return;
+        }
+
+        let this = self.clone();
+        let handle = std::thread::spawn(move || {
+            let rt = match tokio::runtime::Builder::new_current_thread()
+                .enable_time()
+                .build()
+            {
+                Ok(rt) => rt,
+                Err(e) => {
+                    tracing::error!("Failed to build Tokio runtime for quiet timer: {e}");
+                    return;
+                },
+            };
+            rt.block_on(async move {
+                this.run_quiet_timer().await;
+            });
+        });
+        *task_guard = Some(handle);
+    }
+
+    /// Background loop that waits for the quiet period, emits keepalive, and listens to
+    /// reset and shutdown notifications.
+    async fn run_quiet_timer(&self) {
+        loop {
+            // Pick a random duration for THIS cycle
+            let sleep_dur = self.cfg.random_quiet();
+
+            tokio::select! {
+                // Case A: Timer expired (send keepalive)
+                () = tokio::time::sleep(sleep_dur) => {
+                    // Using spawn_blocking because the callback may perform blocking operations
+                    let result = tokio::task::spawn_blocking({
+                        let callback = self.send_new_keepalive.clone();
+                        move || callback()
+                    }).await;
+
+                    match result {
+                        Ok(Ok(())) => tracing::debug!("Keepalive sent"),
+                        Ok(Err(e)) => tracing::warn!("Keepalive failed: {e:?}"),
+                        Err(e) => tracing::error!("Keepalive task failed: {e:?}"),
+                    }
+                    // Loop restarts -> New random duration picked
+                }
+                // Case B: Timer reset triggered (.new observed externally)
+                () = self.reset_new_notify.notified() => {
+                    tracing::trace!("Quiet timer reset");
+                    // Loop restarts immediately -> New random duration picked
+                }
+                // Shutdown triggered
+                () = self.shutdown_notify.notified() => {
+                    tracing::trace!("Quiet timer shutting down");
+                    break;
+                }
+            }
+        }
+    }
+
+    /// Reset quiet period timer (call on every received or posted .new)
+    pub fn reset_quiet_timer(&self) {
+        // Notify the background task to reset its sleep loop
+        self.reset_new_notify.notify_one();
+    }
+
+    /// Stop the quiet period background task, if running.
+    ///
+    /// Does nothing when no handle is stored. The shutdown notification is only sent
+    /// when there is a thread to receive it: `Notify::notify_one` stores a permit when
+    /// nobody is waiting, and a stale permit would make the next started timer exit on
+    /// its first loop iteration.
+    ///
+    /// Blocks the calling thread until the timer thread has exited.
+    ///
+    /// # Panics
+    ///
+    /// Takes the task mutex with `blocking_lock`, which panics when called from within
+    /// an asynchronous execution context.
+    pub fn stop_quiet_timer(&self) {
+        // Keep the guard until the thread is joined so a concurrent start cannot race
+        // with the shutdown notification.
+        let mut task_guard = self.keepalive_task.blocking_lock();
+        let Some(handle) = task_guard.take() else {
+            return;
+        };
+        self.shutdown_notify.notify_one();
+        if handle.join().is_err() {
+            tracing::error!("Quiet timer thread panicked");
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        sync::atomic::{AtomicU32, Ordering},
+        time::Duration,
+    };
+
+    use super::*;
+
+    fn timer_config() -> SyncTimersConfig {
+        SyncTimersConfig {
+            quiet_period: Duration::from_millis(100)..Duration::from_millis(100),
+            ..Default::default()
+        }
+    }
+    #[test]
+    fn test_start_quiet_timer_count_keepalive() {
+        // Track how many times the callback is called
+        let callback_count = Arc::new(AtomicU32::new(0));
+        let callback: KeepaliveCallback = Arc::new({
+            let counter = callback_count.clone();
+            move || {
+                counter.fetch_add(1, Ordering::Relaxed);
+                Ok(())
+            }
+        });
+
+        let state = SyncTimersState::new(timer_config(), callback);
+        state.start_quiet_timer();
+
+        // Trigger 2 keepalive
+        std::thread::sleep(Duration::from_millis(250));
+        state.reset_quiet_timer();
+        // Trigger 1 keepalive
+        std::thread::sleep(Duration::from_millis(120));
+
+        let count = callback_count.load(Ordering::Relaxed);
+        assert_eq!(
+            count, 3,
+            "Expected callback to be called 3 times, got {count}"
+        );
+    }
+
+    #[test]
+    fn test_double_start_no_duplicate() {
+        let callback_count = Arc::new(AtomicU32::new(0));
+        let callback: KeepaliveCallback = Arc::new({
+            let counter = callback_count.clone();
+            move || {
+                counter.fetch_add(1, Ordering::Relaxed);
+                Ok(())
+            }
+        });
+        let state = SyncTimersState::new(timer_config(), callback);
+
+        // Start twice
+        state.start_quiet_timer();
+        // Should ignore
+        // If log is enabled, this should show "Quiet timer already running"
+        state.start_quiet_timer();
+
+        // Wait for keepalive
+        std::thread::sleep(Duration::from_millis(120));
+
+        let count = callback_count.load(Ordering::Relaxed);
+        assert_eq!(
+            count, 1,
+            "Expected callback to be called 1 times, got {count}"
+        );
+    }
+
+    #[test]
+    fn test_start_stop() {
+        let callback_count = Arc::new(AtomicU32::new(0));
+        let callback: KeepaliveCallback = Arc::new({
+            let counter = callback_count.clone();
+            move || {
+                counter.fetch_add(1, Ordering::Relaxed);
+                Ok(())
+            }
+        });
+
+        let state = SyncTimersState::new(timer_config(), callback);
+
+        // Start timer -> timer expired (count + 1) -> stop timer ->
+        // timer expired (this should not do anything since the timer is stopped)
+        state.start_quiet_timer();
+        std::thread::sleep(Duration::from_millis(120));
+        state.stop_quiet_timer();
+        assert!(state.keepalive_task.blocking_lock().is_none());
+        std::thread::sleep(Duration::from_millis(120));
+        let count = callback_count.load(Ordering::Relaxed);
+        assert_eq!(
+            count, 1,
+            "Expected callback to be called 1 times, got {count}"
+        );
+    }
+
+    #[test]
+    fn test_stop_without_start_does_not_cancel_next_start() {
+        let callback_count = Arc::new(AtomicU32::new(0));
+        let callback: KeepaliveCallback = Arc::new({
+            let counter = callback_count.clone();
+            move || {
+                counter.fetch_add(1, Ordering::Relaxed);
+                Ok(())
+            }
+        });
+
+        let state = SyncTimersState::new(timer_config(), callback);
+
+        // Stopping an idle timer must not leave a shutdown permit behind that would
+        // terminate the next started timer before its first keepalive.
+        state.stop_quiet_timer();
+        state.start_quiet_timer();
+        std::thread::sleep(Duration::from_millis(150));
+        state.stop_quiet_timer();
+
+        let count = callback_count.load(Ordering::Relaxed);
+        assert!(
+            count >= 1,
+            "Expected at least 1 keepalive after stop-then-start, got {count}"
+        );
+    }
+}