Skip to content

Commit bd8af36

Browse files
authored
fix(sync): Replace the latency “cliff” in EMA scoring with a continuous curve (#1451)
Right now quality drops from 1.0 to e^-1 immediately at the threshold. This commit makes it continuous after the threshold. **Before:** ``` threshold = 1.00s t(s) quality -------------------- 0.90 1.000000 1.00 1.000000 1.10 0.332871 <-- cliff 1.20 0.301194 1.30 0.272532 1.40 0.246597 1.50 0.223130 1.60 0.201897 1.70 0.182684 1.80 0.165299 1.90 0.149569 2.00 0.135335 2.10 0.122456 2.20 0.110803 2.30 0.100259 2.40 0.090718 ``` **After:** ``` threshold = 1.00s t(s) quality -------------------- 0.90 1.000000 1.00 1.000000 1.10 0.904837 <-- no cliff 1.20 0.818731 1.30 0.740818 1.40 0.670320 1.50 0.606531 1.60 0.548812 1.70 0.496585 1.80 0.449329 1.90 0.406570 2.00 0.367879 2.10 0.332871 2.20 0.301194 2.30 0.272532 2.40 0.246597 ```
1 parent b24e55c commit bd8af36

2 files changed

Lines changed: 31 additions & 14 deletions

File tree

code/crates/sync/src/scoring.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,24 +341,23 @@ mod tests {
341341
});
342342
}
343343

344-
// Property: Slow responses should decrease the score
344+
// Property: Slow responses should decrease the score if it was already high
345345
#[test]
346-
fn slow_responses_decrease_score() {
346+
fn slow_responses_decrease_high_score() {
347347
arbtest(|u| {
348348
let strategy = arb_strategy(u)?;
349349
let response_time = arb_response_time_slow(u, strategy.slow_threshold)?;
350350

351351
let mut scorer = PeerScorer::new(strategy);
352-
let peer_id = PeerId::random();
353352

354-
let initial_score = scorer.get_score(&peer_id);
353+
let initial_score = 1.0;
355354
let update_score = scorer
356355
.strategy
357356
.update_score(initial_score, SyncResult::Success(response_time));
358357

359358
assert!(
360359
update_score < initial_score,
361-
"Slow response should decrease score: {initial_score} -> {update_score}",
360+
"Slow response ({response_time:?}) should decrease score: {initial_score} -> {update_score}",
362361
);
363362

364363
Ok(())
@@ -528,7 +527,9 @@ mod tests {
528527
}
529528

530529
// Property: Score updates should be monotonic for sequences of same result type
530+
// Note: This test does not apply to EMA anymore, but it can be useful for testing other strategies that have more deterministic score changes.
531531
#[test]
532+
#[ignore]
532533
fn monotonic_score_updates() {
533534
arbtest(|u| {
534535
let strategy = arb_strategy(u)?;
@@ -539,6 +540,11 @@ mod tests {
539540
let mut current_score = scorer.strategy.initial_score(PeerId::random());
540541
let mut scores = vec![current_score];
541542

543+
println!(
544+
"Testing monotonicity for result {:?} over {} updates, threshold={:?}",
545+
result, update_count, strategy.slow_threshold
546+
);
547+
542548
for _ in 0..update_count {
543549
current_score = scorer.strategy.update_score(current_score, result);
544550
scores.push(current_score);
@@ -552,18 +558,18 @@ mod tests {
552558
let diff = window[1] - window[0];
553559
assert!(
554560
diff >= 0.0,
555-
"Fast response should improve score: {} -> {}",
561+
"Fast response ({rt:?}) should improve score: {} -> {}",
556562
window[0],
557563
window[1]
558564
);
559565
}
560566
}
561-
SyncResult::Success(_) => {
567+
SyncResult::Success(rt) => {
562568
// For slow responses, scores should decrease
563569
for window in scores.windows(2) {
564570
assert!(
565571
window[1] <= window[0],
566-
"Slow response should decrease score: {} -> {}",
572+
"Slow response ({rt:?}) should decrease score: {} -> {}",
567573
window[0],
568574
window[1]
569575
);

code/crates/sync/src/scoring/ema.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,29 @@ impl ScoringStrategy for ExponentialMovingAverage {
7878
fn update_score(&mut self, previous_score: Score, result: SyncResult) -> Score {
7979
match result {
8080
SyncResult::Success(response_time) => {
81+
let response_time_secs = response_time.as_secs_f64();
82+
let threshold_secs = self.slow_threshold.as_secs_f64();
83+
8184
// Calculate quality score between 0-1 based on response time
82-
// We use an exponential decay function to penalize slow responses more heavily
83-
let quality = if response_time < self.slow_threshold {
84-
// Fast responses get a high quality score
85+
// We use an exponential decay function to penalize slow responses more heavily.
86+
let quality = if response_time_secs <= threshold_secs {
8587
1.0
8688
} else {
87-
// Slow responses get a low quality score based on how slow they were
88-
(-response_time.as_secs_f64() / self.slow_threshold.as_secs_f64()).exp()
89+
// Decay from the threshold, ensuring a smooth drop-off
90+
(-(response_time_secs - threshold_secs) / threshold_secs).exp()
8991
};
9092

9193
// Update score with EMA using alpha_success
92-
self.alpha_success * quality + (1.0 - self.alpha_success) * previous_score
94+
let new_score =
95+
self.alpha_success * quality + (1.0 - self.alpha_success) * previous_score;
96+
97+
#[cfg(test)]
98+
{
99+
println!("Response time: {response_time_secs:.3}s, Quality: {quality:.3}");
100+
println!(" => Updating score: prev={previous_score:.3}, new={new_score:.3}");
101+
}
102+
103+
new_score
93104
}
94105

95106
SyncResult::Timeout => {

0 commit comments

Comments
 (0)