diff --git a/code/crates/core-consensus/src/handle/decide.rs b/code/crates/core-consensus/src/handle/decide.rs index 866c75305..37eec5e97 100644 --- a/code/crates/core-consensus/src/handle/decide.rs +++ b/code/crates/core-consensus/src/handle/decide.rs @@ -1,8 +1,8 @@ +use std::time::Duration; + use crate::handle::signature::verify_commit_certificate; use crate::prelude::*; -use super::finalize::log_and_finalize; - #[cfg_attr(not(feature = "metrics"), allow(unused_variables))] pub async fn decide( co: &Co, @@ -30,16 +30,16 @@ where .cloned(); // Determine if we have an existing certificate or need to restore one. - let (certificate, extensions, sync_decision) = if let Some(certificate) = existing_certificate { + let (certificate, extensions) = if let Some(certificate) = existing_certificate { // NOTE: Existence implies the decision was reached via Sync protocol. // FIXME: No guarantee vote extensions are found in sync. (CCHAIN-915) - (certificate, VoteExtensions::default(), true) + (certificate, VoteExtensions::default()) } else { // Restore the precommits (removes them from `state`). let mut commits = state.restore_precommits(height, proposal_round, &decided_value); let extensions = extract_vote_extensions(&mut commits); let certificate = CommitCertificate::new(height, proposal_round, decided_id, commits); - (certificate, extensions, false) + (certificate, extensions) }; // The certificate must be valid in Commit step @@ -77,30 +77,17 @@ where Effect::Decide(certificate.clone(), extensions.clone(), Default::default()) ); - let Some(target_time) = state.target_time else { - debug!(%height, "No target time set, finalizing immediately"); - return log_and_finalize(co, state, certificate, extensions).await; - }; - - // FIXME: based on the assumption that a decision reached via Sync protocol implies - // that the configured target_time should not be observed by Malachite. - if sync_decision { - debug!(%height, "Decision via sync, finalizing immediately"); - return log_and_finalize(co, state, certificate, extensions).await; - } - - let elapsed = state - .height_start_time - .expect("height_start_time must be set when target_time is set") - .elapsed(); - - if elapsed >= target_time { - debug!(%height, ?elapsed, ?target_time, "Target time exceeded, finalizing immediately"); - return log_and_finalize(co, state, certificate, extensions).await; - } - - // Time remaining until target time is reached - let remaining = target_time - elapsed; + // Calculate remaining time until target (0 if no target or already exceeded) + let remaining = state + .target_time + .and_then(|target| { + let elapsed = state + .height_start_time + .expect("height_start_time must be set when target_time is set") + .elapsed(); + target.checked_sub(elapsed) + }) + .unwrap_or(Duration::ZERO); // Enter finalization period debug!(%height, ?remaining, "Entering finalization period"); diff --git a/code/crates/core-consensus/src/handle/finalize.rs b/code/crates/core-consensus/src/handle/finalize.rs index f927235cf..c5314c976 100644 --- a/code/crates/core-consensus/src/handle/finalize.rs +++ b/code/crates/core-consensus/src/handle/finalize.rs @@ -25,9 +25,22 @@ where let decided_id = decided_value.id(); + // Get any additional commits collected during finalization period let mut commits = state.restore_precommits(height, proposal_round, &decided_value); let extensions = super::decide::extract_vote_extensions(&mut commits); - let certificate = CommitCertificate::new(height, proposal_round, decided_id, commits); + + // Check for existing certificate (from sync) and extend with additional commits + let certificate = if let Some(existing) = state + .driver + .commit_certificate(proposal_round, &decided_id) + .cloned() + { + // Extend the existing certificate with any additional commits from vote keeper + existing.extend_votes(commits) + } else { + // Build certificate from commits + CommitCertificate::new(height, proposal_round, decided_id, commits) + }; assert!( verify_commit_certificate( @@ -49,7 +62,7 @@ where } /// Emit the Finalize effect with a pre-built certificate and extensions. -pub async fn log_and_finalize( +async fn log_and_finalize( co: &Co, state: &mut State, certificate: CommitCertificate, diff --git a/code/crates/core-types/src/certificate.rs b/code/crates/core-types/src/certificate.rs index 0b0a3f967..c9525a607 100644 --- a/code/crates/core-types/src/certificate.rs +++ b/code/crates/core-types/src/certificate.rs @@ -70,6 +70,35 @@ impl CommitCertificate { commit_signatures, } } + + /// Extend the certificate with additional votes, deduplicating by address. + pub fn extend_votes(mut self, additional_commits: Vec>) -> Self { + // Add new signatures that aren't already present + for vote in additional_commits { + let is_valid = matches!(vote.value(), NilOrVal::Val(id) if id == &self.value_id) + && vote.vote_type() == VoteType::Precommit + && vote.round() == self.round + && vote.height() == self.height; + + if !is_valid { + continue; + } + + // Check if address already exists (linear search, but commit sets are small) + let already_exists = self + .commit_signatures + .iter() + .any(|sig| &sig.address == vote.validator_address()); + + if !already_exists { + self.commit_signatures.push(CommitSignature::new( + vote.validator_address().clone(), + vote.signature, + )); + } + } + self + } } /// Represents a signature for a polka certificate, with the address of the validator that produced it. diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs index 4c834aba1..10aad3a6d 100644 --- a/code/crates/test/tests/it/value_sync.rs +++ b/code/crates/test/tests/it/value_sync.rs @@ -836,3 +836,39 @@ pub async fn status_update_on_decision() { ) .await } + +#[tokio::test] +pub async fn start_late_with_target_time() { + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(5) + .start_after(1, Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_params( + Duration::from_secs(30), + TestParams { + enable_value_sync: true, + //target_time: Some(Duration::from_millis(200)), + ..Default::default() + }, + ) + .await +}