Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(code/core-consensus): Reproduce and fix consensus being stuck with byzantine proposer #853

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ jobs:
- name: Disable apparmor container restrictions
run: sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0
- name: Run integration tests (discovery)
if: always()
run: |
cargo maelstrom --slots 8 \
--include 'package.equals(informalsystems-malachitebft-discovery-test)'
- name: Run integration tests (Starknet app)
if: always()
run: |
cargo maelstrom --slots 8 \
--include 'package.equals(informalsystems-malachitebft-starknet-test)' \
--exclude 'package.match(informalsystems-malachitebft-starknet-test-mbt)'
- name: Run integration tests (Test app)
if: always()
run: |
cargo maelstrom --slots 8 \
--include 'package.equals(informalsystems-malachitebft-test)' \
Expand Down
11 changes: 9 additions & 2 deletions code/crates/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::Path;
use std::time::Duration;

use eyre::Result;
use malachitebft_config::SyncConfig;
use tokio::task::JoinHandle;
use tracing::Span;

Expand All @@ -16,11 +17,11 @@ use malachitebft_engine::util::events::TxEvent;
use malachitebft_engine::wal::{Wal, WalCodec, WalRef};
use malachitebft_network::{Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, Keypair};

use crate::types::config::{Config as NodeConfig, PubSubProtocol, SyncConfig, TransportProtocol};
use crate::types::config::{Config as NodeConfig, PubSubProtocol, TransportProtocol};
use crate::types::core::{Context, SigningProvider};
use crate::types::metrics::{Metrics, SharedRegistry};
use crate::types::sync;
use crate::types::ValuePayload;
use crate::types::{ValuePayload, VoteSyncMode};

pub async fn spawn_node_actor<Ctx>(
ctx: Ctx,
Expand Down Expand Up @@ -92,12 +93,18 @@ where
config::ValuePayload::ProposalAndParts => ValuePayload::ProposalAndParts,
};

let vote_sync_mode = match cfg.consensus.vote_sync.mode {
config::VoteSyncMode::RequestResponse => VoteSyncMode::RequestResponse,
config::VoteSyncMode::Rebroadcast => VoteSyncMode::Rebroadcast,
};

let consensus_params = ConsensusParams {
initial_height,
initial_validator_set,
address,
threshold_params: Default::default(),
value_payload,
vote_sync_mode,
};

Consensus::spawn(
Expand Down
2 changes: 1 addition & 1 deletion code/crates/app/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Re-export of all types required to build a Malachite application.

pub use malachitebft_core_consensus::{
ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload,
ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload, VoteSyncMode,
};
pub use malachitebft_engine::host::LocallyProposedValue;
pub use malachitebft_peer::PeerId;
Expand Down
26 changes: 26 additions & 0 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,29 @@ pub struct ConsensusConfig {

/// P2P configuration options
pub p2p: P2pConfig,

/// VoteSync configuration options
#[serde(default)]
pub vote_sync: VoteSyncConfig,
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct VoteSyncConfig {
/// The mode of vote synchronization
/// - RequestResponse: The lagging node sends a request to a peer for the missing votes
/// - Rebroadcast: Nodes rebroadcast their last vote to all peers
pub mode: VoteSyncMode,
}

/// The mode of vote synchronization
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum VoteSyncMode {
/// The lagging node sends a request to a peer for the missing votes
#[default]
RequestResponse,
/// Nodes rebroadcast their last vote to all peers
Rebroadcast,
}

/// Message types required by consensus to deliver the value being proposed
Expand Down Expand Up @@ -556,6 +579,8 @@ pub struct TestConfig {
pub max_retain_blocks: usize,
#[serde(default)]
pub vote_extensions: VoteExtensionsConfig,
#[serde(default)]
pub is_byzantine_proposer: bool,
}

impl Default for TestConfig {
Expand All @@ -569,6 +594,7 @@ impl Default for TestConfig {
exec_time_per_tx: Duration::from_millis(1),
max_retain_blocks: 1000,
vote_extensions: VoteExtensionsConfig::default(),
is_byzantine_proposer: false,
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::handle::vote::on_vote;
use crate::prelude::*;
use crate::types::SignedConsensusMsg;
use crate::util::pretty::PrettyVal;
use crate::VoteSyncMode;

#[async_recursion]
pub async fn apply_driver_input<Ctx>(
Expand Down Expand Up @@ -267,12 +268,14 @@ where

state.set_last_vote(signed_vote);

let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote_rebroadcast(state.driver.round()),
VoteType::Precommit => Timeout::precommit_rebroadcast(state.driver.round()),
};
if state.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote_rebroadcast(state.driver.round()),
VoteType::Precommit => Timeout::precommit_rebroadcast(state.driver.round()),
};

perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
}
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::*;
use crate::{prelude::*, VoteSyncMode};

#[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
pub async fn on_rebroadcast_timeout<Ctx>(
Expand All @@ -10,6 +10,10 @@ pub async fn on_rebroadcast_timeout<Ctx>(
where
Ctx: Context,
{
if state.params.vote_sync_mode != VoteSyncMode::Rebroadcast {
return Ok(());
}

let (height, round) = (state.driver.height(), state.driver.round());

let (maybe_vote, timeout) = match timeout.kind {
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod error;
pub use error::Error;

mod params;
pub use params::{Params, ThresholdParams};
pub use params::{Params, ThresholdParams, VoteSyncMode};

mod effect;
pub use effect::{Effect, Resumable, Resume};
Expand Down
13 changes: 13 additions & 0 deletions code/crates/core-consensus/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,17 @@ pub struct Params<Ctx: Context> {

/// The messages required to deliver proposals
pub value_payload: ValuePayload,

/// The VoteSync mode
pub vote_sync_mode: VoteSyncMode,
}

/// The mode of vote synchronization
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum VoteSyncMode {
/// The lagging node sends a request to a peer for the missing votes
#[default]
RequestResponse,
/// Nodes rebroadcast their last vote to all peers
Rebroadcast,
}
2 changes: 1 addition & 1 deletion code/crates/core-types/src/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
/// It is parameterized by a context type `Ctx` that defines the specific types used
/// for votes, proposals, and other consensus-related data structures.
///
/// Implementors of this trait are responsible for managing the private keys used for signing
/// Implementers of this trait are responsible for managing the private keys used for signing
/// and providing verification logic using the corresponding public keys.
pub trait SigningProvider<Ctx>
where
Expand Down
3 changes: 2 additions & 1 deletion code/crates/core-votekeeper/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ where
conflicting,
}) => {
// This is an equivocating vote
self.evidence.add(existing, conflicting);
self.evidence.add(existing.clone(), conflicting);
//panic!("Equivocating vote {:?}, existing {:?}", &vote, &existing);
return None;
}
}
Expand Down
7 changes: 3 additions & 4 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, error_span, info, warn};
use malachitebft_codec as codec;
use malachitebft_config::TimeoutConfig;
use malachitebft_core_consensus::{
Effect, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError,
Effect, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError, VoteSyncMode,
};
use malachitebft_core_types::{
Context, Round, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, ValidatorSet,
Expand Down Expand Up @@ -977,9 +977,8 @@ where

Effect::Rebroadcast(msg, r) => {
// Rebroadcast last vote only if sync is not enabled, otherwise vote set requests are issued.
// TODO - there is currently no easy access to the sync configuration. In addition there is
// a single configuration for both value and vote sync.
if self.sync.is_none() {
// TODO: There is a single configuration for both value and vote sync.
if self.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
// Notify any subscribers that we are about to rebroadcast a message
self.tx_event.send(|| Event::Rebroadcast(msg.clone()));

Expand Down
2 changes: 2 additions & 0 deletions code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use libp2p_identity::ecdsa;
use malachitebft_core_consensus::VoteSyncMode;
use tokio::task::JoinHandle;
use tracing::warn;

Expand Down Expand Up @@ -171,6 +172,7 @@ async fn spawn_consensus_actor(
address,
threshold_params: Default::default(),
value_payload: ValuePayload::PartsOnly,
vote_sync_mode: VoteSyncMode::Rebroadcast,
};

Consensus::spawn(
Expand Down
3 changes: 3 additions & 0 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ impl TestRunner {
moniker: format!("node-{}", node),
logging: LoggingConfig::default(),
consensus: ConsensusConfig {
vote_sync: VoteSyncConfig {
mode: VoteSyncMode::Rebroadcast,
},
timeouts: TimeoutConfig::default(),
p2p: P2pConfig {
transport,
Expand Down
17 changes: 13 additions & 4 deletions code/crates/test/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,23 @@ pub async fn run(
// we send back the very same value.
let proposal = match state.get_previously_built_value(height, round).await? {
Some(proposal) => {
info!(value = %proposal.value.id(), "Re-using previously built value");
proposal
let new_proposal = state.propose_value(height, round).await?;
error!(
"XXX Not Re-using previously built value {:} but a new one {:}",
proposal.value.id(),
new_proposal.value.id()
);
new_proposal
}
None => {
// If we have not previously built a value for that very same height and round,
// we need to create a new value to propose and send it back to consensus.
info!("Building a new value to propose");
state.propose_value(height, round).await?
let proposal = state.propose_value(height, round).await?;
error!(
"XXX Building a new value to propose {:}",
proposal.value.id()
);
proposal
}
};

Expand Down
8 changes: 7 additions & 1 deletion code/crates/test/app/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ impl State {
store: Store,
signing_provider: Ed25519Provider,
) -> Self {
let rng = if config.test.is_byzantine_proposer {
StdRng::from_entropy()
} else {
StdRng::seed_from_u64(seed_from_address(&address))
};

Self {
ctx,
config,
Expand All @@ -80,7 +86,7 @@ impl State {
current_round: Round::new(0),
current_proposer: None,
streams_map: PartStreamsMap::new(),
rng: StdRng::seed_from_u64(seed_from_address(&address)),
rng,
peers: HashSet::new(),
}
}
Expand Down
3 changes: 3 additions & 0 deletions code/crates/test/cli/src/cmd/distributed_testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ fn generate_distributed_config(
Config {
moniker: format!("test-{}", index),
consensus: ConsensusConfig {
vote_sync: VoteSyncConfig {
mode: VoteSyncMode::RequestResponse,
},
timeouts: TimeoutConfig::default(),
p2p: P2pConfig {
protocol: PubSubProtocol::default(),
Expand Down
3 changes: 3 additions & 0 deletions code/crates/test/cli/src/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub fn generate_config(
Config {
moniker: format!("test-{}", index),
consensus: ConsensusConfig {
vote_sync: VoteSyncConfig {
mode: VoteSyncMode::RequestResponse,
},
timeouts: TimeoutConfig::default(),
p2p: P2pConfig {
protocol: PubSubProtocol::default(),
Expand Down
7 changes: 7 additions & 0 deletions code/crates/test/framework/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ where
pub voting_power: VotingPower,
pub start_height: Ctx::Height,
pub start_delay: Duration,
pub is_byzantine_proposer: bool,
pub steps: Vec<Step<Ctx, State>>,
pub state: State,
}
Expand All @@ -63,6 +64,7 @@ where
voting_power: 1,
start_height: Ctx::Height::INITIAL,
start_delay: Duration::from_secs(0),
is_byzantine_proposer: false,
steps: vec![],
state,
}
Expand All @@ -78,6 +80,11 @@ where
self
}

pub fn byzantine_proposer(&mut self) -> &mut Self {
self.is_byzantine_proposer = true;
self
}

pub fn start(&mut self) -> &mut Self {
self.start_at(1)
}
Expand Down
8 changes: 7 additions & 1 deletion code/crates/test/framework/src/params.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use bytesize::ByteSize;
use malachitebft_config::{Config, PubSubProtocol, ValuePayload};
use malachitebft_config::{Config, PubSubProtocol, ValuePayload, VoteSyncMode};

#[derive(Copy, Clone, Debug)]
pub struct TestParams {
Expand All @@ -14,6 +14,7 @@ pub struct TestParams {
pub value_payload: ValuePayload,
pub max_retain_blocks: usize,
pub timeout_step: Duration,
pub vote_sync_mode: Option<VoteSyncMode>,
}

impl Default for TestParams {
Expand All @@ -28,6 +29,7 @@ impl Default for TestParams {
value_payload: ValuePayload::PartsOnly,
max_retain_blocks: 50,
timeout_step: Duration::from_secs(30),
vote_sync_mode: None,
}
}
}
Expand All @@ -44,5 +46,9 @@ impl TestParams {
config.test.vote_extensions.enabled = self.vote_extensions.is_some();
config.test.vote_extensions.size = self.vote_extensions.unwrap_or_default();
config.test.max_retain_blocks = self.max_retain_blocks;

if let Some(vote_sync_mode) = self.vote_sync_mode {
config.consensus.vote_sync.mode = vote_sync_mode;
}
}
}
Loading
Loading