diff --git a/code/Cargo.lock b/code/Cargo.lock index 34ded7422..12ff4d43c 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2147,6 +2147,22 @@ dependencies = [ "toml", ] +[[package]] +name = "informalsystems-malachitebft-consensus-async" +version = "0.0.1" +dependencies = [ + "async-trait", + "derive-where", + "humantime-serde", + "informalsystems-malachitebft-core-consensus", + "informalsystems-malachitebft-core-types", + "informalsystems-malachitebft-sync", + "serde", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "informalsystems-malachitebft-core-consensus" version = "0.0.1" diff --git a/code/Cargo.toml b/code/Cargo.toml index 7ba6c9cb8..0eb3d0798 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/core-state-machine", "crates/core-types", "crates/core-votekeeper", + "crates/consensus-async", "crates/engine", "crates/metrics", "crates/network", @@ -72,6 +73,7 @@ malachitebft-core-driver = { version = "0.0.1", package = "informalsystem malachitebft-core-state-machine = { version = "0.0.1", package = "informalsystems-malachitebft-core-state-machine", path = "crates/core-state-machine" } malachitebft-core-types = { version = "0.0.1", package = "informalsystems-malachitebft-core-types", path = "crates/core-types" } malachitebft-core-votekeeper = { version = "0.0.1", package = "informalsystems-malachitebft-core-votekeeper", path = "crates/core-votekeeper" } +malachitebft-consensus-async = { version = "0.0.1", package = "informalsystems-malachitebft-consensus-async", path = "crates/consensus-async" } malachitebft-discovery = { version = "0.0.1", package = "informalsystems-malachitebft-discovery", path = "crates/discovery" } malachitebft-network = { version = "0.0.1", package = "informalsystems-malachitebft-network", path = "crates/network" } malachitebft-metrics = { version = "0.0.1", package = "informalsystems-malachitebft-metrics", path = "crates/metrics" } diff --git a/code/crates/app/src/types.rs b/code/crates/app/src/types.rs index 8867044fb..97a7e78d6 100644 --- a/code/crates/app/src/types.rs +++ b/code/crates/app/src/types.rs @@ -1,6 +1,6 @@ //! Re-export of all types required to build a Malachite application. -pub use malachitebft_core_consensus::{ +pub use malachitebft_core_consensus::types::{ ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload, }; pub use malachitebft_engine::host::LocallyProposedValue; diff --git a/code/crates/consensus-async/Cargo.toml b/code/crates/consensus-async/Cargo.toml new file mode 100644 index 000000000..fb31a3fd9 --- /dev/null +++ b/code/crates/consensus-async/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "informalsystems-malachitebft-consensus-async" +description = "High-level async consensus API for the Malachite BFT consensus engine" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true +rust-version.workspace = true +readme = "../../../README.md" + +[package.metadata.docs.rs] +all-features = false +rustdoc-args = ["--cfg", "docsrs"] + +[features] +serde = ["dep:serde", "dep:humantime-serde"] + +[dependencies] +malachitebft-core-types = { workspace = true } +malachitebft-core-consensus = { workspace = true } +malachitebft-sync = { workspace = true } + +async-trait = { workspace = true } +derive-where = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true, optional = true } +humantime-serde = { workspace = true, optional = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/code/crates/consensus-async/src/config.rs b/code/crates/consensus-async/src/config.rs new file mode 100644 index 000000000..3fdb7e624 --- /dev/null +++ b/code/crates/consensus-async/src/config.rs @@ -0,0 +1,82 @@ +use std::time::Duration; + +use malachitebft_core_types::TimeoutKind; + +/// Timeouts +#[derive(Copy, Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Timeouts { + /// How long we wait for a proposal block before prevoting nil + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub propose: Duration, + + /// How much timeout_propose increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub propose_delta: Duration, + + /// How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub prevote: Duration, + + /// How much the timeout_prevote increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub prevote_delta: Duration, + + /// How long we wait after receiving +2/3 precommits for “anything” (ie. not a single block or nil) + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub precommit: Duration, + + /// How much the timeout_precommit increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub precommit_delta: Duration, + + /// How long we wait after committing a block, before starting on the new + /// height (this gives us a chance to receive some more precommits, even + /// though we already have +2/3). + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub commit: Duration, + + /// How long we stay in preovte or precommit steps before starting + /// the vote synchronization protocol. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub step: Duration, +} + +impl Timeouts { + pub fn timeout_duration(&self, kind: TimeoutKind) -> Duration { + match kind { + TimeoutKind::Propose => self.propose, + TimeoutKind::Prevote => self.prevote, + TimeoutKind::Precommit => self.precommit, + TimeoutKind::Commit => self.commit, + TimeoutKind::PrevoteTimeLimit => self.step, + TimeoutKind::PrecommitTimeLimit => self.step, + } + } + + pub fn delta_duration(&self, step: TimeoutKind) -> Option { + match step { + TimeoutKind::Propose => Some(self.propose_delta), + TimeoutKind::Prevote => Some(self.prevote_delta), + TimeoutKind::Precommit => Some(self.precommit_delta), + TimeoutKind::Commit => None, + TimeoutKind::PrevoteTimeLimit => None, + TimeoutKind::PrecommitTimeLimit => None, + } + } +} + +impl Default for Timeouts { + fn default() -> Self { + Self { + propose: Duration::from_secs(3), + propose_delta: Duration::from_millis(500), + prevote: Duration::from_secs(1), + prevote_delta: Duration::from_millis(500), + precommit: Duration::from_secs(1), + precommit_delta: Duration::from_millis(500), + commit: Duration::from_secs(0), + step: Duration::from_secs(30), + } + } +} diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs new file mode 100644 index 000000000..7335abecd --- /dev/null +++ b/code/crates/consensus-async/src/lib.rs @@ -0,0 +1,313 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] + +use config::Timeouts; +use derive_where::derive_where; +use tokio::sync::{mpsc, oneshot}; + +use malachitebft_core_consensus::{Effect, Params, Resumable, Resume}; + +pub use malachitebft_core_consensus::types; +pub use malachitebft_core_consensus::{process, Error as ConsensusError, Input, State}; + +pub mod config; +pub mod query; + +use query::*; +use types::*; + +mod timers; +use timers::Timers; + +#[derive(thiserror::Error)] +#[derive_where(Debug)] +pub enum Error +where + Ctx: Context, +{ + #[error("Consensus error: {0}")] + Consensus(#[from] ConsensusError), + + #[error("Send error")] + SendError(Input), +} + +pub struct Handle +where + Ctx: Context, +{ + capacity: usize, + tx_input: mpsc::Sender<(Input, mpsc::Sender>)>, +} + +impl Handle +where + Ctx: Context, +{ + pub async fn process( + &self, + input: Input, + ) -> Result>, Error> { + let (tx_query, rx_query) = mpsc::channel(self.capacity); + + self.tx_input + .send((input, tx_query)) + .await + .map_err(|e| Error::SendError(e.0 .0))?; + + Ok(rx_query) + } +} + +pub struct Consensus +where + Ctx: Context, +{ + ctx: Ctx, + state: State, + timers: Timers, + timeouts: Timeouts, + metrics: Metrics, + rx_input: mpsc::Receiver<(Input, mpsc::Sender>)>, +} + +impl Consensus +where + Ctx: Context, +{ + pub fn new( + ctx: Ctx, + params: Params, + timeouts: Timeouts, + capacity: usize, + ) -> (Self, Handle) { + let (tx_input, rx_input) = mpsc::channel(capacity); + + ( + Self { + ctx: ctx.clone(), + state: State::new(ctx, params), + timers: Timers::new(), + timeouts, + metrics: Metrics::default(), + rx_input, + }, + Handle { capacity, tx_input }, + ) + } + + pub async fn run(mut self) { + let mut timer_elapsed = self.timers.subscribe(); + + loop { + tokio::select! { + Ok(elapsed) = timer_elapsed.recv() => { + let Some(timeout) = self.timers.intercept_timer_msg(elapsed) else { + continue; + }; + + let (tx_query, _) = mpsc::channel(1); + let input = Input::TimeoutElapsed(timeout); + if let Err(e) = self.process(input, tx_query).await { + tracing::error!("Error: {e}"); + } + } + + input = self.rx_input.recv() => { + match input { + Some((input, tx_query)) => { + if let Err(e) = self.process(input, tx_query).await { + tracing::error!("Error: {e}"); + } + } + None => break, + } + + } + } + } + } + + pub async fn process( + &mut self, + input: Input, + tx_query: mpsc::Sender>, + ) -> Result<(), Error> { + process!( + input: input, + state: &mut self.state, + metrics: &self.metrics, + with: effect => handle_effect( + &self.ctx, + effect, + &mut self.timers, + &self.timeouts, + &tx_query + ).await + ) + } +} + +macro_rules! query { + ($tx_query:expr, $resume:expr, $map:expr, $query:expr) => {{ + let (reply, rx) = oneshot::channel(); + $tx_query.send($query(reply).into()).await.unwrap(); + let result = rx.await.unwrap(); + Ok($resume.resume_with($map(result))) + }}; + + ($tx_query:expr, $resume:expr, $query:expr) => {{ + query!($tx_query, $resume, |x| x, $query) + }}; +} + +async fn handle_effect( + ctx: &Ctx, + effect: Effect, + timers: &mut Timers, + timeouts: &Timeouts, + tx_query: &mpsc::Sender>, +) -> Result, Error> +where + Ctx: Context, +{ + match effect { + // Timers + Effect::ResetTimeouts(resume) => { + // TODO + Ok(resume.resume_with(())) + } + + Effect::CancelAllTimeouts(resume) => { + timers.cancel_all(); + Ok(resume.resume_with(())) + } + + Effect::CancelTimeout(timeout, resume) => { + timers.cancel(&timeout); + Ok(resume.resume_with(())) + } + + Effect::ScheduleTimeout(timeout, resume) => { + let duration = timeouts.timeout_duration(timeout.kind); + timers.start(timeout, duration); + + Ok(resume.resume_with(())) + } + + // Consensus + Effect::StartRound(height, round, proposer, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::StartRound( + height, round, proposer, reply + )) + } + Effect::Publish(signed_consensus_msg, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::Publish( + signed_consensus_msg, + reply + )) + } + Effect::GetValue(height, round, timeout, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::GetValue( + height, round, timeout, reply + )) + } + Effect::RestreamValue(height, round, pol_round, proposer, id, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::RestreamValue( + height, round, pol_round, proposer, id, reply + )) + } + Effect::GetValidatorSet(height, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::GetValidatorSet( + height, reply + )) + } + Effect::Decide(certificate, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::Decide( + certificate, + reply + )) + } + + // Vote Sync + Effect::GetVoteSet(height, round, resume) => { + query!(tx_query, resume, |reply| SyncQuery::GetVoteSet( + height, round, reply + )) + } + Effect::SendVoteSetResponse(request_id, height, round, vote_set, resume) => { + query!(tx_query, resume, |reply| { + SyncQuery::SendVoteSetResponse(request_id, height, round, vote_set, reply) + }) + } + + // WAL + Effect::WalAppendMessage(msg, resume) => { + query!(tx_query, resume, |reply| WalQuery::AppendMessage( + msg, reply + )) + } + Effect::WalAppendTimeout(timeout, resume) => { + query!(tx_query, resume, |reply| WalQuery::AppendTimeout( + timeout, reply + )) + } + + // Signing + Effect::SignVote(vote, resume) => { + // query!(tx_query, resume, |reply| SigningQuery::SignVote( + // vote, reply + // )) + + Ok(resume.resume_with(ctx.signing_provider().sign_vote(vote))) + } + Effect::SignProposal(proposal, resume) => { + // query!(tx_query, resume, |reply| SigningQuery::SignProposal( + // proposal, reply + // )) + + Ok(resume.resume_with(ctx.signing_provider().sign_proposal(proposal))) + } + Effect::VerifySignature(msg, public_key, resume) => { + // query!(tx_query, resume, |reply| { + // SigningQuery::VerifySignature(msg, public_key, reply) + // }) + + match msg.message { + ConsensusMsg::Vote(vote) => { + let valid = ctx.signing_provider().verify_signed_vote( + &vote, + &msg.signature, + &public_key, + ); + + Ok(resume.resume_with(valid.into())) + } + ConsensusMsg::Proposal(proposal) => { + let valid = ctx.signing_provider().verify_signed_proposal( + &proposal, + &msg.signature, + &public_key, + ); + + Ok(resume.resume_with(valid.into())) + } + } + } + Effect::VerifyCertificate(certificate, validator_set, threshold_params, resume) => { + // query!(tx_query, resume, |reply| SigningQuery::VerifyCertificate( + // certificate, + // validator_set, + // threshold_params, + // reply + // )) + + let result = ctx.signing_provider().verify_certificate( + &certificate, + &validator_set, + threshold_params, + ); + + Ok(resume.resume_with(result)) + } + } +} diff --git a/code/crates/consensus-async/src/query.rs b/code/crates/consensus-async/src/query.rs new file mode 100644 index 000000000..36f24438c --- /dev/null +++ b/code/crates/consensus-async/src/query.rs @@ -0,0 +1,170 @@ +use derive_where::derive_where; +use tokio::sync::oneshot; + +use malachitebft_core_consensus::types::*; + +pub type Reply = oneshot::Sender; + +#[must_use] +#[derive_where(Debug)] +pub enum Query +where + Ctx: Context, +{ + Consensus(ConsensusQuery), + Sync(SyncQuery), + Wal(WalQuery), + // Timers(TimersQuery), + // Signing(SigningQuery), +} + +impl From> for Query { + fn from(query: ConsensusQuery) -> Self { + Self::Consensus(query) + } +} + +impl From> for Query { + fn from(query: SyncQuery) -> Self { + Self::Sync(query) + } +} + +impl From> for Query { + fn from(query: WalQuery) -> Self { + Self::Wal(query) + } +} + +// impl From for Query { +// fn from(query: TimersQuery) -> Self { +// Self::Timers(query) +// } +// } +// +// impl From> for Query { +// fn from(query: SigningQuery) -> Self { +// Self::Signing(query) +// } +// } + +#[must_use] +#[derive_where(Debug)] +pub enum ConsensusQuery +where + Ctx: Context, +{ + /// Consensus is starting a new round with the given proposer + StartRound(Ctx::Height, Round, Ctx::Address, Reply<()>), + + /// Get the validator set at the given height + GetValidatorSet(Ctx::Height, Reply>), + + /// Publish a message to peers + Publish(SignedConsensusMsg, Reply<()>), + + /// Requests the application to build a value for consensus to run on. + /// + /// Because this operation may be asynchronous, this effect does not expect a resumption + /// with a value, rather the application is expected to propose a value within the timeout duration. + /// + /// The application MUST eventually feed a [`Propose`][crate::Input::Propose] + /// input to consensus within the specified timeout duration. + GetValue(Ctx::Height, Round, Timeout, Reply<()>), + + /// Requests the application to re-stream a proposal that it has already seen. + /// + /// The application MUST re-publish again to its pwers all + /// the proposal parts pertaining to that value. + RestreamValue( + /// Height of the value + Ctx::Height, + /// Round of the value + Round, + /// Valid round of the value + Round, + /// Address of the proposer for that value + Ctx::Address, + /// Value ID of the value to restream + ValueId, + /// For resumption + Reply<()>, + ), + + /// Notifies the application that consensus has decided on a value. + /// + /// This message includes a commit certificate containing the ID of + /// the value that was decided on, the height and round at which it was decided, + /// and the aggregated signatures of the validators that committed to it. + Decide(CommitCertificate, Reply<()>), +} + +#[must_use] +#[derive_where(Debug)] +pub enum SyncQuery +where + Ctx: Context, +{ + /// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers + GetVoteSet(Ctx::Height, Round, Reply<()>), + + /// A peer has required our vote set, send the response + SendVoteSetResponse(RequestId, Ctx::Height, Round, VoteSet, Reply<()>), +} + +#[must_use] +#[derive_where(Debug)] +pub enum WalQuery +where + Ctx: Context, +{ + /// Append a consensus message to the Write-Ahead Log for crash recovery + AppendMessage(SignedConsensusMsg, Reply<()>), + + /// Append a timeout to the Write-Ahead Log for crash recovery + AppendTimeout(Timeout, Reply<()>), +} + +// #[must_use] +// #[derive(Debug)] +// pub enum TimersQuery { +// /// Reset all timeouts to their initial values +// ResetTimeouts(Reply<()>), +// +// /// Cancel all outstanding timeouts +// CancelAllTimeouts(Reply<()>), +// +// /// Cancel a given timeout +// CancelTimeout(Timeout, Reply<()>), +// +// /// Schedule a timeout +// ScheduleTimeout(Timeout, Reply<()>), +// } + +// #[must_use] +// #[derive_where(Debug)] +// pub enum SigningQuery +// where +// Ctx: Context, +// { +// /// Sign a vote with this node's private key +// SignVote(Ctx::Vote, Reply>), +// +// /// Sign a proposal with this node's private key +// SignProposal(Ctx::Proposal, Reply>), +// +// /// Verify a signature +// VerifySignature( +// SignedMessage>, +// PublicKey, +// Reply, +// ), +// +// /// Verify a commit certificate +// VerifyCertificate( +// CommitCertificate, +// Ctx::ValidatorSet, +// ThresholdParams, +// Reply>>, +// ), +// } diff --git a/code/crates/consensus-async/src/timers.rs b/code/crates/consensus-async/src/timers.rs new file mode 100644 index 000000000..b00a3ddd3 --- /dev/null +++ b/code/crates/consensus-async/src/timers.rs @@ -0,0 +1,294 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; +use std::ops::RangeFrom; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tracing::trace; + +#[derive(Debug)] +struct Timer { + /// Message to give to the actor when the timer expires + key: Key, + + // Task that will notify the actor that the timer has elapsed + task: JoinHandle<()>, + + /// Generation counter to the timer to check if we received a timeout + /// message from an old timer that was enqueued in mailbox before canceled + generation: u64, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct TimeoutElapsed { + key: Key, + generation: u64, +} + +#[derive(Debug)] +pub struct Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + tx: Arc>>, + timers: HashMap>, + generations: RangeFrom, +} + +impl Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + pub fn new() -> Self { + let (tx, _) = broadcast::channel(1); + + Self { + tx: Arc::new(tx), + timers: HashMap::new(), + generations: 1.., + } + } + + pub fn subscribe(&self) -> broadcast::Receiver> { + self.tx.subscribe() + } + + /// Start a timer that will send `msg` once to the actor after the given `timeout`. + /// + /// Each timer has a key and if a new timer with same key is started + /// the previous is cancelled. + /// + /// # Warning + /// It is NOT guaranteed that a message from the previous timer is not received, + /// as it could already be enqueued in the mailbox when the new timer was started. + /// + /// When the actor receives a timeout message for timer from the scheduler, it should + /// check if the timer is still active by calling [`TimerScheduler::intercept_timer_msg`] + /// and ignore the message otherwise. + pub fn start(&mut self, key: Key, timeout: Duration) + where + Key: Clone + Send + 'static, + { + self.cancel(&key); + + let generation = self + .generations + .next() + .expect("generation counter overflowed"); + + let task = { + let key = key.clone(); + let tx = Arc::clone(&self.tx); + + tokio::spawn(async move { + tokio::time::sleep(timeout).await; + let _ = tx.send(TimeoutElapsed { key, generation }); + }) + }; + + self.timers.insert( + key.clone(), + Timer { + key, + task, + generation, + }, + ); + } + + /// Cancel a timer with a given `key`. + /// + /// If canceling a timer that was already canceled, or key never was used to start a timer + /// this operation will do nothing. + /// + /// # Warning + /// It is NOT guaranteed that a message from a canceled timer, including its previous incarnation + /// for the same key, will not be received by the actor, as the message might already + /// be enqueued in the mailbox when cancel is called. + /// + /// When the actor receives a timeout message for timer from the scheduler, it should + /// check if the timer is still active by calling [`TimerScheduler::intercept_timer_msg`] + /// and ignore the message otherwise. + pub fn cancel(&mut self, key: &Key) { + if let Some(timer) = self.timers.remove(key) { + timer.task.abort(); + } + } + + /// Cancel all timers. + pub fn cancel_all(&mut self) { + self.timers.drain().for_each(|(_, timer)| { + timer.task.abort(); + }); + } + + /// Intercepts a timer message and checks the state of the timer associated with the provided `timer_msg`: + /// + /// 1. If the timer message was from canceled timer that was already enqueued in mailbox, returns `None`. + /// 2. If the timer message was from an old timer that was enqueued in mailbox before being canceled, returns `None`. + /// 3. Otherwise it is a valid timer message, returns the associated `Key` wrapped in `Some`. + pub fn intercept_timer_msg(&mut self, timer_msg: TimeoutElapsed) -> Option + where + Key: Debug, + { + match self.timers.entry(timer_msg.key) { + // The timer message was from canceled timer that was already enqueued in mailbox + Entry::Vacant(entry) => { + let key = entry.key(); + trace!("Received timer {key:?} that has been removed, discarding"); + None + } + + // The timer message was from an old timer that was enqueued in mailbox before being canceled + Entry::Occupied(entry) if timer_msg.generation != entry.get().generation => { + let (key, timer) = (entry.key(), entry.get()); + + trace!( + "Received timer {key:?} from old generation {}, expected generation {}, discarding", + timer_msg.generation, + timer.generation, + ); + + None + } + + // Valid timer message + Entry::Occupied(entry) => { + let timer = entry.remove(); + Some(timer.key) + } + } + } +} + +impl Default for Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +impl Drop for Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + fn drop(&mut self) { + self.cancel_all(); + } +} + +#[cfg(test)] +impl Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + /// Check if a timer with a given `key` is active, ie. it hasn't been canceled nor has it elapsed yet. + fn is_timer_active(&self, key: &Key) -> bool { + self.timers.contains_key(key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::time::Duration; + use tokio::time::sleep; + + #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] + struct TestKey(&'static str); + + async fn scheduler() -> Timers { + Timers::new() + } + + #[tokio::test] + async fn test_start_timer() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start(key, Duration::from_millis(100)); + assert!(scheduler.is_timer_active(&key)); + + sleep(Duration::from_millis(150)).await; + let elapsed_key = scheduler.intercept_timer_msg(TimeoutElapsed { key, generation: 1 }); + assert_eq!(elapsed_key, Some(key)); + + assert!(!scheduler.is_timer_active(&key)); + } + + #[tokio::test] + async fn test_cancel_timer() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start(key, Duration::from_millis(100)); + scheduler.cancel(&key); + + assert!(!scheduler.is_timer_active(&key)); + } + + #[tokio::test] + async fn test_cancel_all_timers() { + let mut scheduler = scheduler().await; + + scheduler.start(TestKey("timer1"), Duration::from_millis(100)); + scheduler.start(TestKey("timer2"), Duration::from_millis(200)); + + scheduler.cancel_all(); + + assert!(!scheduler.is_timer_active(&TestKey("timer1"))); + assert!(!scheduler.is_timer_active(&TestKey("timer2"))); + } + + #[tokio::test] + async fn test_intercept_timer_msg_valid() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start(key, Duration::from_millis(100)); + sleep(Duration::from_millis(150)).await; + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, Some(key)); + } + + #[tokio::test] + async fn test_intercept_timer_msg_invalid_generation() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(200)); + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, None); + } + + #[tokio::test] + async fn test_intercept_timer_msg_cancelled() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start(key, Duration::from_millis(100)); + scheduler.cancel(&key); + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, None); + } +} diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 8884ed0cd..29a41044c 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -1,10 +1,8 @@ -use derive_where::derive_where; +#![allow(rustdoc::private_intra_doc_links)] -use malachitebft_core_types::*; +use derive_where::derive_where; -use crate::input::RequestId; -use crate::types::SignedConsensusMsg; -use crate::ConsensusMsg; +use crate::types::*; /// Provides a way to construct the appropriate [`Resume`] value to /// resume execution after handling an [`Effect`]. @@ -16,16 +14,17 @@ use crate::ConsensusMsg; /// /// ```rust,ignore /// fn effect_handler(effect: Effect) -> Result, Error> { -/// match effect { -/// Effect::ResetTimeouts(r) => { -/// reset_timeouts(); -/// Ok(r.resume_with(())) -/// } -/// Effect::GetValidatorSet(height, r) => {) -/// let validator_set = get_validator_set(height); -/// Ok(r.resume_with(validator_set)) -/// } -/// // ... +/// match effect { +/// Effect::ResetTimeouts(r) => { +/// reset_timeouts(); +/// Ok(r.resume_with(())) +/// } +/// Effect::GetValidatorSet(height, r) => { +/// let validator_set = get_validator_set(height); +/// Ok(r.resume_with(validator_set)) +/// } +/// // ... +/// } /// } /// ``` pub trait Resumable { @@ -84,7 +83,7 @@ where /// Because this operation may be asynchronous, this effect does not expect a resumption /// with a value, rather the application is expected to propose a value within the timeout duration. /// - /// The application MUST eventually feed a [`ProposeValue`][crate::input::Input::ProposeValue] + /// The application MUST eventually feed a [`Propose`][crate::input::Input::Propose] /// input to consensus within the specified timeout duration. /// /// Resume with: [`resume::Continue`] @@ -201,13 +200,13 @@ where ValidatorSet(Option), /// Resume execution with the validity of the signature - SignatureValidity(bool), + SignatureValidity(Validity), /// Resume execution with the signed vote - SignedVote(SignedMessage), + SignedVote(SignedVote), /// Resume execution with the signed proposal - SignedProposal(SignedMessage), + SignedProposal(SignedProposal), /// Resume execution with the result of the verification of the [`CommitCertificate`] CertificateValidity(Result<(), CertificateError>), @@ -242,7 +241,7 @@ pub mod resume { pub struct SignatureValidity; impl Resumable for SignatureValidity { - type Value = bool; + type Value = Validity; fn resume_with(self, value: Self::Value) -> Resume { Resume::SignatureValidity(value) diff --git a/code/crates/core-consensus/src/full_proposal.rs b/code/crates/core-consensus/src/full_proposal.rs index c385b437c..d05cef224 100644 --- a/code/crates/core-consensus/src/full_proposal.rs +++ b/code/crates/core-consensus/src/full_proposal.rs @@ -3,12 +3,11 @@ use std::collections::BTreeMap; use derive_where::derive_where; use tracing::debug; -use malachitebft_core_types::{ - Context, Height, Proposal, Round, SignedExtension, SignedProposal, Validity, Value, +use crate::types::{ + Context, Height, Proposal, ProposedValue, Round, SignedExtension, SignedProposal, Validity, + Value, }; -use crate::ProposedValue; - /// A full proposal, ie. a proposal together with its value and validity. #[derive_where(Clone, Debug)] pub struct FullProposal { diff --git a/code/crates/core-consensus/src/handle/proposal.rs b/code/crates/core-consensus/src/handle/proposal.rs index a22d9de46..c861cb71d 100644 --- a/code/crates/core-consensus/src/handle/proposal.rs +++ b/code/crates/core-consensus/src/handle/proposal.rs @@ -3,9 +3,8 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, ProposedValue, SignedConsensusMsg}; use crate::util::pretty::PrettyProposal; -use crate::{ProposedValue, SignedConsensusMsg}; pub async fn on_proposal( co: &Co, @@ -168,7 +167,7 @@ where }; let signed_msg = signed_proposal.clone().map(ConsensusMsg::Proposal); - if !verify_signature(co, signed_msg, proposer).await? { + if !verify_signature(co, signed_msg, proposer).await?.is_valid() { warn!( consensus.height = %consensus_height, proposal.height = %proposal_height, diff --git a/code/crates/core-consensus/src/handle/signature.rs b/code/crates/core-consensus/src/handle/signature.rs index a1fdf35f4..1006cf778 100644 --- a/code/crates/core-consensus/src/handle/signature.rs +++ b/code/crates/core-consensus/src/handle/signature.rs @@ -6,16 +6,16 @@ pub async fn verify_signature( co: &Co, signed_msg: SignedMessage>, validator: &Ctx::Validator, -) -> Result> +) -> Result> where Ctx: Context, { - let valid = perform!(co, + let validity = perform!(co, Effect::VerifySignature(signed_msg, validator.public_key().clone(), Default::default()), Resume::SignatureValidity(valid) => valid ); - Ok(valid) + Ok(validity) } pub async fn sign_vote(co: &Co, vote: Ctx::Vote) -> Result, Error> diff --git a/code/crates/core-consensus/src/handle/vote.rs b/code/crates/core-consensus/src/handle/vote.rs index 223286912..a89631f66 100644 --- a/code/crates/core-consensus/src/handle/vote.rs +++ b/code/crates/core-consensus/src/handle/vote.rs @@ -3,9 +3,8 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, SignedConsensusMsg}; use crate::util::pretty::PrettyVote; -use crate::SignedConsensusMsg; pub async fn on_vote( co: &Co, @@ -132,7 +131,10 @@ where }; let signed_msg = signed_vote.clone().map(ConsensusMsg::Vote); - if !verify_signature(co, signed_msg, validator).await? { + if !verify_signature(co, signed_msg, validator) + .await? + .is_valid() + { warn!( consensus.height = %consensus_height, vote.height = %vote_height, diff --git a/code/crates/core-consensus/src/handle/vote_set.rs b/code/crates/core-consensus/src/handle/vote_set.rs index ec235596a..dadd6ee30 100644 --- a/code/crates/core-consensus/src/handle/vote_set.rs +++ b/code/crates/core-consensus/src/handle/vote_set.rs @@ -1,6 +1,6 @@ use crate::handle::vote::on_vote; -use crate::input::RequestId; use crate::prelude::*; +use crate::types::RequestId; pub async fn on_vote_set_request( co: &Co, diff --git a/code/crates/core-consensus/src/input.rs b/code/crates/core-consensus/src/input.rs index 3e829d2fa..d563548c1 100644 --- a/code/crates/core-consensus/src/input.rs +++ b/code/crates/core-consensus/src/input.rs @@ -1,12 +1,10 @@ use derive_where::derive_where; + use malachitebft_core_types::{ CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, ValueOrigin, VoteSet, }; -use crate::types::ProposedValue; -use crate::LocallyProposedValue; - -pub type RequestId = String; +use crate::types::{LocallyProposedValue, ProposedValue, RequestId}; /// Inputs to be handled by the consensus process. #[derive_where(Clone, Debug, PartialEq, Eq)] diff --git a/code/crates/core-consensus/src/lib.rs b/code/crates/core-consensus/src/lib.rs index 444e71eca..5b9588ed7 100644 --- a/code/crates/core-consensus/src/lib.rs +++ b/code/crates/core-consensus/src/lib.rs @@ -15,10 +15,9 @@ pub use params::{Params, ThresholdParams}; mod effect; pub use effect::{Effect, Resumable, Resume}; -mod types; -pub use types::*; +pub mod full_proposal; +pub mod types; -mod full_proposal; mod macros; mod util; @@ -31,10 +30,6 @@ mod handle; #[doc(hidden)] pub use handle::handle; -// Only used internally, but needs to be exposed for tests -#[doc(hidden)] -pub use full_proposal::{FullProposal, FullProposalKeeper}; - // Used in macros #[doc(hidden)] pub use tracing; diff --git a/code/crates/core-consensus/src/macros.rs b/code/crates/core-consensus/src/macros.rs index afe9e4d99..387f4c19b 100644 --- a/code/crates/core-consensus/src/macros.rs +++ b/code/crates/core-consensus/src/macros.rs @@ -14,7 +14,7 @@ /// // Metrics /// metrics: &metrics, /// // Effect handler -/// on: effect => handle_effect(effect).await +/// with: effect => handle_effect(effect).await /// ) /// ``` #[macro_export] @@ -23,7 +23,7 @@ macro_rules! process { let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $input)); let mut co_result = gen.resume_with($crate::Resume::Start); - loop { + 'process: loop { match co_result { $crate::gen::CoResult::Yielded($effect) => { let resume = match $handle { @@ -36,7 +36,7 @@ macro_rules! process { co_result = gen.resume_with(resume) } $crate::gen::CoResult::Complete(result) => { - return result.map_err(Into::into); + break 'process result.map_err(Into::into); } } } diff --git a/code/crates/core-consensus/src/params.rs b/code/crates/core-consensus/src/params.rs index 5f8033730..1dce33d26 100644 --- a/code/crates/core-consensus/src/params.rs +++ b/code/crates/core-consensus/src/params.rs @@ -3,7 +3,7 @@ use malachitebft_core_types::Context; pub use malachitebft_core_driver::ThresholdParams; -use crate::ValuePayload; +use crate::types::ValuePayload; /// Consensus parameters. #[derive_where(Clone, Debug)] diff --git a/code/crates/core-consensus/src/prelude.rs b/code/crates/core-consensus/src/prelude.rs index 0e114e0f7..0a23adb62 100644 --- a/code/crates/core-consensus/src/prelude.rs +++ b/code/crates/core-consensus/src/prelude.rs @@ -2,7 +2,6 @@ pub use async_recursion::async_recursion; pub use tracing::{debug, info, warn}; pub use malachitebft_core_driver::Input as DriverInput; -pub use malachitebft_core_types::*; pub use crate::effect::{Effect, Resume}; pub use crate::error::Error; @@ -10,9 +9,4 @@ pub use crate::gen::Co; pub use crate::input::Input; pub use crate::perform; pub use crate::state::State; - -#[cfg(feature = "metrics")] -pub use malachitebft_metrics::Metrics; - -#[cfg(not(feature = "metrics"))] -pub type Metrics = (); +pub use crate::types::*; diff --git a/code/crates/core-consensus/src/state.rs b/code/crates/core-consensus/src/state.rs index 1814f5dac..b5443160b 100644 --- a/code/crates/core-consensus/src/state.rs +++ b/code/crates/core-consensus/src/state.rs @@ -4,9 +4,11 @@ use tracing::{debug, warn}; use malachitebft_core_driver::Driver; use malachitebft_core_types::*; +use crate::full_proposal::{FullProposal, FullProposalKeeper}; use crate::input::Input; +use crate::params::Params; +use crate::types::ProposedValue; use crate::util::max_queue::MaxQueue; -use crate::{FullProposal, FullProposalKeeper, Params, ProposedValue}; /// The state maintained by consensus for processing a [`Input`][crate::Input]. pub struct State diff --git a/code/crates/core-consensus/src/types.rs b/code/crates/core-consensus/src/types.rs index 32e6b43ae..ed8cb36c4 100644 --- a/code/crates/core-consensus/src/types.rs +++ b/code/crates/core-consensus/src/types.rs @@ -1,13 +1,17 @@ use derive_where::derive_where; -use malachitebft_core_types::{ - Context, Proposal, Round, Signature, SignedExtension, SignedProposal, SignedVote, Validity, - Vote, -}; - +pub use malachitebft_core_types::*; pub use malachitebft_peer::PeerId; pub use multiaddr::Multiaddr; +pub type RequestId = String; + +#[cfg(feature = "metrics")] +pub use malachitebft_metrics::Metrics; + +#[cfg(not(feature = "metrics"))] +pub type Metrics = (); + /// A signed consensus message, ie. a signed vote or a signed proposal. #[derive_where(Clone, Debug, PartialEq, Eq)] pub enum SignedConsensusMsg { diff --git a/code/crates/core-consensus/tests/full_proposal.rs b/code/crates/core-consensus/tests/full_proposal.rs index cb0bede92..08227e9f6 100644 --- a/code/crates/core-consensus/tests/full_proposal.rs +++ b/code/crates/core-consensus/tests/full_proposal.rs @@ -5,9 +5,11 @@ use malachitebft_test::utils::validators::make_validators; use malachitebft_test::{Address, Proposal, Value}; use malachitebft_test::{Height, TestContext}; -use informalsystems_malachitebft_core_consensus::{ - FullProposal, FullProposalKeeper, Input, ProposedValue, +use informalsystems_malachitebft_core_consensus::full_proposal::{ + FullProposal, FullProposalKeeper, }; +use informalsystems_malachitebft_core_consensus::types::ProposedValue; +use informalsystems_malachitebft_core_consensus::Input; fn signed_proposal_pol( ctx: &TestContext, diff --git a/code/crates/core-types/src/proposal.rs b/code/crates/core-types/src/proposal.rs index 0c9c001c3..660d082fb 100644 --- a/code/crates/core-types/src/proposal.rs +++ b/code/crates/core-types/src/proposal.rs @@ -57,3 +57,9 @@ impl Validity { } } } + +impl From for Validity { + fn from(valid: bool) -> Self { + Validity::from_bool(valid) + } +} diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 6b733196a..7cf5de197 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -9,10 +9,11 @@ use tracing::{debug, error, info, warn}; use malachitebft_codec as codec; use malachitebft_config::TimeoutConfig; -use malachitebft_core_consensus::{Effect, PeerId, Resumable, Resume, SignedConsensusMsg}; +use malachitebft_core_consensus::types::{PeerId, SignedConsensusMsg}; +use malachitebft_core_consensus::{Effect, Resumable, Resume}; use malachitebft_core_types::{ Context, Round, SignedExtension, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, - ValidatorSet, ValueOrigin, + ValidatorSet, Validity, ValueOrigin, }; use malachitebft_metrics::Metrics; use malachitebft_sync::{ @@ -833,7 +834,7 @@ where } Effect::VerifySignature(msg, pk, r) => { - use malachitebft_core_consensus::ConsensusMsg as Msg; + use malachitebft_core_consensus::types::ConsensusMsg as Msg; let start = Instant::now(); @@ -854,7 +855,7 @@ where .signature_verification_time .observe(start.elapsed().as_secs_f64()); - Ok(r.resume_with(valid)) + Ok(r.resume_with(Validity::from_bool(valid))) } Effect::VerifyCertificate(certificate, validator_set, thresholds, r) => { diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index 91bb7dcf0..0e03b071c 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -4,14 +4,14 @@ use std::time::Duration; use derive_where::derive_where; use ractor::{ActorRef, RpcReplyPort}; -use malachitebft_core_consensus::PeerId; +use malachitebft_core_consensus::types::PeerId; use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId}; use malachitebft_sync::RawDecidedValue; use crate::consensus::ConsensusRef; use crate::util::streaming::StreamMessage; -pub use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue}; +pub use malachitebft_core_consensus::types::{LocallyProposedValue, ProposedValue}; /// A reference to the host actor. pub type HostRef = ActorRef>; diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs index 38c9a8192..881d74509 100644 --- a/code/crates/engine/src/network.rs +++ b/code/crates/engine/src/network.rs @@ -16,8 +16,7 @@ use malachitebft_sync::{ }; use malachitebft_codec as codec; -use malachitebft_core_consensus::SignedConsensusMsg; -use malachitebft_core_types::{Context, SignedProposal, SignedVote}; +use malachitebft_core_consensus::types::{Context, SignedConsensusMsg, SignedProposal, SignedVote}; use malachitebft_metrics::SharedRegistry; use malachitebft_network::handle::CtrlHandle; use malachitebft_network::{Channel, Config, Event, Multiaddr, PeerId}; diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index b9e78357f..7d65defb8 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -12,8 +12,9 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; use malachitebft_codec as codec; -use malachitebft_core_consensus::PeerId; -use malachitebft_core_types::{CertificateError, CommitCertificate, Context, Height, Round}; +use malachitebft_core_consensus::types::{ + CertificateError, CommitCertificate, Context, Height, PeerId, Round, +}; use malachitebft_sync::{self as sync, InboundRequestId, OutboundRequestId, Response}; use malachitebft_sync::{RawDecidedValue, Request}; diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index cde03d24e..34d080cd3 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -3,8 +3,10 @@ use core::fmt; use derive_where::derive_where; use tokio::sync::broadcast; -use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue, SignedConsensusMsg}; -use malachitebft_core_types::{CommitCertificate, Context, Round, Timeout, ValueOrigin}; +use malachitebft_core_consensus::types::{ + CommitCertificate, Context, LocallyProposedValue, ProposedValue, Round, SignedConsensusMsg, + Timeout, ValueOrigin, +}; pub type RxEvent = broadcast::Receiver>; diff --git a/code/crates/engine/src/wal/entry.rs b/code/crates/engine/src/wal/entry.rs index fd7b2de07..ff36ec7a0 100644 --- a/code/crates/engine/src/wal/entry.rs +++ b/code/crates/engine/src/wal/entry.rs @@ -4,8 +4,7 @@ use byteorder::{ReadBytesExt, WriteBytesExt, BE}; use derive_where::derive_where; use malachitebft_codec::Codec; -use malachitebft_core_consensus::SignedConsensusMsg; -use malachitebft_core_types::{Context, Round, Timeout}; +use malachitebft_core_consensus::types::{Context, Round, SignedConsensusMsg, Timeout}; /// Codec for encoding and decoding WAL entries. /// diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index e47756fbe..2fda38c38 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -10,8 +10,7 @@ use rand::SeedableRng; use tokio::time::Instant; use tracing::{debug, error, info, trace, warn}; -use malachitebft_core_consensus::PeerId; -use malachitebft_core_types::{CommitCertificate, Round, Validity, ValueOrigin}; +use malachitebft_core_consensus::types::{CommitCertificate, PeerId, Round, Validity, ValueOrigin}; use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef}; use malachitebft_engine::host::{LocallyProposedValue, ProposedValue}; use malachitebft_engine::network::{NetworkMsg, NetworkRef}; diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 4f04aecce..1393cdc7e 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -9,8 +9,7 @@ use thiserror::Error; use tracing::error; use malachitebft_codec::Codec; -use malachitebft_core_consensus::ProposedValue; -use malachitebft_core_types::{CommitCertificate, Round}; +use malachitebft_core_consensus::types::{CommitCertificate, ProposedValue, Round}; use malachitebft_proto::Protobuf; use crate::codec::{self, ProtobufCodec}; diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index 060256dda..325429f58 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -2,18 +2,16 @@ use bytes::Bytes; use prost::Message; use malachitebft_codec::Codec; -use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Extension, Round, SignedExtension, - SignedProposal, SignedVote, Validity, +use malachitebft_core_consensus::types::{ + AggregatedSignature, CommitCertificate, CommitSignature, Extension, PeerId, ProposedValue, + Round, SignedConsensusMsg, SignedExtension, SignedProposal, SignedVote, Validity, }; use malachitebft_engine::util::streaming::{StreamContent, StreamMessage}; +use malachitebft_starknet_p2p_proto::ConsensusMessage; use malachitebft_sync::{ self as sync, ValueRequest, ValueResponse, VoteSetRequest, VoteSetResponse, }; -use malachitebft_core_consensus::{PeerId, ProposedValue, SignedConsensusMsg}; -use malachitebft_starknet_p2p_proto::ConsensusMessage; - use crate::proto::consensus_message::Messages; use crate::proto::{self as proto, Error as ProtoError, Protobuf}; use crate::types::{self as p2p, Address, BlockHash, Height, MockContext, ProposalPart, Vote}; diff --git a/code/crates/starknet/host/src/host/starknet.rs b/code/crates/starknet/host/src/host/starknet.rs index 5e27bd993..638a71236 100644 --- a/code/crates/starknet/host/src/host/starknet.rs +++ b/code/crates/starknet/host/src/host/starknet.rs @@ -8,8 +8,9 @@ use tokio::time::Instant; use tracing::{debug, Instrument}; use malachitebft_config::VoteExtensionsConfig; -use malachitebft_core_consensus::ValuePayload; -use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtension, SignedVote}; +use malachitebft_core_consensus::types::{ + CommitCertificate, Extension, Round, SignedExtension, SignedVote, ValuePayload, +}; use crate::host::Host; use crate::mempool::MempoolRef; diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index e612bf8d3..7e059e2f2 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -2,19 +2,19 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use libp2p_identity::ecdsa; -use malachitebft_engine::util::events::TxEvent; -use malachitebft_engine::wal::{Wal, WalRef}; use tokio::task::JoinHandle; use malachitebft_config::{ self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol, }; -use malachitebft_core_consensus::ValuePayload; +use malachitebft_core_consensus::types::ValuePayload; use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef}; use malachitebft_engine::host::HostRef; use malachitebft_engine::network::{Network, NetworkRef}; use malachitebft_engine::node::{Node, NodeRef}; use malachitebft_engine::sync::{Params as SyncParams, Sync, SyncRef}; +use malachitebft_engine::util::events::TxEvent; +use malachitebft_engine::wal::{Wal, WalRef}; use malachitebft_metrics::Metrics; use malachitebft_metrics::SharedRegistry; use malachitebft_network::Keypair; diff --git a/code/crates/starknet/host/src/streaming.rs b/code/crates/starknet/host/src/streaming.rs index dbe05f0da..fac305127 100644 --- a/code/crates/starknet/host/src/streaming.rs +++ b/code/crates/starknet/host/src/streaming.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BinaryHeap, HashSet}; use derive_where::derive_where; -use malachitebft_core_consensus::PeerId; +use malachitebft_core_consensus::types::PeerId; use malachitebft_core_types::Round; use malachitebft_engine::util::streaming::{Sequence, StreamId, StreamMessage}; diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 12a2969ac..ecfc29cfb 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -17,7 +17,7 @@ use malachitebft_config::{ Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol, }; -use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; +use malachitebft_core_consensus::types::{LocallyProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{SignedVote, VotingPower}; use malachitebft_engine::util::events::{Event, RxEvent, TxEvent}; use malachitebft_starknet_host::spawn::spawn_node_actor; diff --git a/code/crates/starknet/test/src/tests/wal.rs b/code/crates/starknet/test/src/tests/wal.rs index 88b14a0db..cd3fb92a8 100644 --- a/code/crates/starknet/test/src/tests/wal.rs +++ b/code/crates/starknet/test/src/tests/wal.rs @@ -4,7 +4,7 @@ use eyre::bail; use tracing::info; use malachitebft_config::ValuePayload; -use malachitebft_core_consensus::LocallyProposedValue; +use malachitebft_core_consensus::types::LocallyProposedValue; use malachitebft_core_types::SignedVote; use malachitebft_engine::util::events::Event; use malachitebft_starknet_host::types::MockContext; diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs index 39cf620a9..6cb82a667 100644 --- a/code/crates/test/src/codec/proto/mod.rs +++ b/code/crates/test/src/codec/proto/mod.rs @@ -3,10 +3,9 @@ use prost::Message; use malachitebft_app::streaming::{StreamContent, StreamMessage}; use malachitebft_codec::Codec; -use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg}; -use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Extension, Round, SignedExtension, - SignedProposal, SignedVote, Validity, VoteSet, +use malachitebft_core_consensus::types::{ + AggregatedSignature, CommitCertificate, CommitSignature, Extension, ProposedValue, Round, + SignedConsensusMsg, SignedExtension, SignedProposal, SignedVote, Validity, VoteSet, }; use malachitebft_proto::{Error as ProtoError, Protobuf}; use malachitebft_signing_ed25519::Signature; diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 7a95d3182..9f95d4dde 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -10,11 +10,10 @@ use rand::{Rng, SeedableRng}; use sha3::Digest; use tracing::{debug, error}; -use malachitebft_app_channel::app::consensus::ProposedValue; use malachitebft_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachitebft_app_channel::app::types::codec::Codec; use malachitebft_app_channel::app::types::core::{CommitCertificate, Round, Validity}; -use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId}; +use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId, ProposedValue}; use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Genesis, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, diff --git a/code/examples/channel/src/streaming.rs b/code/examples/channel/src/streaming.rs index 6792b0a5a..85ed7abb2 100644 --- a/code/examples/channel/src/streaming.rs +++ b/code/examples/channel/src/streaming.rs @@ -1,9 +1,9 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, BinaryHeap, HashSet}; -use malachitebft_app_channel::app::consensus::PeerId; use malachitebft_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; use malachitebft_app_channel::app::types::core::Round; +use malachitebft_app_channel::app::types::PeerId; use malachitebft_test::{Address, Height, ProposalInit, ProposalPart}; struct MinSeq(StreamMessage);