From 9a9e8ea6361fd370705074e03b71a05bdf5dc52e Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 13:52:55 +0700 Subject: [PATCH 1/9] implement doc sync timer Signed-off-by: bkioshn --- rust/hermes-ipfs/src/doc_sync/mod.rs | 1 + .../hermes-ipfs/src/doc_sync/timers/config.rs | 92 ++++++++++ rust/hermes-ipfs/src/doc_sync/timers/mod.rs | 4 + rust/hermes-ipfs/src/doc_sync/timers/state.rs | 172 ++++++++++++++++++ 4 files changed, 269 insertions(+) create mode 100644 rust/hermes-ipfs/src/doc_sync/timers/config.rs create mode 100644 rust/hermes-ipfs/src/doc_sync/timers/mod.rs create mode 100644 rust/hermes-ipfs/src/doc_sync/timers/state.rs diff --git a/rust/hermes-ipfs/src/doc_sync/mod.rs b/rust/hermes-ipfs/src/doc_sync/mod.rs index b98ca70955..08a6384882 100644 --- a/rust/hermes-ipfs/src/doc_sync/mod.rs +++ b/rust/hermes-ipfs/src/doc_sync/mod.rs @@ -3,6 +3,7 @@ mod envelope; pub mod payload; +pub mod timers; pub use envelope::{Envelope, EnvelopePayload}; diff --git a/rust/hermes-ipfs/src/doc_sync/timers/config.rs b/rust/hermes-ipfs/src/doc_sync/timers/config.rs new file mode 100644 index 0000000000..9043ed58a0 --- /dev/null +++ b/rust/hermes-ipfs/src/doc_sync/timers/config.rs @@ -0,0 +1,92 @@ +//! Timer configuration definitions (jitter ranges, quiet-period bounds). +//! The configuration is derived from: + +use std::{convert::TryFrom, ops::Range, time::Duration}; + +use rand::Rng; + +/// Minimum backoff/jitter before sending .syn: uniform random in ms. +const T_SYN_MIN_MS: u64 = 200; +/// Maximum backoff/jitter before sending .syn: uniform random in ms. +const T_SYN_MAX_MS: u64 = 800; + +/// Default range backoff/jitter before sending .syn. +/// Spec: [200ms, 800ms]. +const T_SYN_RANGE: Range = + Duration::from_millis(T_SYN_MIN_MS)..Duration::from_millis(T_SYN_MAX_MS); + +/// Minimum responder jitter before publishing .dif (and .prv) in ms. +const R_MIN_MS: u64 = 50; +/// Maximum responder jitter before publishing .dif (and .prv) in ms. +const R_MAX_MS: u64 = 250; + +/// Default responder jitter range. +/// Spec: [50ms, 250ms]. +const RESPONDER_RANGE: Range = + Duration::from_millis(R_MIN_MS)..Duration::from_millis(R_MAX_MS); + +/// Minimum quiet period in seconds for .new re-announce. +const T_QUIET_MIN_SEC: u64 = 20; +/// Maximum quiet period in seconds for .new re-announce. +const T_QUIET_MAX_SEC: u64 = 60; + +/// Default quiet period range. +/// Spec: [20s, 60s]. +const T_QUIET_RANGE: Range = + Duration::from_secs(T_QUIET_MIN_SEC)..Duration::from_secs(T_QUIET_MAX_SEC); + +/// Configuration for all Document Sync timers per topic channel. +#[derive(Debug, Clone)] +pub struct SyncTimersConfig { + /// Jitter range for sending .syn + pub syn_jitter: Range, + /// Jitter range for responding with .dif (or .prv) + pub responder_jitter: Range, + /// Quiet period re-announcement range for .new + pub quiet_period: Range, +} + +impl Default for SyncTimersConfig { + fn default() -> Self { + Self { + syn_jitter: T_SYN_RANGE, + responder_jitter: RESPONDER_RANGE, + quiet_period: T_QUIET_RANGE, + } + } +} + +impl SyncTimersConfig { + /// Generates backoff/jitter for .syn + pub fn random_syn_jitter(&self) -> Duration { + Self::random_duration(&self.syn_jitter) + } + + /// Generates responder jitter for .dif (and .prv) + pub fn random_responder_jitter(&self) -> Duration { + Self::random_duration(&self.responder_jitter) + } + + /// Generates quiet period + pub fn random_quiet(&self) -> Duration { + Self::random_duration(&self.quiet_period) + } + + /// Helper to generate a random duration within a specific Range. + fn random_duration(range: &Range) -> Duration { + let min_ms = Self::duration_millis(range.start); + let max_ms = Self::duration_millis(range.end); + + if min_ms >= max_ms { + return range.start; + } + + let millis = rand::rng().random_range(min_ms..max_ms); + Duration::from_millis(millis) + } + + /// Duration converter to `u64` milliseconds. + fn duration_millis(duration: Duration) -> u64 { + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) + } +} diff --git a/rust/hermes-ipfs/src/doc_sync/timers/mod.rs b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs new file mode 100644 index 0000000000..9d2c4a9179 --- /dev/null +++ b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs @@ -0,0 +1,4 @@ +//! Timers for document synchronization. + +mod config; +pub mod state; diff --git a/rust/hermes-ipfs/src/doc_sync/timers/state.rs b/rust/hermes-ipfs/src/doc_sync/timers/state.rs new file mode 100644 index 0000000000..0e0fbdd7e9 --- /dev/null +++ b/rust/hermes-ipfs/src/doc_sync/timers/state.rs @@ -0,0 +1,172 @@ +//! Timer states - quiet keepalive and jitter scheduling. + +use std::{sync::Arc, thread::JoinHandle}; + +use tokio::sync::{Mutex, Notify}; + +use super::config::SyncTimersConfig; + +/// Keepalive callback type +type KeepaliveCallback = Arc Result<(), anyhow::Error> + Send + Sync>; + +/// State managing timers (Quiet period background task + helpers for one-off jitters). +pub struct SyncTimersState { + /// Timer configuration + cfg: SyncTimersConfig, + /// Callback to invoke when a keepalive is sent. + send_new_keepalive: KeepaliveCallback, + /// Handle for the background quiet-period keepalive task. + keepalive_task: Mutex>>, + /// Notification for resetting the quiet timer. + reset_new_notify: Notify, +} + +impl SyncTimersState { + /// Create a timer state. + pub fn new( + cfg: SyncTimersConfig, + send_new_keepalive: KeepaliveCallback, + ) -> Arc { + Arc::new(Self { + cfg, + send_new_keepalive, + keepalive_task: Mutex::new(None), + reset_new_notify: Notify::new(), + }) + } + + /// Async sleep for the .syn jitter duration. + /// Call before sending .syn. + pub async fn wait_syn_backoff(&self) { + let dur = self.cfg.random_syn_jitter(); + tokio::time::sleep(dur).await; + } + + /// Async sleep for the .dif (or .prv) responder jitter duration. + /// Call before publishing .dif/.prv. + pub async fn wait_responder_jitter(&self) { + let dur = self.cfg.random_responder_jitter(); + tokio::time::sleep(dur).await; + } + + /// Start the quiet period keepalive timer for .new topic + pub fn start_quiet_timer(self: &Arc) { + // Check whether the task is already running + let mut task_guard = self.keepalive_task.blocking_lock(); + if task_guard.is_some() { + tracing::trace!("Quiet timer already running"); + return; + } + + let this = Arc::clone(self); + let handle = std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + { + Ok(rt) => rt, + Err(e) => { + tracing::error!("Failed to build Tokio runtime for quiet timer: {e}"); + return; + }, + }; + rt.block_on(async move { + this.run_quiet_timer().await; + }); + }); + *task_guard = Some(handle); + } + + /// Background loop that waits for the quiet period, emits keepalive, and listen to + /// reset notifications. + async fn run_quiet_timer(&self) { + loop { + // Pick a random duration for THIS cycle + let sleep_dur = self.cfg.random_quiet(); + + tokio::select! { + // Case A: Timer expired (send keepalive) + () = tokio::time::sleep(sleep_dur) => { + // Using spawn_blocking because the callback may perform blocking operations + let result = tokio::task::spawn_blocking({ + let callback = self.send_new_keepalive.clone(); + move || callback() + }).await; + + match result { + Ok(Ok(())) => tracing::debug!("Keepalive sent"), + Ok(Err(e)) => tracing::warn!("Keepalive failed: {e:?}"), + Err(e) => tracing::error!("Keepalive task failed: {e:?}"), + } + // Loop restarts -> New random duration picked + } + // Case B: Timer reset triggered (.new observed externally) + () = self.reset_new_notify.notified() => { + tracing::trace!("Quiet timer reset"); + // Loop restarts immediately -> New random duration picked + } + } + } + } + + /// Reset quiet-period timer (call on every received or posted .new) + pub fn reset_quiet_timer(&self) { + tracing::trace!("Resetting quiet timer"); + // Notify the background task to reset its sleep loop + self.reset_new_notify.notify_waiters(); + } + + /// Stop the quiet-period background task, if running. + pub fn stop_quiet_timer(&self) { + self.keepalive_task.blocking_lock().take(); + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::{AtomicU32, Ordering}, + time::Duration, + }; + + use super::*; + + fn timer_config() -> SyncTimersConfig { + SyncTimersConfig { + quiet_period: Duration::from_millis(100)..Duration::from_millis(100), + ..Default::default() + } + } + #[tokio::test] + async fn test_start_quiet_timer_count_keepalive() { + // Track how many times the callback is called + let callback_count = Arc::new(AtomicU32::new(0)).clone(); + let callback: KeepaliveCallback = Arc::new({ + let counter = callback_count.clone(); + move || { + counter.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + }); + + let state = SyncTimersState::new(timer_config(), callback); + let state_clone = state.clone(); + tokio::task::spawn_blocking(move || { + state_clone.start_quiet_timer(); + }) + .await + .unwrap(); + + // Trigger 2 keepalive + tokio::time::sleep(Duration::from_millis(250)).await; + state.reset_quiet_timer(); + // Trigger 1 keepalive + tokio::time::sleep(Duration::from_millis(120)).await; + + let count = callback_count.load(Ordering::Relaxed); + assert!( + count == 3, + "Expected callback to be called 3 times, got {count}" + ); + } +} From 3c5019a36feab9460a4a967db954cb28c246d027 Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 14:20:02 +0700 Subject: [PATCH 2/9] update deps Signed-off-by: bkioshn --- rust/hermes-ipfs/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml index a3c25b6200..3ecadf9170 100644 --- a/rust/hermes-ipfs/Cargo.toml +++ b/rust/hermes-ipfs/Cargo.toml @@ -30,6 +30,8 @@ connexa = { version = "0.4.1", features = ["full"] } minicbor = { version = "0.25.1", features = ["alloc"], optional = true } ed25519-dalek = { version = "2.1.1", optional = true} catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true } +tracing = "0.1.43" +rand = "0.9.0" [dev-dependencies] # Dependencies used by examples @@ -38,4 +40,3 @@ dirs = "6.0.0" lipsum = "0.9.1" rustyline-async = "0.4.5" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -rand = "0.9.0" From 92903b1536c038ef78e3e9e5e7aba07f9a44ed6a Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 14:40:37 +0700 Subject: [PATCH 3/9] expose timer config Signed-off-by: bkioshn --- rust/hermes-ipfs/src/doc_sync/timers/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/hermes-ipfs/src/doc_sync/timers/mod.rs b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs index 9d2c4a9179..73cd417fa9 100644 --- a/rust/hermes-ipfs/src/doc_sync/timers/mod.rs +++ b/rust/hermes-ipfs/src/doc_sync/timers/mod.rs @@ -1,4 +1,4 @@ //! Timers for document synchronization. -mod config; +pub mod config; pub mod state; From f18bca112206c8435b08f80e79af01ce65e072d0 Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 15:58:20 +0700 Subject: [PATCH 4/9] minor fix Signed-off-by: bkioshn --- rust/hermes-ipfs/src/doc_sync/timers/config.rs | 3 +++ rust/hermes-ipfs/src/doc_sync/timers/state.rs | 8 ++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/rust/hermes-ipfs/src/doc_sync/timers/config.rs b/rust/hermes-ipfs/src/doc_sync/timers/config.rs index 9043ed58a0..cbcad41346 100644 --- a/rust/hermes-ipfs/src/doc_sync/timers/config.rs +++ b/rust/hermes-ipfs/src/doc_sync/timers/config.rs @@ -58,16 +58,19 @@ impl Default for SyncTimersConfig { impl SyncTimersConfig { /// Generates backoff/jitter for .syn + #[must_use] pub fn random_syn_jitter(&self) -> Duration { Self::random_duration(&self.syn_jitter) } /// Generates responder jitter for .dif (and .prv) + #[must_use] pub fn random_responder_jitter(&self) -> Duration { Self::random_duration(&self.responder_jitter) } /// Generates quiet period + #[must_use] pub fn random_quiet(&self) -> Duration { Self::random_duration(&self.quiet_period) } diff --git a/rust/hermes-ipfs/src/doc_sync/timers/state.rs b/rust/hermes-ipfs/src/doc_sync/timers/state.rs index 0e0fbdd7e9..47f965e4fd 100644 --- a/rust/hermes-ipfs/src/doc_sync/timers/state.rs +++ b/rust/hermes-ipfs/src/doc_sync/timers/state.rs @@ -15,7 +15,7 @@ pub struct SyncTimersState { cfg: SyncTimersConfig, /// Callback to invoke when a keepalive is sent. send_new_keepalive: KeepaliveCallback, - /// Handle for the background quiet-period keepalive task. + /// Handle for the background quiet period keepalive task. keepalive_task: Mutex>>, /// Notification for resetting the quiet timer. reset_new_notify: Notify, @@ -58,7 +58,7 @@ impl SyncTimersState { return; } - let this = Arc::clone(self); + let this = self.clone(); let handle = std::thread::spawn(move || { let rt = match tokio::runtime::Builder::new_current_thread() .enable_time() @@ -109,14 +109,14 @@ impl SyncTimersState { } } - /// Reset quiet-period timer (call on every received or posted .new) + /// Reset quiet period timer (call on every received or posted .new) pub fn reset_quiet_timer(&self) { tracing::trace!("Resetting quiet timer"); // Notify the background task to reset its sleep loop self.reset_new_notify.notify_waiters(); } - /// Stop the quiet-period background task, if running. + /// Stop the quiet period background task, if running. pub fn stop_quiet_timer(&self) { self.keepalive_task.blocking_lock().take(); } From 311cab0801b49ec083e4fcc0b8a9ed190809a5ff Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 18:05:24 +0700 Subject: [PATCH 5/9] use generic fn object callback Signed-off-by: bkioshn --- rust/hermes-ipfs/src/doc_sync/timers/state.rs | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/rust/hermes-ipfs/src/doc_sync/timers/state.rs b/rust/hermes-ipfs/src/doc_sync/timers/state.rs index 47f965e4fd..db77d0e915 100644 --- a/rust/hermes-ipfs/src/doc_sync/timers/state.rs +++ b/rust/hermes-ipfs/src/doc_sync/timers/state.rs @@ -6,30 +6,31 @@ use tokio::sync::{Mutex, Notify}; use super::config::SyncTimersConfig; -/// Keepalive callback type -type KeepaliveCallback = Arc Result<(), anyhow::Error> + Send + Sync>; - /// State managing timers (Quiet period background task + helpers for one-off jitters). -pub struct SyncTimersState { +pub struct SyncTimersState +where F: Fn() -> Result<(), anyhow::Error> + Send + Sync + 'static +{ /// Timer configuration cfg: SyncTimersConfig, /// Callback to invoke when a keepalive is sent. - send_new_keepalive: KeepaliveCallback, + send_new_keepalive: Arc, /// Handle for the background quiet period keepalive task. keepalive_task: Mutex>>, /// Notification for resetting the quiet timer. reset_new_notify: Notify, } -impl SyncTimersState { +impl SyncTimersState +where F: Fn() -> Result<(), anyhow::Error> + Send + Sync + 'static +{ /// Create a timer state. pub fn new( cfg: SyncTimersConfig, - send_new_keepalive: KeepaliveCallback, + send_new_keepalive: F, ) -> Arc { Arc::new(Self { cfg, - send_new_keepalive, + send_new_keepalive: Arc::new(send_new_keepalive), keepalive_task: Mutex::new(None), reset_new_notify: Notify::new(), }) @@ -141,13 +142,13 @@ mod tests { async fn test_start_quiet_timer_count_keepalive() { // Track how many times the callback is called let callback_count = Arc::new(AtomicU32::new(0)).clone(); - let callback: KeepaliveCallback = Arc::new({ + let callback = { let counter = callback_count.clone(); move || { counter.fetch_add(1, Ordering::Relaxed); Ok(()) } - }); + }; let state = SyncTimersState::new(timer_config(), callback); let state_clone = state.clone(); From 2b7dfeea967d78498cd011df35de55e49d3d1859 Mon Sep 17 00:00:00 2001 From: bkioshn Date: Wed, 10 Dec 2025 18:30:28 +0700 Subject: [PATCH 6/9] Revert "use generic fn object callback" This reverts commit 311cab0801b49ec083e4fcc0b8a9ed190809a5ff. --- rust/hermes-ipfs/src/doc_sync/timers/state.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/rust/hermes-ipfs/src/doc_sync/timers/state.rs b/rust/hermes-ipfs/src/doc_sync/timers/state.rs index db77d0e915..47f965e4fd 100644 --- a/rust/hermes-ipfs/src/doc_sync/timers/state.rs +++ b/rust/hermes-ipfs/src/doc_sync/timers/state.rs @@ -6,31 +6,30 @@ use tokio::sync::{Mutex, Notify}; use super::config::SyncTimersConfig; +/// Keepalive callback type +type KeepaliveCallback = Arc Result<(), anyhow::Error> + Send + Sync>; + /// State managing timers (Quiet period background task + helpers for one-off jitters). -pub struct SyncTimersState -where F: Fn() -> Result<(), anyhow::Error> + Send + Sync + 'static -{ +pub struct SyncTimersState { /// Timer configuration cfg: SyncTimersConfig, /// Callback to invoke when a keepalive is sent. - send_new_keepalive: Arc, + send_new_keepalive: KeepaliveCallback, /// Handle for the background quiet period keepalive task. keepalive_task: Mutex>>, /// Notification for resetting the quiet timer. reset_new_notify: Notify, } -impl SyncTimersState -where F: Fn() -> Result<(), anyhow::Error> + Send + Sync + 'static -{ +impl SyncTimersState { /// Create a timer state. pub fn new( cfg: SyncTimersConfig, - send_new_keepalive: F, + send_new_keepalive: KeepaliveCallback, ) -> Arc { Arc::new(Self { cfg, - send_new_keepalive: Arc::new(send_new_keepalive), + send_new_keepalive, keepalive_task: Mutex::new(None), reset_new_notify: Notify::new(), }) @@ -142,13 +141,13 @@ mod tests { async fn test_start_quiet_timer_count_keepalive() { // Track how many times the callback is called let callback_count = Arc::new(AtomicU32::new(0)).clone(); - let callback = { + let callback: KeepaliveCallback = Arc::new({ let counter = callback_count.clone(); move || { counter.fetch_add(1, Ordering::Relaxed); Ok(()) } - }; + }); let state = SyncTimersState::new(timer_config(), callback); let state_clone = state.clone(); From c93d3b81b87d60170a8d2ec9cdfdbfb7252db86e Mon Sep 17 00:00:00 2001 From: bkioshn Date: Thu, 11 Dec 2025 09:11:40 +0700 Subject: [PATCH 7/9] update rand Signed-off-by: bkioshn --- rust/hermes-ipfs/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml index 3ecadf9170..7e302d07c4 100644 --- a/rust/hermes-ipfs/Cargo.toml +++ b/rust/hermes-ipfs/Cargo.toml @@ -31,7 +31,7 @@ minicbor = { version = "0.25.1", features = ["alloc"], optional = true } ed25519-dalek = { version = "2.1.1", optional = true} catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true } tracing = "0.1.43" -rand = "0.9.0" +rand = "0.9.2" [dev-dependencies] # Dependencies used by examples From 5f78a329c8f75e9f0968ebd99cea7876b88d08de Mon Sep 17 00:00:00 2001 From: bkioshn Date: Thu, 11 Dec 2025 10:09:59 +0700 Subject: [PATCH 8/9] test dep Signed-off-by: bkioshn --- rust/hermes-ipfs/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml index 7e302d07c4..39a3c0e220 100644 --- a/rust/hermes-ipfs/Cargo.toml +++ b/rust/hermes-ipfs/Cargo.toml @@ -30,8 +30,6 @@ connexa = { version = "0.4.1", features = ["full"] } minicbor = { version = "0.25.1", features = ["alloc"], optional = true } ed25519-dalek = { version = "2.1.1", optional = true} catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true } -tracing = "0.1.43" -rand = "0.9.2" [dev-dependencies] # Dependencies used by examples From ef68c36a327a9e525f753f16313162662f81f81b Mon Sep 17 00:00:00 2001 From: bkioshn Date: Thu, 11 Dec 2025 10:17:38 +0700 Subject: [PATCH 9/9] test dep Signed-off-by: bkioshn --- rust/hermes-ipfs/Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml index 39a3c0e220..7e302d07c4 100644 --- a/rust/hermes-ipfs/Cargo.toml +++ b/rust/hermes-ipfs/Cargo.toml @@ -30,6 +30,8 @@ connexa = { version = "0.4.1", features = ["full"] } minicbor = { version = "0.25.1", features = ["alloc"], optional = true } ed25519-dalek = { version = "2.1.1", optional = true} catalyst-types = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.11", optional = true } +tracing = "0.1.43" +rand = "0.9.2" [dev-dependencies] # Dependencies used by examples