Skip to content

Commit 2961fe7

Browse files
committed
fix(sync): add stale request cleanup to prevent sync blocking
1 parent 223dd0e commit 2961fe7

2 files changed

Lines changed: 67 additions & 7 deletions

File tree

vendor/malachitebft-sync/src/handle.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ where
9595
pub async fn on_tick<Ctx>(
9696
co: Co<Ctx>,
9797
state: &mut State<Ctx>,
98-
_metrics: &Metrics,
98+
metrics: &Metrics,
9999
) -> Result<(), Error<Ctx>>
100100
where
101101
Ctx: Context,
@@ -114,6 +114,38 @@ where
114114
.reset_inactive_peers_scores(inactive_threshold);
115115
}
116116

117+
// Clear stale pending requests that have timed out
118+
let cleared_count = state.clear_stale_pending_requests();
119+
if cleared_count > 0 {
120+
warn!(
121+
cleared_count,
122+
height.sync = %state.sync_height,
123+
pending_requests = state.pending_value_requests.len(),
124+
"SYNC STALE CLEANUP: Cleared timed-out pending requests"
125+
);
126+
127+
// After clearing stale requests, check if we need to skip ahead
128+
// because peers can't serve the heights we're requesting
129+
if let Some((skip_to, skip_peer)) = state.find_earliest_syncable_height(state.sync_height) {
130+
if skip_to > state.sync_height {
131+
warn!(
132+
height.current = %state.sync_height,
133+
height.skip_to = %skip_to,
134+
peer = %skip_peer,
135+
"SYNC SKIP AFTER CLEANUP: Peers cannot serve old heights, jumping to earliest available"
136+
);
137+
138+
state.sync_height = skip_to;
139+
state.tip_height = skip_to.decrement().unwrap_or_default();
140+
state.pending_value_requests.clear();
141+
state.height_per_request_id.clear();
142+
143+
// Request from the new height
144+
request_value_from_peer(&co, state, metrics, skip_to, skip_peer).await?;
145+
}
146+
}
147+
}
148+
117149
debug!("Peer scores: {:#?}", state.peer_scorer.get_scores());
118150

119151
Ok(())

vendor/malachitebft-sync/src/state.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::BTreeMap;
2+
use std::time::Instant;
23

34
use malachitebft_core_types::{Context, Height};
45
use malachitebft_peer::PeerId;
@@ -39,7 +40,8 @@ where
3940
pub sync_height: Ctx::Height,
4041

4142
/// Decided value requests for these heights have been sent out to peers.
42-
pub pending_value_requests: BTreeMap<Ctx::Height, (OutboundRequestId, RequestState)>,
43+
/// Tuple contains: (request_id, state, timestamp_when_sent)
44+
pub pending_value_requests: BTreeMap<Ctx::Height, (OutboundRequestId, RequestState, Instant)>,
4345

4446
/// Maps request ID to height for pending decided value requests.
4547
pub height_per_request_id: BTreeMap<OutboundRequestId, Ctx::Height>,
@@ -133,14 +135,14 @@ where
133135
.insert(request_id.clone(), height);
134136

135137
self.pending_value_requests
136-
.insert(height, (request_id, RequestState::WaitingResponse));
138+
.insert(height, (request_id, RequestState::WaitingResponse, Instant::now()));
137139
}
138140

139141
/// Mark that a response has been received for a height.
140142
///
141143
/// State transition: WaitingResponse -> WaitingValidation
142144
pub fn response_received(&mut self, request_id: OutboundRequestId, height: Ctx::Height) {
143-
if let Some((req_id, state)) = self.pending_value_requests.get_mut(&height) {
145+
if let Some((req_id, state, _)) = self.pending_value_requests.get_mut(&height) {
144146
if req_id != &request_id {
145147
return; // A new request has been made in the meantime, ignore this response.
146148
}
@@ -155,7 +157,7 @@ where
155157
/// State transition: WaitingValidation -> Validated
156158
/// It is also possible to have the following transition: WaitingResponse -> Validated.
157159
pub fn validate_response(&mut self, height: Ctx::Height) {
158-
if let Some((_, state)) = self.pending_value_requests.get_mut(&height) {
160+
if let Some((_, state, _)) = self.pending_value_requests.get_mut(&height) {
159161
*state = RequestState::Validated;
160162
}
161163
}
@@ -167,7 +169,7 @@ where
167169

168170
/// Remove the pending decided value request for a given height.
169171
pub fn remove_pending_request_by_height(&mut self, height: &Ctx::Height) {
170-
if let Some((request_id, _)) = self.pending_value_requests.remove(height) {
172+
if let Some((request_id, _, _)) = self.pending_value_requests.remove(height) {
171173
self.height_per_request_id.remove(&request_id);
172174
}
173175
}
@@ -191,7 +193,7 @@ where
191193

192194
/// Check if a pending decided value request for a given height is in the `Validated` state.
193195
pub fn is_pending_value_request_validated_by_height(&self, height: &Ctx::Height) -> bool {
194-
if let Some((_, state)) = self.pending_value_requests.get(height) {
196+
if let Some((_, state, _)) = self.pending_value_requests.get(height) {
195197
*state == RequestState::Validated
196198
} else {
197199
false
@@ -207,6 +209,32 @@ where
207209
}
208210
}
209211

212+
/// Clear stale pending requests that have been waiting longer than the configured timeout.
213+
/// Returns the number of cleared requests.
214+
pub fn clear_stale_pending_requests(&mut self) -> usize {
215+
let timeout = self.config.request_timeout;
216+
let now = Instant::now();
217+
218+
// Find heights with stale requests (not yet validated and older than timeout)
219+
let stale_heights: Vec<Ctx::Height> = self
220+
.pending_value_requests
221+
.iter()
222+
.filter(|(_, (_, state, sent_at))| {
223+
*state != RequestState::Validated && now.duration_since(*sent_at) > timeout
224+
})
225+
.map(|(height, _)| *height)
226+
.collect();
227+
228+
let count = stale_heights.len();
229+
230+
// Remove stale requests
231+
for height in stale_heights {
232+
self.remove_pending_request_by_height(&height);
233+
}
234+
235+
count
236+
}
237+
210238
/// Find the earliest height that any peer can serve, above the given height.
211239
/// Returns the height and a peer that can serve it.
212240
/// This is used when the requested height is not available from any peer.

0 commit comments

Comments
 (0)