Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions code/crates/core-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ where
/// The first element of the tuple is the round at which that input has been emitted.
pending_inputs: Vec<(Round, RoundInput<Ctx>)>,

/// The set of timeouts already scheduled by the consensus logic.
/// Intended to avoid scheduling the same timeout multiple times.
scheduled_timeouts: Vec<Timeout>,

last_prevote: Option<Ctx::Vote>,
last_precommit: Option<Ctx::Vote>,

Expand Down Expand Up @@ -98,6 +102,7 @@ where
round_state,
proposer: None,
pending_inputs: vec![],
scheduled_timeouts: vec![],
commit_certificates: vec![],
polka_certificates: vec![],
last_prevote: None,
Expand All @@ -119,7 +124,6 @@ where
// Reset the round state
let round_state = RoundState::new(height, Round::Nil);
self.round_state = round_state;
self.round_certificate = None;

// Reset the proposal keeper
let proposal_keeper = ProposalKeeper::new();
Expand All @@ -130,8 +134,10 @@ where
self.polka_certificates = vec![];

// Reset additional internal state
self.scheduled_timeouts.clear();
self.last_prevote = None;
self.last_precommit = None;
self.round_certificate = None;
}

/// Return the height of the consensus.
Expand Down Expand Up @@ -370,17 +376,29 @@ where

RoundOutput::Vote(vote) => self.lift_vote_output(vote, outputs),

RoundOutput::ScheduleTimeout(timeout) => outputs.push(Output::ScheduleTimeout(timeout)),
RoundOutput::ScheduleTimeout(timeout) => self.lift_timeout_output(timeout, outputs),

RoundOutput::GetValueAndScheduleTimeout(height, round, timeout) => {
outputs.push(Output::ScheduleTimeout(timeout));
self.lift_timeout_output(timeout, outputs);
outputs.push(Output::GetValue(height, round, timeout));
}

RoundOutput::Decision(round, proposal) => outputs.push(Output::Decide(round, proposal)),
}
}

fn lift_timeout_output(&mut self, timeout: Timeout, outputs: &mut Vec<Output<Ctx>>) {
if timeout.round < self.round() || self.scheduled_timeouts.contains(&timeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Can this ever be true timeout.round < self.round()? Timeouts should only be for the current round. If purely defensive, I'd add a warn! here so we notice if it ever triggers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

return;
}
debug_assert!(
timeout.is_consensus(),
"lift_timeout_output received {timeout:?}"
);
self.scheduled_timeouts.push(timeout);
outputs.push(Output::ScheduleTimeout(timeout));
}

fn lift_vote_output(&mut self, vote: Ctx::Vote, outputs: &mut Vec<Output<Ctx>>) {
if vote.validator_address() != self.address() {
return;
Expand Down Expand Up @@ -482,6 +500,10 @@ where
// Update the proposer for the new round
self.proposer = Some(proposer);

// Remove useless timeouts from previous rounds
self.scheduled_timeouts
.retain(|timeout| timeout.round >= round);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here, could this just be clear()? It is minor ofc, I just want to understand if we expect to see any future rounds here, since I don't see how

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a good point. But the concern here is to limit the growth of this set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can schedule timeouts for future rounds - otherwise, clear() is the way to go.


self.apply_input(round, RoundInput::NewRound(round))
}

Expand Down
7 changes: 0 additions & 7 deletions code/crates/core-driver/tests/it/extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2980,8 +2980,6 @@ fn polka_nil_and_prevote_step_precommit_nil() {
expected_outputs: vec![
prevote_nil_output(Round::new(0), &my_addr),
precommit_nil_output(Round::new(0), &my_addr),
start_precommit_timer_output(Round::new(0)),
start_precommit_timer_output(Round::new(0)),
],
expected_round: Round::new(0),
new_state: precommit_state(Round::new(0)),
Expand Down Expand Up @@ -3049,7 +3047,6 @@ fn polka_any_and_prevote_step_timeout_prevote() {
expected_outputs: vec![
prevote_nil_output(Round::new(0), &my_addr),
start_prevote_timer_output(Round::new(0)),
start_precommit_timer_output(Round::new(0)),
],
expected_round: Round::new(0),
new_state: prevote_state(Round::new(0)),
Expand Down Expand Up @@ -3117,7 +3114,6 @@ fn polka_value_no_proposal_and_prevote_step_timeout_prevote() {
expected_outputs: vec![
prevote_nil_output(Round::new(0), &my_addr),
start_prevote_timer_output(Round::new(0)),
start_precommit_timer_output(Round::new(0)),
],
expected_round: Round::new(0),
new_state: prevote_state(Round::new(0)),
Expand Down Expand Up @@ -3195,7 +3191,6 @@ fn polka_any_precommit_any_future_round() {
expected_outputs: vec![
prevote_nil_output(Round::new(1), &my_addr),
start_prevote_timer_output(Round::new(1)),
start_precommit_timer_output(Round::new(1)),
],
expected_round: Round::new(1),
new_state: prevote_state(Round::new(1)),
Expand Down Expand Up @@ -3271,8 +3266,6 @@ fn polka_nil_precommit_any_future_round() {
expected_outputs: vec![
prevote_nil_output(Round::new(1), &my_addr),
precommit_nil_output(Round::new(1), &my_addr),
start_precommit_timer_output(Round::new(1)),
start_precommit_timer_output(Round::new(1)), // :'(
],
expected_round: Round::new(1),
new_state: precommit_state(Round::new(1)),
Expand Down
8 changes: 8 additions & 0 deletions code/crates/core-types/src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ impl Timeout {
Self { round, kind }
}

/// Check if this timeout is for a consensus step (propose, prevote, or precommit).
pub fn is_consensus(&self) -> bool {
matches!(
self.kind,
TimeoutKind::Propose | TimeoutKind::Prevote | TimeoutKind::Precommit
)
}

/// Create a new timeout for the propose step of the given round.
pub const fn propose(round: Round) -> Self {
Self::new(round, TimeoutKind::Propose)
Expand Down