diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs index 12095b1e2..ea346ecea 100644 --- a/code/crates/app/src/spawn.rs +++ b/code/crates/app/src/spawn.rs @@ -179,6 +179,7 @@ where let scoring_strategy = match config.scoring_strategy { malachitebft_config::ScoringStrategy::Ema => sync::scoring::Strategy::Ema, + malachitebft_config::ScoringStrategy::Credit => sync::scoring::Strategy::Credit, }; let sync_config = sync::Config { @@ -188,6 +189,7 @@ where request_timeout: config.request_timeout, parallel_requests: config.parallel_requests as u64, scoring_strategy, + initial_score: config.initial_score, inactive_threshold: (!config.inactive_threshold.is_zero()) .then_some(config.inactive_threshold), batch_size: config.batch_size, diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 3bb3e338b..8533f097e 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -529,6 +529,10 @@ pub struct ValueSyncConfig { #[serde(default)] pub scoring_strategy: ScoringStrategy, + /// Initial score for peers + #[serde(default = "default_initial_score")] + pub initial_score: f64, + /// Threshold for considering a peer inactive #[serde(with = "humantime_serde")] pub inactive_threshold: Duration, @@ -547,23 +551,33 @@ impl Default for ValueSyncConfig { max_response_size: ByteSize::mib(10), parallel_requests: 5, scoring_strategy: ScoringStrategy::default(), + initial_score: default_initial_score(), inactive_threshold: Duration::from_secs(60), batch_size: 5, } } } +fn default_initial_score() -> f64 { + 0.5 +} + +/// Peer scoring strategy #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum ScoringStrategy { + /// Exponential Moving Average #[default] Ema, + /// Credit-based scoring + Credit, } impl ScoringStrategy { pub fn name(&self) -> &'static str { match self { Self::Ema => "ema", + Self::Credit => "credit", } } } @@ -574,7 +588,10 @@ impl FromStr for ScoringStrategy { fn from_str(s: &str) -> Result { match s { "ema" => Ok(Self::Ema), - e => Err(format!("unknown scoring strategy: {e}, available: ema")), + "credit" => Ok(Self::Credit), + e => Err(format!( + "unknown scoring strategy: {e}, available: ema, credit" + )), } } } diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 60a881db4..e5d27a0dc 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -161,6 +161,7 @@ async fn spawn_sync_actor( let scoring_strategy = match config.scoring_strategy { config::ScoringStrategy::Ema => sync::scoring::Strategy::Ema, + config::ScoringStrategy::Credit => sync::scoring::Strategy::Credit, }; let sync_config = sync::Config { @@ -170,6 +171,7 @@ async fn spawn_sync_actor( request_timeout: config.request_timeout, parallel_requests: config.parallel_requests as u64, scoring_strategy, + initial_score: config.initial_score, inactive_threshold: (!config.inactive_threshold.is_zero()) .then_some(config.inactive_threshold), batch_size: config.batch_size, diff --git a/code/crates/sync/src/config.rs b/code/crates/sync/src/config.rs index 3f4f6b3a5..b4809c2de 100644 --- a/code/crates/sync/src/config.rs +++ b/code/crates/sync/src/config.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::scoring::Strategy; +use crate::scoring::{Score, Strategy}; const DEFAULT_PARALLEL_REQUESTS: u64 = 5; const DEFAULT_BATCH_SIZE: usize = 5; @@ -13,6 +13,7 @@ pub struct Config { pub max_response_size: usize, pub parallel_requests: u64, pub scoring_strategy: Strategy, + pub initial_score: Score, pub inactive_threshold: Option, pub batch_size: usize, } @@ -50,6 +51,11 @@ impl Config { self } + pub fn with_initial_score(mut self, initial_score: Score) -> Self { + self.initial_score = initial_score; + self + } + pub fn with_inactive_threshold(mut self, inactive_threshold: Option) -> Self { self.inactive_threshold = inactive_threshold; self @@ -70,6 +76,7 @@ impl Default for Config { max_response_size: 10 * 1024 * 1024, // 10 MiB parallel_requests: DEFAULT_PARALLEL_REQUESTS, scoring_strategy: Strategy::default(), + initial_score: 0.5, inactive_threshold: None, batch_size: DEFAULT_BATCH_SIZE, } diff --git a/code/crates/sync/src/scoring.rs b/code/crates/sync/src/scoring.rs index 898682a84..931e82fea 100644 --- a/code/crates/sync/src/scoring.rs +++ b/code/crates/sync/src/scoring.rs @@ -9,6 +9,7 @@ use tracing::debug; use malachitebft_peer::PeerId; +pub mod credit; pub mod ema; pub mod metrics; @@ -31,70 +32,93 @@ pub type Score = f64; /// Strategy for scoring peers based on sync results pub trait ScoringStrategy: Send + Sync { - /// Initial score for new peers. - /// - /// ## Important - /// The initial score MUST be in the `0.0..=1.0` range. - fn initial_score(&self, peer_id: PeerId) -> Score; + /// Per-peer state maintained by this strategy + type State: Default + Clone + Send + Sync; - /// Update the peer score based on previous score and sync result + /// Update the peer score based on previous score and sync result. + /// The strategy has mutable access to per-peer state. /// /// ## Important /// The updated score must be in the `0.0..=1.0` range. - fn update_score(&mut self, previous_score: Score, result: SyncResult) -> Score; + fn update_score( + &self, + state: &mut Self::State, + previous_score: Score, + result: SyncResult, + ) -> Score; } +/// Scoring strategies #[derive(Copy, Clone, Debug, Default)] pub enum Strategy { /// Exponential moving average strategy #[default] Ema, + /// Credit-based strategy + Credit, } -#[derive(Copy, Clone)] -pub struct PeerScore { - score: Score, - last_update: Instant, +/// Per-peer state maintained by the scorer +#[derive(Clone)] +pub struct PeerState { + pub score: Score, + pub last_update: Instant, + pub strategy_state: S::State, } -impl PeerScore { +impl PeerState { pub fn new(score: Score) -> Self { Self { score, last_update: Instant::now(), + strategy_state: S::State::default(), } } } -impl fmt::Debug for PeerScore { +impl fmt::Debug for PeerState +where + S: ScoringStrategy, + S::State: fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Round score to 3 decimal places for readability fn round_score(score: Score) -> f64 { (score * 1000.0).round() / 1000.0 } - f.debug_struct("PeerScore") + f.debug_struct("PeerState") .field("score", &round_score(self.score)) .field("last_update", &self.last_update.elapsed()) + .field("strategy_state", &self.strategy_state) .finish() } } /// Tracks peer scores using a scoring strategy -pub struct PeerScorer { - scores: HashMap, - strategy: Box, +pub struct PeerScorer { + state: HashMap>, + initial_score: Score, + strategy: S, } -impl PeerScorer { +impl PeerScorer { /// Create a new peer scorer with specified strategy - pub fn new(strategy: impl ScoringStrategy + 'static) -> Self { + pub fn new(initial_score: Score, strategy: S) -> Self { Self { - scores: HashMap::new(), - strategy: Box::new(strategy), + state: HashMap::new(), + initial_score, + strategy, } } + /// Initial score for new peers. + /// + /// This is used when a peer is first encountered or when a peer's score is reset due to inactivity. + pub fn initial_score(&self) -> Score { + self.initial_score + } + /// Update a peer's score based on the result of a sync request, recording the result in metrics. /// Returns the new score. pub fn update_score_with_metrics( @@ -111,37 +135,40 @@ impl PeerScorer { /// Update a peer's score based on the result of a sync request. /// Returns the new score. pub fn update_score(&mut self, peer_id: PeerId, result: SyncResult) -> Score { - let peer_score = self - .scores + let initial_score = self.initial_score(); + + let peer_state = self + .state .entry(peer_id) - .or_insert_with(|| PeerScore::new(self.strategy.initial_score(peer_id))); + .or_insert_with(|| PeerState::new(initial_score)); - let previous_score = peer_score.score; + let previous_score = peer_state.score; + let new_score = + self.strategy + .update_score(&mut peer_state.strategy_state, previous_score, result); debug!("Updating score for peer {peer_id}"); debug!(" Result = {result:?}"); debug!(" Prev = {previous_score}"); - - let new_score = self.strategy.update_score(previous_score, result); debug!(" New = {new_score}"); - peer_score.score = new_score; - peer_score.last_update = Instant::now(); + peer_state.score = new_score; + peer_state.last_update = Instant::now(); new_score } /// Get the current score for a peer pub fn get_score(&self, peer_id: &PeerId) -> Score { - self.scores + self.state .get(peer_id) .map(|p| p.score) - .unwrap_or(self.strategy.initial_score(*peer_id)) + .unwrap_or_else(|| self.initial_score()) } - /// Get all peer scores - pub fn get_scores(&self) -> &HashMap { - &self.scores + /// Get all peer states + pub fn get_state(&self) -> &HashMap> { + &self.state } /// Select a peer using weighted probabilistic selection @@ -150,7 +177,10 @@ impl PeerScorer { return None; } - let scores = peers.iter().map(|id| self.get_score(id).max(0.0)); + // Use a minimum floor so that even the lowest-scoring peers + // retain a non-zero probability of being selected. + const MIN_WEIGHT: f64 = 0.01; + let scores = peers.iter().map(|id| self.get_score(id).max(MIN_WEIGHT)); // Sample from peers using a weighted distribution based on their scores let distr = WeightedIndex::new(scores).ok()?; @@ -171,14 +201,85 @@ impl PeerScorer { pub fn reset_inactive_peers_scores(&mut self, inactive_threshold: Duration) { let now = Instant::now(); - self.scores - .retain(|_, score| now.duration_since(score.last_update) < inactive_threshold); + self.state.retain(|_, peer_state| { + now.duration_since(peer_state.last_update) < inactive_threshold + }); } } -impl Default for PeerScorer { - fn default() -> Self { - Self::new(ema::ExponentialMovingAverage::default()) +/// Enum wrapper for runtime-configurable scoring strategies +pub enum ScorerVariant { + Ema(PeerScorer), + Credit(PeerScorer), +} + +impl ScorerVariant { + /// Update a peer's score based on the result of a sync request. + /// Returns the new score. + pub fn update_score(&mut self, peer_id: PeerId, result: SyncResult) -> Score { + match self { + ScorerVariant::Ema(scorer) => scorer.update_score(peer_id, result), + ScorerVariant::Credit(scorer) => scorer.update_score(peer_id, result), + } + } + + /// Update a peer's score based on the result of a sync request, recording the result in metrics. + /// Returns the new score. + pub fn update_score_with_metrics( + &mut self, + peer_id: PeerId, + result: SyncResult, + metrics: &Metrics, + ) -> Score { + match self { + ScorerVariant::Ema(scorer) => { + scorer.update_score_with_metrics(peer_id, result, metrics) + } + ScorerVariant::Credit(scorer) => { + scorer.update_score_with_metrics(peer_id, result, metrics) + } + } + } + + /// Get the current score for a peer + pub fn get_score(&self, peer_id: &PeerId) -> Score { + match self { + ScorerVariant::Ema(scorer) => scorer.get_score(peer_id), + ScorerVariant::Credit(scorer) => scorer.get_score(peer_id), + } + } + + /// Select a peer using weighted probabilistic selection + pub fn select_peer(&self, peers: &[PeerId], rng: &mut R) -> Option { + match self { + ScorerVariant::Ema(scorer) => scorer.select_peer(peers, rng), + ScorerVariant::Credit(scorer) => scorer.select_peer(peers, rng), + } + } + + /// Prune peers whose scores have not been updated for the specified duration, + /// effectively resetting their score to the initial score. + pub fn reset_inactive_peers_scores(&mut self, inactive_threshold: Duration) { + match self { + ScorerVariant::Ema(scorer) => scorer.reset_inactive_peers_scores(inactive_threshold), + ScorerVariant::Credit(scorer) => scorer.reset_inactive_peers_scores(inactive_threshold), + } + } + + /// Get all peer scores as a map from peer ID to score value (for debug logging) + pub fn get_scores(&self) -> HashMap { + match self { + ScorerVariant::Ema(scorer) => scorer + .get_state() + .iter() + .map(|(id, state)| (*id, state.score)) + .collect(), + ScorerVariant::Credit(scorer) => scorer + .get_state() + .iter() + .map(|(id, state)| (*id, state.score)) + .collect(), + } } } @@ -238,20 +339,65 @@ mod tests { }) } - fn arb_strategy(u: &mut Unstructured) -> Result { + const INITIAL_SCORE: Score = 0.5; + + fn arb_slow_threshold(u: &mut Unstructured) -> Result { + u.int_in_range(1000..=5000).map(Duration::from_millis) + } + + fn arb_scorer_variant(u: &mut Unstructured) -> Result<(ScorerVariant, Duration)> { + let slow_threshold = arb_slow_threshold(u)?; + eprintln!("slow_threshold = {slow_threshold:?}"); + + let scorer = if u.arbitrary()? { + eprintln!("Testing with EMA strategy"); + let strategy = arb_ema_strategy(u, slow_threshold)?; + ScorerVariant::Ema(PeerScorer::new(INITIAL_SCORE, strategy)) + } else { + eprintln!("Testing with Credit strategy"); + let strategy = arb_credit_strategy(u, slow_threshold)?; + ScorerVariant::Credit(PeerScorer::new(INITIAL_SCORE, strategy)) + }; + + Ok((scorer, slow_threshold)) + } + + fn arb_ema_strategy( + u: &mut Unstructured, + slow_threshold: Duration, + ) -> Result { let alpha_success = u.choose(&[0.20, 0.25, 0.30])?; let alpha_timeout = u.choose(&[0.10, 0.15, 0.20])?; let alpha_failure = u.choose(&[0.10, 0.15, 0.20])?; - let slow_threshold = u.int_in_range(1000..=5000)?; Ok(ema::ExponentialMovingAverage::new( *alpha_success, *alpha_timeout, *alpha_failure, - Duration::from_millis(slow_threshold), + slow_threshold, )) } + fn arb_credit_strategy( + u: &mut Unstructured, + slow_threshold: Duration, + ) -> Result { + let credit_fast_success = *u.choose(&[30, 40, 50])?; + let credit_slow_success = *u.choose(&[-5, -10, -15])?; + let credit_failure = *u.choose(&[-15, -20, -25])?; + let credit_timeout = *u.choose(&[-10, -15, -20])?; + + Ok(credit::Credit::new(credit::CreditConfig { + slow_threshold, + credit_fast_success, + credit_slow_success, + credit_failure, + credit_timeout, + min_credit: -100, + max_credit: 100, + })) + } + fn arb_vec( u: &mut Unstructured, f: impl Fn(&mut Unstructured) -> Result, @@ -261,18 +407,53 @@ mod tests { (0..size).map(|_| f(u)).collect::>>() } + // Helper to get strategy-specific properties for assertions + fn update_score_directly( + scorer: &ScorerVariant, + state: &mut StrategyState, + previous_score: Score, + result: SyncResult, + ) -> Score { + match (scorer, state) { + (ScorerVariant::Ema(s), StrategyState::Ema(ref mut peer_state)) => { + s.strategy.update_score(peer_state, previous_score, result) + } + (ScorerVariant::Credit(s), StrategyState::Credit(ref mut peer_state)) => { + s.strategy.update_score(peer_state, previous_score, result) + } + _ => unreachable!("Mismatched scorer and state types"), + } + } + + // Helper enum for per-peer state in tests + enum StrategyState { + Ema(()), + Credit(i32), + } + + impl StrategyState { + fn for_scorer(scorer: &ScorerVariant) -> Self { + match scorer { + ScorerVariant::Ema(_) => StrategyState::Ema(()), + ScorerVariant::Credit(s) => { + // Initialize credit with the strategy's initial credit value + StrategyState::Credit(s.strategy.initial_credit()) + } + } + } + } + // Property: Scores are bounded between 0.0 and 1.0 #[test] fn scores_are_bounded() { arbtest(|u| { - let strategy = arb_strategy(u)?; + let (mut scorer, slow_threshold) = arb_scorer_variant(u)?; let results = arb_vec( u, - |u| arb_sync_result_success_fast(u, strategy.slow_threshold), + |u| arb_sync_result_success_fast(u, slow_threshold), 10..=100, )?; - let mut scorer = PeerScorer::new(strategy); let peer_id = PeerId::random(); // Initial score should be bounded @@ -293,10 +474,9 @@ mod tests { }); arbtest(|u| { - let strategy = arb_strategy(u)?; + let (mut scorer, _) = arb_scorer_variant(u)?; let results = arb_vec(u, arb_sync_result_failure, 10..=100)?; - let mut scorer = PeerScorer::new(strategy); let peer_id = PeerId::random(); // Initial score should be bounded @@ -321,20 +501,22 @@ mod tests { #[test] fn fast_responses_improve_score() { arbtest(|u| { - let strategy = arb_strategy(u)?; - let response_time = arb_response_time_fast(u, strategy.slow_threshold)?; + let (scorer, slow_threshold) = arb_scorer_variant(u)?; + let response_time = arb_response_time_fast(u, slow_threshold)?; - let mut scorer = PeerScorer::new(strategy); - let peer_id = PeerId::random(); + let init_score = INITIAL_SCORE; + let mut state = StrategyState::for_scorer(&scorer); - let initial_score = scorer.get_score(&peer_id); - let update_score = scorer - .strategy - .update_score(initial_score, SyncResult::Success(response_time)); + let update_score = update_score_directly( + &scorer, + &mut state, + init_score, + SyncResult::Success(response_time), + ); assert!( - update_score > initial_score, - "Fast response decreased score: {initial_score} -> {update_score}", + update_score > init_score, + "Fast response decreased score: {init_score} -> {update_score}", ); Ok(()) @@ -345,19 +527,22 @@ mod tests { #[test] fn slow_responses_decrease_high_score() { arbtest(|u| { - let strategy = arb_strategy(u)?; - let response_time = arb_response_time_slow(u, strategy.slow_threshold)?; + let (scorer, slow_threshold) = arb_scorer_variant(u)?; + let response_time = arb_response_time_slow(u, slow_threshold)?; - let mut scorer = PeerScorer::new(strategy); + let init_score = 1.0; // Start with a high score to test the penalty effect + let mut state = StrategyState::for_scorer(&scorer); - let initial_score = 1.0; - let update_score = scorer - .strategy - .update_score(initial_score, SyncResult::Success(response_time)); + let update_score = update_score_directly( + &scorer, + &mut state, + init_score, + SyncResult::Success(response_time), + ); assert!( - update_score < initial_score, - "Slow response ({response_time:?}) should decrease score: {initial_score} -> {update_score}", + update_score < init_score, + "Slow response should decrease high score: {init_score} -> {update_score}", ); Ok(()) @@ -368,18 +553,18 @@ mod tests { #[test] fn failures_decrease_score() { arbtest(|u| { - let strategy = arb_strategy(u)?; + let (scorer, _) = arb_scorer_variant(u)?; let failure_type = u.choose(&[SyncResult::Timeout, SyncResult::Failure])?; - let mut scorer = PeerScorer::new(strategy); - let peer_id = PeerId::random(); + let init_score = INITIAL_SCORE; + let mut state = StrategyState::for_scorer(&scorer); - let initial_score = scorer.get_score(&peer_id); - let update_score = scorer.strategy.update_score(initial_score, *failure_type); + let update_score = + update_score_directly(&scorer, &mut state, init_score, *failure_type); assert!( - update_score < initial_score, - "Failure/timeout should decrease score: {initial_score} -> {update_score} for {failure_type:?}", + update_score < init_score, + "Failure/timeout should decrease score: {init_score} -> {update_score} for {failure_type:?}", ); Ok(()) @@ -390,30 +575,55 @@ mod tests { #[test] fn peer_selection_is_deterministic() { arbtest(|u| { + let slow_threshold = arb_slow_threshold(u)?; let peer_count = u.int_in_range(2usize..=10)?; let seed = u.arbitrary()?; let results = arb_vec(u, arb_sync_result, 0..=50)?; let peers: Vec<_> = (0..peer_count).map(|_| PeerId::random()).collect(); - let mut scorer1 = PeerScorer::default(); - let mut scorer2 = PeerScorer::default(); + // Test with EMA strategy + { + let strategy = arb_ema_strategy(u, slow_threshold)?; + let mut scorer1 = PeerScorer::new(INITIAL_SCORE, strategy); + let mut scorer2 = PeerScorer::new(INITIAL_SCORE, strategy); - // Apply same updates to both scorers - for (i, result) in results.into_iter().enumerate() { - let peer_id = peers[i % peers.len()]; - scorer1.update_score(peer_id, result); - scorer2.update_score(peer_id, result); + for (i, result) in results.iter().enumerate() { + let peer_id = peers[i % peers.len()]; + scorer1.update_score(peer_id, *result); + scorer2.update_score(peer_id, *result); + } + + let mut rng1 = StdRng::seed_from_u64(seed); + let mut rng2 = StdRng::seed_from_u64(seed); + + for _ in 0..10 { + let selection1 = scorer1.select_peer(&peers, &mut rng1); + let selection2 = scorer2.select_peer(&peers, &mut rng2); + assert_eq!(selection1, selection2); + } } - // Select peers with same RNG seed - let mut rng1 = StdRng::seed_from_u64(seed); - let mut rng2 = StdRng::seed_from_u64(seed); + // Test with Credit strategy + { + let strategy = arb_credit_strategy(u, slow_threshold)?; + let mut scorer1 = PeerScorer::new(INITIAL_SCORE, strategy.clone()); + let mut scorer2 = PeerScorer::new(INITIAL_SCORE, strategy); - for _ in 0..10 { - let selection1 = scorer1.select_peer(&peers, &mut rng1); - let selection2 = scorer2.select_peer(&peers, &mut rng2); - assert_eq!(selection1, selection2); + for (i, result) in results.iter().enumerate() { + let peer_id = peers[i % peers.len()]; + scorer1.update_score(peer_id, *result); + scorer2.update_score(peer_id, *result); + } + + let mut rng1 = StdRng::seed_from_u64(seed); + let mut rng2 = StdRng::seed_from_u64(seed); + + for _ in 0..10 { + let selection1 = scorer1.select_peer(&peers, &mut rng1); + let selection2 = scorer2.select_peer(&peers, &mut rng2); + assert_eq!(selection1, selection2); + } } Ok(()) @@ -424,11 +634,11 @@ mod tests { #[test] fn all_peers_selectable() { arbtest(|u| { + let (mut scorer, _) = arb_scorer_variant(u)?; let peer_count = u.int_in_range(2_usize..=6)?; let results = arb_vec(u, arb_sync_result, 0..=20)?; let peers: Vec = (0..peer_count).map(|_| PeerId::random()).collect(); - let mut scorer = PeerScorer::default(); // Apply random updates for (i, result) in results.iter().enumerate() { @@ -487,16 +697,16 @@ mod tests { let peers = vec![good_peer, bad_peer]; - let mut scorer = PeerScorer::default(); + let (mut scorer, _) = arb_scorer_variant(u)?; // Give good peer good results - for result in good_results { - scorer.update_score(good_peer, result); + for result in &good_results { + scorer.update_score(good_peer, *result); } // Give bad peer bad results - for result in bad_results { - scorer.update_score(bad_peer, result); + for result in &bad_results { + scorer.update_score(bad_peer, *result); } let good_score = scorer.get_score(&good_peer); @@ -527,32 +737,37 @@ mod tests { } // Property: Score updates should be monotonic for sequences of same result type - // Note: This test does not apply to EMA anymore, but it can be useful for testing other strategies that have more deterministic score changes. + // + // Note: This test does not apply to EMA but does apply to Credit strategy, + // where we expect consistent improvements for fast successes and consistent + // penalties for failures and timeouts. #[test] - #[ignore] fn monotonic_score_updates() { arbtest(|u| { - let strategy = arb_strategy(u)?; + let slow_threshold = arb_slow_threshold(u)?; + let strategy = arb_credit_strategy(u, slow_threshold)?; + let scorer = ScorerVariant::Credit(PeerScorer::new(INITIAL_SCORE, strategy)); + let result = arb_sync_result(u)?; let update_count = u.int_in_range(1_usize..=20)?; - let mut scorer = PeerScorer::new(strategy); - let mut current_score = scorer.strategy.initial_score(PeerId::random()); + let mut current_score = INITIAL_SCORE; + let mut state = StrategyState::for_scorer(&scorer); let mut scores = vec![current_score]; println!( "Testing monotonicity for result {:?} over {} updates, threshold={:?}", - result, update_count, strategy.slow_threshold + result, update_count, slow_threshold ); for _ in 0..update_count { - current_score = scorer.strategy.update_score(current_score, result); + current_score = update_score_directly(&scorer, &mut state, current_score, result); scores.push(current_score); } // Check monotonicity based on result type match result { - SyncResult::Success(rt) if rt < strategy.slow_threshold => { + SyncResult::Success(rt) if rt < slow_threshold => { // For fast response, scores should increase for window in scores.windows(2) { let diff = window[1] - window[0]; @@ -597,8 +812,8 @@ mod tests { fn empty_peer_list_returns_none() { arbtest(|u| { let seed = u.arbitrary()?; + let (scorer, _) = arb_scorer_variant(u)?; - let scorer = PeerScorer::default(); let mut rng = StdRng::seed_from_u64(seed); let result = scorer.select_peer(&[], &mut rng); assert_eq!(result, None); @@ -610,12 +825,12 @@ mod tests { #[test] fn single_peer_always_selected() { arbtest(|u| { + let (mut scorer, _) = arb_scorer_variant(u)?; let seed = u.arbitrary()?; let results = arb_vec(u, arb_sync_result, 0..=10)?; let peer = PeerId::random(); let peers = vec![peer]; - let mut scorer = PeerScorer::default(); // Apply some updates for result in results { @@ -636,18 +851,21 @@ mod tests { #[test] fn response_time_affects_success_score() { arbtest(|u| { - let strategy = arb_strategy(u)?; + let (scorer, _) = arb_scorer_variant(u)?; let fast_time = u.int_in_range(10_u64..=100)?; let slow_time = u.int_in_range(1000_u64..=5000)?; - let mut scorer = PeerScorer::new(strategy); - let initial_score = scorer.strategy.initial_score(PeerId::random()); + let init_score = INITIAL_SCORE; + let mut fast_state = StrategyState::for_scorer(&scorer); + let mut slow_state = StrategyState::for_scorer(&scorer); let fast_result = SyncResult::Success(Duration::from_millis(fast_time)); let slow_result = SyncResult::Success(Duration::from_millis(slow_time)); - let fast_score = scorer.strategy.update_score(initial_score, fast_result); - let slow_score = scorer.strategy.update_score(initial_score, slow_result); + let fast_score = + update_score_directly(&scorer, &mut fast_state, init_score, fast_result); + let slow_score = + update_score_directly(&scorer, &mut slow_state, init_score, slow_result); assert!( fast_score >= slow_score, @@ -662,10 +880,9 @@ mod tests { #[test] fn updating_one_peer_does_not_affect_others() { arbtest(|u| { - let strategy = arb_strategy(u)?; + let (mut scorer, _) = arb_scorer_variant(u)?; let results = arb_vec(u, arb_sync_result, 0..=10)?; - let mut scorer = PeerScorer::new(strategy); let peer1 = PeerId::random(); let peer2 = PeerId::random(); @@ -694,13 +911,12 @@ mod tests { #[test] fn fast_response_help_recover_score_quickly() { arbtest(|u| { - let strategy = arb_strategy(u)?; - let response_time = arb_response_time_fast(u, strategy.slow_threshold)?; + let (mut scorer, slow_threshold) = arb_scorer_variant(u)?; + let response_time = arb_response_time_fast(u, slow_threshold)?; - let mut scorer = PeerScorer::new(strategy); let peer_id = PeerId::random(); - let initial_score = scorer.get_score(&peer_id); + let init_score = scorer.get_score(&peer_id); // Apply a timeout scorer.update_score(peer_id, SyncResult::Timeout); @@ -708,8 +924,8 @@ mod tests { // Score after success should be higher than after timeout assert!( - score_after_timeout < initial_score, - "Score after timeout ({score_after_timeout}) should be lower than initial score ({initial_score})" + score_after_timeout < init_score, + "Score after timeout ({score_after_timeout}) should be lower than initial score ({init_score})" ); // Apply a success @@ -718,8 +934,8 @@ mod tests { // Score after success should be higher than initial score assert!( - score_after_success > initial_score, - "Score after success ({score_after_success}) should be greater than initial score ({initial_score})" + score_after_success > init_score, + "Score after success ({score_after_success}) should be greater than initial score ({init_score})" ); Ok(()) @@ -730,22 +946,19 @@ mod tests { #[test] fn pruning_inactive_peers_resets_scores() { arbtest(|u| { - let strategy = arb_strategy(u)?; - let mut scorer = PeerScorer::new(strategy); + let (mut scorer, _) = arb_scorer_variant(u)?; + let peer_id = PeerId::random(); + let init_score = INITIAL_SCORE; // Update score for the peer scorer.update_score(peer_id, SyncResult::Success(Duration::from_millis(100))); - // Ensure the peer is present - assert!(scorer.get_scores().contains_key(&peer_id)); - // Prune inactive peers with a threshold that will remove this peer scorer.reset_inactive_peers_scores(Duration::from_millis(0)); - // Peer should be removed - assert!(!scorer.get_scores().contains_key(&peer_id)); - assert_eq!(scorer.get_score(&peer_id), strategy.initial_score(peer_id)); + // Peer should be removed, score should reset to initial + assert_eq!(scorer.get_score(&peer_id), init_score); Ok(()) }); diff --git a/code/crates/sync/src/scoring/credit.rs b/code/crates/sync/src/scoring/credit.rs new file mode 100644 index 000000000..670cdb04c --- /dev/null +++ b/code/crates/sync/src/scoring/credit.rs @@ -0,0 +1,138 @@ +use std::time::Duration; + +use super::{Score, ScoringStrategy, SyncResult}; + +#[derive(Copy, Clone, Debug)] +pub struct CreditConfig { + /// Threshold for what is considered "fast enough". + pub slow_threshold: Duration, + /// Credit deltas + pub credit_fast_success: i32, + /// Credit delta for a success that's slower than the slow_threshold. + pub credit_slow_success: i32, + /// Credit delta for a failure. + pub credit_failure: i32, + /// Credit delta for a timeout. + pub credit_timeout: i32, + /// Minimum credit (worst score). + pub min_credit: i32, + /// Maximum credit (best score). + pub max_credit: i32, +} + +impl Default for CreditConfig { + fn default() -> Self { + CreditConfig { + slow_threshold: Duration::from_millis(500), + credit_fast_success: 2, + credit_slow_success: 0, + credit_failure: -2, + credit_timeout: -4, + min_credit: -20, + max_credit: 20, + } + } +} + +/// Credit-based scoring strategy +/// +/// Maintain an integer "credit" per peer. +/// - Fast success increases credit more than slow success. +/// - Failures and timeouts reduce credit. +/// +/// Credits are clamped to [min_credit, max_credit]. +/// Score is a normalized mapping of credit -> [0.0, 1.0]. +#[derive(Clone, Debug)] +pub struct Credit { + config: CreditConfig, +} + +impl Default for Credit { + fn default() -> Self { + Self::new(CreditConfig::default()) + } +} + +impl Credit { + pub fn new(config: CreditConfig) -> Self { + assert!( + config.slow_threshold.as_secs_f64() > 0.0, + "slow_threshold must be > 0" + ); + + assert!( + config.min_credit < config.max_credit, + "min_credit must be < max_credit" + ); + + Self { config } + } + + pub fn initial_credit(&self) -> i32 { + // Neutral: midpoint of the clamp range. + self.config.min_credit + (self.config.max_credit - self.config.min_credit) / 2 + } + + fn clamp_credit(&self, c: i32) -> i32 { + c.clamp(self.config.min_credit, self.config.max_credit) + } + + /// Map credit in [min_credit, max_credit] to score in [0.0, 1.0]. + fn credit_to_score(&self, credit: i32) -> Score { + let min = self.config.min_credit as f64; + let max = self.config.max_credit as f64; + let c = credit as f64; + + // Avoid division by zero if min and max are the same + // (though this should be prevented by the constructor). + if (max - min).abs() < f64::EPSILON { + return 0.5; + } + + ((c - min) / (max - min)).clamp(0.0, 1.0) + } + + fn is_fast(&self, response_time: Duration) -> bool { + response_time < self.config.slow_threshold + } +} + +impl ScoringStrategy for Credit { + type State = i32; // The credit value per peer + + fn update_score( + &self, + credit: &mut Self::State, + _previous_score: Score, + result: SyncResult, + ) -> Score { + // Initialize credit if it's at the default value (0) + // This handles the case where PeerState::default() is used + if *credit == 0 { + *credit = self.initial_credit(); + } + + let delta = match result { + SyncResult::Success(rt) => { + if self.is_fast(rt) { + self.config.credit_fast_success + } else { + self.config.credit_slow_success + } + } + SyncResult::Failure => self.config.credit_failure, + SyncResult::Timeout => self.config.credit_timeout, + }; + + let old_credit = *credit; + *credit = self.clamp_credit(credit.saturating_add(delta)); + + eprintln!( + "result={result:?}, credit={old_credit}, delta={delta}, new={}, score={:.2}", + *credit, + self.credit_to_score(*credit) + ); + + self.credit_to_score(*credit) + } +} diff --git a/code/crates/sync/src/scoring/ema.rs b/code/crates/sync/src/scoring/ema.rs index 586516f6d..9e9e266d9 100644 --- a/code/crates/sync/src/scoring/ema.rs +++ b/code/crates/sync/src/scoring/ema.rs @@ -1,7 +1,5 @@ use std::time::Duration; -use malachitebft_peer::PeerId; - use super::{Score, ScoringStrategy, SyncResult}; /// Exponential Moving Average scoring strategy @@ -71,11 +69,9 @@ impl ExponentialMovingAverage { } impl ScoringStrategy for ExponentialMovingAverage { - fn initial_score(&self, _peer_id: PeerId) -> Score { - 0.5 // All peers start with a neutral score of 0.5 - } + type State = (); // Stateless - no per-peer data needed - fn update_score(&mut self, previous_score: Score, result: SyncResult) -> Score { + fn update_score(&self, _state: &mut (), previous_score: Score, result: SyncResult) -> Score { match result { SyncResult::Success(response_time) => { let response_time_secs = response_time.as_secs_f64(); diff --git a/code/crates/sync/src/state.rs b/code/crates/sync/src/state.rs index 5f7278bcb..e047f011a 100644 --- a/code/crates/sync/src/state.rs +++ b/code/crates/sync/src/state.rs @@ -5,7 +5,7 @@ use std::ops::RangeInclusive; use malachitebft_core_types::{Context, Height}; use malachitebft_peer::PeerId; -use crate::scoring::{ema, PeerScorer, Strategy}; +use crate::scoring::{credit, ema, PeerScorer, ScorerVariant, Strategy}; use crate::{Config, OutboundRequestId, Status}; pub struct State @@ -34,7 +34,7 @@ where pub peers: BTreeMap>, /// Peer scorer for scoring peers based on their performance. - pub peer_scorer: PeerScorer, + pub peer_scorer: ScorerVariant, } impl State @@ -47,8 +47,16 @@ where // Sync configuration config: Config, ) -> Self { + let initial_score = config.initial_score; + let peer_scorer = match config.scoring_strategy { - Strategy::Ema => PeerScorer::new(ema::ExponentialMovingAverage::default()), + Strategy::Ema => ScorerVariant::Ema(PeerScorer::new( + initial_score, + ema::ExponentialMovingAverage::default(), + )), + Strategy::Credit => { + ScorerVariant::Credit(PeerScorer::new(initial_score, credit::Credit::default())) + } }; Self {