diff --git a/code/crates/core-driver/src/driver.rs b/code/crates/core-driver/src/driver.rs index 81790ee46..ca925f8e2 100644 --- a/code/crates/core-driver/src/driver.rs +++ b/code/crates/core-driver/src/driver.rs @@ -62,6 +62,10 @@ where /// The first element of the tuple is the round at which that input has been emitted. pending_inputs: Vec<(Round, RoundInput)>, + /// The set of timeouts already scheduled by the consensus logic. + /// Intended to avoid scheduling the same timeout multiple times. + scheduled_timeouts: Vec, + last_prevote: Option, last_precommit: Option, @@ -98,6 +102,7 @@ where round_state, proposer: None, pending_inputs: vec![], + scheduled_timeouts: vec![], commit_certificates: vec![], polka_certificates: vec![], last_prevote: None, @@ -119,7 +124,6 @@ where // Reset the round state let round_state = RoundState::new(height, Round::Nil); self.round_state = round_state; - self.round_certificate = None; // Reset the proposal keeper let proposal_keeper = ProposalKeeper::new(); @@ -130,8 +134,10 @@ where self.polka_certificates = vec![]; // Reset additional internal state + self.scheduled_timeouts.clear(); self.last_prevote = None; self.last_precommit = None; + self.round_certificate = None; } /// Return the height of the consensus. @@ -370,10 +376,10 @@ where RoundOutput::Vote(vote) => self.lift_vote_output(vote, outputs), - RoundOutput::ScheduleTimeout(timeout) => outputs.push(Output::ScheduleTimeout(timeout)), + RoundOutput::ScheduleTimeout(timeout) => self.lift_timeout_output(timeout, outputs), RoundOutput::GetValueAndScheduleTimeout(height, round, timeout) => { - outputs.push(Output::ScheduleTimeout(timeout)); + self.lift_timeout_output(timeout, outputs); outputs.push(Output::GetValue(height, round, timeout)); } @@ -381,6 +387,18 @@ where } } + fn lift_timeout_output(&mut self, timeout: Timeout, outputs: &mut Vec>) { + if timeout.round < self.round() || self.scheduled_timeouts.contains(&timeout) { + return; + } + debug_assert!( + timeout.is_consensus(), + "lift_timeout_output received {timeout:?}" + ); + self.scheduled_timeouts.push(timeout); + outputs.push(Output::ScheduleTimeout(timeout)); + } + fn lift_vote_output(&mut self, vote: Ctx::Vote, outputs: &mut Vec>) { if vote.validator_address() != self.address() { return; @@ -482,6 +500,10 @@ where // Update the proposer for the new round self.proposer = Some(proposer); + // Remove useless timeouts from previous rounds + self.scheduled_timeouts + .retain(|timeout| timeout.round >= round); + self.apply_input(round, RoundInput::NewRound(round)) } diff --git a/code/crates/core-driver/tests/it/extra.rs b/code/crates/core-driver/tests/it/extra.rs index 3cd1ae5f6..42269666c 100644 --- a/code/crates/core-driver/tests/it/extra.rs +++ b/code/crates/core-driver/tests/it/extra.rs @@ -2980,8 +2980,6 @@ fn polka_nil_and_prevote_step_precommit_nil() { expected_outputs: vec![ prevote_nil_output(Round::new(0), &my_addr), precommit_nil_output(Round::new(0), &my_addr), - start_precommit_timer_output(Round::new(0)), - start_precommit_timer_output(Round::new(0)), ], expected_round: Round::new(0), new_state: precommit_state(Round::new(0)), @@ -3049,7 +3047,6 @@ fn polka_any_and_prevote_step_timeout_prevote() { expected_outputs: vec![ prevote_nil_output(Round::new(0), &my_addr), start_prevote_timer_output(Round::new(0)), - start_precommit_timer_output(Round::new(0)), ], expected_round: Round::new(0), new_state: prevote_state(Round::new(0)), @@ -3117,7 +3114,6 @@ fn polka_value_no_proposal_and_prevote_step_timeout_prevote() { expected_outputs: vec![ prevote_nil_output(Round::new(0), &my_addr), start_prevote_timer_output(Round::new(0)), - start_precommit_timer_output(Round::new(0)), ], expected_round: Round::new(0), new_state: prevote_state(Round::new(0)), @@ -3195,7 +3191,6 @@ fn polka_any_precommit_any_future_round() { expected_outputs: vec![ prevote_nil_output(Round::new(1), &my_addr), start_prevote_timer_output(Round::new(1)), - start_precommit_timer_output(Round::new(1)), ], expected_round: Round::new(1), new_state: prevote_state(Round::new(1)), @@ -3271,8 +3266,6 @@ fn polka_nil_precommit_any_future_round() { expected_outputs: vec![ prevote_nil_output(Round::new(1), &my_addr), precommit_nil_output(Round::new(1), &my_addr), - start_precommit_timer_output(Round::new(1)), - start_precommit_timer_output(Round::new(1)), // :'( ], expected_round: Round::new(1), new_state: precommit_state(Round::new(1)), diff --git a/code/crates/core-types/src/timeout.rs b/code/crates/core-types/src/timeout.rs index da18cbbf0..32d67d45f 100644 --- a/code/crates/core-types/src/timeout.rs +++ b/code/crates/core-types/src/timeout.rs @@ -39,6 +39,14 @@ impl Timeout { Self { round, kind } } + /// Check if this timeout is for a consensus step (propose, prevote, or precommit). + pub fn is_consensus(&self) -> bool { + matches!( + self.kind, + TimeoutKind::Propose | TimeoutKind::Prevote | TimeoutKind::Precommit + ) + } + /// Create a new timeout for the propose step of the given round. pub const fn propose(round: Round) -> Self { Self::new(round, TimeoutKind::Propose)