From 6f171c08bce8edfc5c67faa41917fa853858777d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 23 Jan 2025 16:56:29 +0100 Subject: [PATCH 1/6] Re-organize exports of `core-consensus` crate --- code/crates/app/src/types.rs | 2 +- code/crates/core-consensus/src/effect.rs | 10 +++------- code/crates/core-consensus/src/full_proposal.rs | 7 +++---- code/crates/core-consensus/src/handle/proposal.rs | 3 +-- code/crates/core-consensus/src/handle/vote.rs | 3 +-- code/crates/core-consensus/src/handle/vote_set.rs | 2 +- code/crates/core-consensus/src/input.rs | 6 ++---- code/crates/core-consensus/src/lib.rs | 9 ++------- code/crates/core-consensus/src/params.rs | 2 +- code/crates/core-consensus/src/state.rs | 4 +++- code/crates/core-consensus/src/types.rs | 8 +++----- code/crates/core-consensus/tests/full_proposal.rs | 6 ++++-- code/crates/engine/src/consensus.rs | 5 +++-- code/crates/engine/src/host.rs | 4 ++-- code/crates/engine/src/network.rs | 3 +-- code/crates/engine/src/sync.rs | 5 +++-- code/crates/engine/src/util/events.rs | 6 ++++-- code/crates/engine/src/wal/entry.rs | 3 +-- code/crates/starknet/host/src/actor.rs | 3 +-- code/crates/starknet/host/src/block_store.rs | 3 +-- code/crates/starknet/host/src/codec.rs | 10 ++++------ code/crates/starknet/host/src/host/starknet.rs | 5 +++-- code/crates/starknet/host/src/spawn.rs | 6 +++--- code/crates/starknet/host/src/streaming.rs | 2 +- code/crates/starknet/test/src/lib.rs | 2 +- code/crates/starknet/test/src/tests/wal.rs | 2 +- code/crates/test/src/codec/proto/mod.rs | 7 +++---- code/examples/channel/src/state.rs | 3 +-- code/examples/channel/src/streaming.rs | 2 +- 29 files changed, 59 insertions(+), 74 deletions(-) diff --git a/code/crates/app/src/types.rs b/code/crates/app/src/types.rs index 8867044fb..97a7e78d6 100644 --- a/code/crates/app/src/types.rs +++ b/code/crates/app/src/types.rs @@ -1,6 +1,6 @@ //! Re-export of all types required to build a Malachite application. -pub use malachitebft_core_consensus::{ +pub use malachitebft_core_consensus::types::{ ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload, }; pub use malachitebft_engine::host::LocallyProposedValue; diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 8884ed0cd..2e6edbde2 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -1,10 +1,6 @@ use derive_where::derive_where; -use malachitebft_core_types::*; - -use crate::input::RequestId; -use crate::types::SignedConsensusMsg; -use crate::ConsensusMsg; +use crate::types::*; /// Provides a way to construct the appropriate [`Resume`] value to /// resume execution after handling an [`Effect`]. @@ -204,10 +200,10 @@ where SignatureValidity(bool), /// Resume execution with the signed vote - SignedVote(SignedMessage), + SignedVote(SignedVote), /// Resume execution with the signed proposal - SignedProposal(SignedMessage), + SignedProposal(SignedProposal), /// Resume execution with the result of the verification of the [`CommitCertificate`] CertificateValidity(Result<(), CertificateError>), diff --git a/code/crates/core-consensus/src/full_proposal.rs b/code/crates/core-consensus/src/full_proposal.rs index c385b437c..d05cef224 100644 --- a/code/crates/core-consensus/src/full_proposal.rs +++ b/code/crates/core-consensus/src/full_proposal.rs @@ -3,12 +3,11 @@ use std::collections::BTreeMap; use derive_where::derive_where; use tracing::debug; -use malachitebft_core_types::{ - Context, Height, Proposal, Round, SignedExtension, SignedProposal, Validity, Value, +use crate::types::{ + Context, Height, Proposal, ProposedValue, Round, SignedExtension, SignedProposal, Validity, + Value, }; -use crate::ProposedValue; - /// A full proposal, ie. a proposal together with its value and validity. #[derive_where(Clone, Debug)] pub struct FullProposal { diff --git a/code/crates/core-consensus/src/handle/proposal.rs b/code/crates/core-consensus/src/handle/proposal.rs index a22d9de46..4ae9f4a55 100644 --- a/code/crates/core-consensus/src/handle/proposal.rs +++ b/code/crates/core-consensus/src/handle/proposal.rs @@ -3,9 +3,8 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, ProposedValue, SignedConsensusMsg}; use crate::util::pretty::PrettyProposal; -use crate::{ProposedValue, SignedConsensusMsg}; pub async fn on_proposal( co: &Co, diff --git a/code/crates/core-consensus/src/handle/vote.rs b/code/crates/core-consensus/src/handle/vote.rs index 223286912..7a0fee744 100644 --- a/code/crates/core-consensus/src/handle/vote.rs +++ b/code/crates/core-consensus/src/handle/vote.rs @@ -3,9 +3,8 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, SignedConsensusMsg}; use crate::util::pretty::PrettyVote; -use crate::SignedConsensusMsg; pub async fn on_vote( co: &Co, diff --git a/code/crates/core-consensus/src/handle/vote_set.rs b/code/crates/core-consensus/src/handle/vote_set.rs index ec235596a..dadd6ee30 100644 --- a/code/crates/core-consensus/src/handle/vote_set.rs +++ b/code/crates/core-consensus/src/handle/vote_set.rs @@ -1,6 +1,6 @@ use crate::handle::vote::on_vote; -use crate::input::RequestId; use crate::prelude::*; +use crate::types::RequestId; pub async fn on_vote_set_request( co: &Co, diff --git a/code/crates/core-consensus/src/input.rs b/code/crates/core-consensus/src/input.rs index 3e829d2fa..d563548c1 100644 --- a/code/crates/core-consensus/src/input.rs +++ b/code/crates/core-consensus/src/input.rs @@ -1,12 +1,10 @@ use derive_where::derive_where; + use malachitebft_core_types::{ CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, ValueOrigin, VoteSet, }; -use crate::types::ProposedValue; -use crate::LocallyProposedValue; - -pub type RequestId = String; +use crate::types::{LocallyProposedValue, ProposedValue, RequestId}; /// Inputs to be handled by the consensus process. #[derive_where(Clone, Debug, PartialEq, Eq)] diff --git a/code/crates/core-consensus/src/lib.rs b/code/crates/core-consensus/src/lib.rs index 444e71eca..5b9588ed7 100644 --- a/code/crates/core-consensus/src/lib.rs +++ b/code/crates/core-consensus/src/lib.rs @@ -15,10 +15,9 @@ pub use params::{Params, ThresholdParams}; mod effect; pub use effect::{Effect, Resumable, Resume}; -mod types; -pub use types::*; +pub mod full_proposal; +pub mod types; -mod full_proposal; mod macros; mod util; @@ -31,10 +30,6 @@ mod handle; #[doc(hidden)] pub use handle::handle; -// Only used internally, but needs to be exposed for tests -#[doc(hidden)] -pub use full_proposal::{FullProposal, FullProposalKeeper}; - // Used in macros #[doc(hidden)] pub use tracing; diff --git a/code/crates/core-consensus/src/params.rs b/code/crates/core-consensus/src/params.rs index 5f8033730..1dce33d26 100644 --- a/code/crates/core-consensus/src/params.rs +++ b/code/crates/core-consensus/src/params.rs @@ -3,7 +3,7 @@ use malachitebft_core_types::Context; pub use malachitebft_core_driver::ThresholdParams; -use crate::ValuePayload; +use crate::types::ValuePayload; /// Consensus parameters. #[derive_where(Clone, Debug)] diff --git a/code/crates/core-consensus/src/state.rs b/code/crates/core-consensus/src/state.rs index 1814f5dac..b5443160b 100644 --- a/code/crates/core-consensus/src/state.rs +++ b/code/crates/core-consensus/src/state.rs @@ -4,9 +4,11 @@ use tracing::{debug, warn}; use malachitebft_core_driver::Driver; use malachitebft_core_types::*; +use crate::full_proposal::{FullProposal, FullProposalKeeper}; use crate::input::Input; +use crate::params::Params; +use crate::types::ProposedValue; use crate::util::max_queue::MaxQueue; -use crate::{FullProposal, FullProposalKeeper, Params, ProposedValue}; /// The state maintained by consensus for processing a [`Input`][crate::Input]. pub struct State diff --git a/code/crates/core-consensus/src/types.rs b/code/crates/core-consensus/src/types.rs index 32e6b43ae..50840b2a4 100644 --- a/code/crates/core-consensus/src/types.rs +++ b/code/crates/core-consensus/src/types.rs @@ -1,13 +1,11 @@ use derive_where::derive_where; -use malachitebft_core_types::{ - Context, Proposal, Round, Signature, SignedExtension, SignedProposal, SignedVote, Validity, - Vote, -}; - +pub use malachitebft_core_types::*; pub use malachitebft_peer::PeerId; pub use multiaddr::Multiaddr; +pub type RequestId = String; + /// A signed consensus message, ie. a signed vote or a signed proposal. #[derive_where(Clone, Debug, PartialEq, Eq)] pub enum SignedConsensusMsg { diff --git a/code/crates/core-consensus/tests/full_proposal.rs b/code/crates/core-consensus/tests/full_proposal.rs index cb0bede92..08227e9f6 100644 --- a/code/crates/core-consensus/tests/full_proposal.rs +++ b/code/crates/core-consensus/tests/full_proposal.rs @@ -5,9 +5,11 @@ use malachitebft_test::utils::validators::make_validators; use malachitebft_test::{Address, Proposal, Value}; use malachitebft_test::{Height, TestContext}; -use informalsystems_malachitebft_core_consensus::{ - FullProposal, FullProposalKeeper, Input, ProposedValue, +use informalsystems_malachitebft_core_consensus::full_proposal::{ + FullProposal, FullProposalKeeper, }; +use informalsystems_malachitebft_core_consensus::types::ProposedValue; +use informalsystems_malachitebft_core_consensus::Input; fn signed_proposal_pol( ctx: &TestContext, diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 6b733196a..e2fdf9b5b 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -9,7 +9,8 @@ use tracing::{debug, error, info, warn}; use malachitebft_codec as codec; use malachitebft_config::TimeoutConfig; -use malachitebft_core_consensus::{Effect, PeerId, Resumable, Resume, SignedConsensusMsg}; +use malachitebft_core_consensus::types::{PeerId, SignedConsensusMsg}; +use malachitebft_core_consensus::{Effect, Resumable, Resume}; use malachitebft_core_types::{ Context, Round, SignedExtension, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, ValidatorSet, ValueOrigin, @@ -833,7 +834,7 @@ where } Effect::VerifySignature(msg, pk, r) => { - use malachitebft_core_consensus::ConsensusMsg as Msg; + use malachitebft_core_consensus::types::ConsensusMsg as Msg; let start = Instant::now(); diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index 91bb7dcf0..0e03b071c 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -4,14 +4,14 @@ use std::time::Duration; use derive_where::derive_where; use ractor::{ActorRef, RpcReplyPort}; -use malachitebft_core_consensus::PeerId; +use malachitebft_core_consensus::types::PeerId; use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId}; use malachitebft_sync::RawDecidedValue; use crate::consensus::ConsensusRef; use crate::util::streaming::StreamMessage; -pub use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue}; +pub use malachitebft_core_consensus::types::{LocallyProposedValue, ProposedValue}; /// A reference to the host actor. pub type HostRef = ActorRef>; diff --git a/code/crates/engine/src/network.rs b/code/crates/engine/src/network.rs index 38c9a8192..881d74509 100644 --- a/code/crates/engine/src/network.rs +++ b/code/crates/engine/src/network.rs @@ -16,8 +16,7 @@ use malachitebft_sync::{ }; use malachitebft_codec as codec; -use malachitebft_core_consensus::SignedConsensusMsg; -use malachitebft_core_types::{Context, SignedProposal, SignedVote}; +use malachitebft_core_consensus::types::{Context, SignedConsensusMsg, SignedProposal, SignedVote}; use malachitebft_metrics::SharedRegistry; use malachitebft_network::handle::CtrlHandle; use malachitebft_network::{Channel, Config, Event, Multiaddr, PeerId}; diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index b9e78357f..7d65defb8 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -12,8 +12,9 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; use malachitebft_codec as codec; -use malachitebft_core_consensus::PeerId; -use malachitebft_core_types::{CertificateError, CommitCertificate, Context, Height, Round}; +use malachitebft_core_consensus::types::{ + CertificateError, CommitCertificate, Context, Height, PeerId, Round, +}; use malachitebft_sync::{self as sync, InboundRequestId, OutboundRequestId, Response}; use malachitebft_sync::{RawDecidedValue, Request}; diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index cde03d24e..34d080cd3 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -3,8 +3,10 @@ use core::fmt; use derive_where::derive_where; use tokio::sync::broadcast; -use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue, SignedConsensusMsg}; -use malachitebft_core_types::{CommitCertificate, Context, Round, Timeout, ValueOrigin}; +use malachitebft_core_consensus::types::{ + CommitCertificate, Context, LocallyProposedValue, ProposedValue, Round, SignedConsensusMsg, + Timeout, ValueOrigin, +}; pub type RxEvent = broadcast::Receiver>; diff --git a/code/crates/engine/src/wal/entry.rs b/code/crates/engine/src/wal/entry.rs index fd7b2de07..ff36ec7a0 100644 --- a/code/crates/engine/src/wal/entry.rs +++ b/code/crates/engine/src/wal/entry.rs @@ -4,8 +4,7 @@ use byteorder::{ReadBytesExt, WriteBytesExt, BE}; use derive_where::derive_where; use malachitebft_codec::Codec; -use malachitebft_core_consensus::SignedConsensusMsg; -use malachitebft_core_types::{Context, Round, Timeout}; +use malachitebft_core_consensus::types::{Context, Round, SignedConsensusMsg, Timeout}; /// Codec for encoding and decoding WAL entries. /// diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index e47756fbe..2fda38c38 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -10,8 +10,7 @@ use rand::SeedableRng; use tokio::time::Instant; use tracing::{debug, error, info, trace, warn}; -use malachitebft_core_consensus::PeerId; -use malachitebft_core_types::{CommitCertificate, Round, Validity, ValueOrigin}; +use malachitebft_core_consensus::types::{CommitCertificate, PeerId, Round, Validity, ValueOrigin}; use malachitebft_engine::consensus::{ConsensusMsg, ConsensusRef}; use malachitebft_engine::host::{LocallyProposedValue, ProposedValue}; use malachitebft_engine::network::{NetworkMsg, NetworkRef}; diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 4f04aecce..1393cdc7e 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -9,8 +9,7 @@ use thiserror::Error; use tracing::error; use malachitebft_codec::Codec; -use malachitebft_core_consensus::ProposedValue; -use malachitebft_core_types::{CommitCertificate, Round}; +use malachitebft_core_consensus::types::{CommitCertificate, ProposedValue, Round}; use malachitebft_proto::Protobuf; use crate::codec::{self, ProtobufCodec}; diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index 060256dda..325429f58 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -2,18 +2,16 @@ use bytes::Bytes; use prost::Message; use malachitebft_codec::Codec; -use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Extension, Round, SignedExtension, - SignedProposal, SignedVote, Validity, +use malachitebft_core_consensus::types::{ + AggregatedSignature, CommitCertificate, CommitSignature, Extension, PeerId, ProposedValue, + Round, SignedConsensusMsg, SignedExtension, SignedProposal, SignedVote, Validity, }; use malachitebft_engine::util::streaming::{StreamContent, StreamMessage}; +use malachitebft_starknet_p2p_proto::ConsensusMessage; use malachitebft_sync::{ self as sync, ValueRequest, ValueResponse, VoteSetRequest, VoteSetResponse, }; -use malachitebft_core_consensus::{PeerId, ProposedValue, SignedConsensusMsg}; -use malachitebft_starknet_p2p_proto::ConsensusMessage; - use crate::proto::consensus_message::Messages; use crate::proto::{self as proto, Error as ProtoError, Protobuf}; use crate::types::{self as p2p, Address, BlockHash, Height, MockContext, ProposalPart, Vote}; diff --git a/code/crates/starknet/host/src/host/starknet.rs b/code/crates/starknet/host/src/host/starknet.rs index 5e27bd993..638a71236 100644 --- a/code/crates/starknet/host/src/host/starknet.rs +++ b/code/crates/starknet/host/src/host/starknet.rs @@ -8,8 +8,9 @@ use tokio::time::Instant; use tracing::{debug, Instrument}; use malachitebft_config::VoteExtensionsConfig; -use malachitebft_core_consensus::ValuePayload; -use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtension, SignedVote}; +use malachitebft_core_consensus::types::{ + CommitCertificate, Extension, Round, SignedExtension, SignedVote, ValuePayload, +}; use crate::host::Host; use crate::mempool::MempoolRef; diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index e612bf8d3..7e059e2f2 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -2,19 +2,19 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use libp2p_identity::ecdsa; -use malachitebft_engine::util::events::TxEvent; -use malachitebft_engine::wal::{Wal, WalRef}; use tokio::task::JoinHandle; use malachitebft_config::{ self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol, }; -use malachitebft_core_consensus::ValuePayload; +use malachitebft_core_consensus::types::ValuePayload; use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef}; use malachitebft_engine::host::HostRef; use malachitebft_engine::network::{Network, NetworkRef}; use malachitebft_engine::node::{Node, NodeRef}; use malachitebft_engine::sync::{Params as SyncParams, Sync, SyncRef}; +use malachitebft_engine::util::events::TxEvent; +use malachitebft_engine::wal::{Wal, WalRef}; use malachitebft_metrics::Metrics; use malachitebft_metrics::SharedRegistry; use malachitebft_network::Keypair; diff --git a/code/crates/starknet/host/src/streaming.rs b/code/crates/starknet/host/src/streaming.rs index dbe05f0da..fac305127 100644 --- a/code/crates/starknet/host/src/streaming.rs +++ b/code/crates/starknet/host/src/streaming.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BinaryHeap, HashSet}; use derive_where::derive_where; -use malachitebft_core_consensus::PeerId; +use malachitebft_core_consensus::types::PeerId; use malachitebft_core_types::Round; use malachitebft_engine::util::streaming::{Sequence, StreamId, StreamMessage}; diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 12a2969ac..ecfc29cfb 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -17,7 +17,7 @@ use malachitebft_config::{ Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol, }; -use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; +use malachitebft_core_consensus::types::{LocallyProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{SignedVote, VotingPower}; use malachitebft_engine::util::events::{Event, RxEvent, TxEvent}; use malachitebft_starknet_host::spawn::spawn_node_actor; diff --git a/code/crates/starknet/test/src/tests/wal.rs b/code/crates/starknet/test/src/tests/wal.rs index 88b14a0db..cd3fb92a8 100644 --- a/code/crates/starknet/test/src/tests/wal.rs +++ b/code/crates/starknet/test/src/tests/wal.rs @@ -4,7 +4,7 @@ use eyre::bail; use tracing::info; use malachitebft_config::ValuePayload; -use malachitebft_core_consensus::LocallyProposedValue; +use malachitebft_core_consensus::types::LocallyProposedValue; use malachitebft_core_types::SignedVote; use malachitebft_engine::util::events::Event; use malachitebft_starknet_host::types::MockContext; diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs index 39cf620a9..6cb82a667 100644 --- a/code/crates/test/src/codec/proto/mod.rs +++ b/code/crates/test/src/codec/proto/mod.rs @@ -3,10 +3,9 @@ use prost::Message; use malachitebft_app::streaming::{StreamContent, StreamMessage}; use malachitebft_codec::Codec; -use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg}; -use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Extension, Round, SignedExtension, - SignedProposal, SignedVote, Validity, VoteSet, +use malachitebft_core_consensus::types::{ + AggregatedSignature, CommitCertificate, CommitSignature, Extension, ProposedValue, Round, + SignedConsensusMsg, SignedExtension, SignedProposal, SignedVote, Validity, VoteSet, }; use malachitebft_proto::{Error as ProtoError, Protobuf}; use malachitebft_signing_ed25519::Signature; diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 7a95d3182..9f95d4dde 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -10,11 +10,10 @@ use rand::{Rng, SeedableRng}; use sha3::Digest; use tracing::{debug, error}; -use malachitebft_app_channel::app::consensus::ProposedValue; use malachitebft_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachitebft_app_channel::app::types::codec::Codec; use malachitebft_app_channel::app::types::core::{CommitCertificate, Round, Validity}; -use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId}; +use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId, ProposedValue}; use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Genesis, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, diff --git a/code/examples/channel/src/streaming.rs b/code/examples/channel/src/streaming.rs index 6792b0a5a..85ed7abb2 100644 --- a/code/examples/channel/src/streaming.rs +++ b/code/examples/channel/src/streaming.rs @@ -1,9 +1,9 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, BinaryHeap, HashSet}; -use malachitebft_app_channel::app::consensus::PeerId; use malachitebft_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; use malachitebft_app_channel::app::types::core::Round; +use malachitebft_app_channel::app::types::PeerId; use malachitebft_test::{Address, Height, ProposalInit, ProposalPart}; struct MinSeq(StreamMessage); From b377ec404c809c26b488b0fabd796d03565a790d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 23 Jan 2025 17:26:57 +0100 Subject: [PATCH 2/6] wip --- code/Cargo.lock | 13 + code/Cargo.toml | 2 + code/crates/consensus-async/Cargo.toml | 29 ++ .../consensus-async/src/handlers/consensus.rs | 48 +++ .../consensus-async/src/handlers/impls/mod.rs | 2 + .../src/handlers/impls/tokio/mod.rs | 2 + .../src/handlers/impls/tokio/timeout.rs | 355 ++++++++++++++++++ .../consensus-async/src/handlers/mod.rs | 6 + .../consensus-async/src/handlers/signing.rs | 29 ++ .../consensus-async/src/handlers/sync.rs | 22 ++ .../consensus-async/src/handlers/timeout.rs | 19 + .../consensus-async/src/handlers/wal.rs | 16 + code/crates/consensus-async/src/lib.rs | 1 + 13 files changed, 544 insertions(+) create mode 100644 code/crates/consensus-async/Cargo.toml create mode 100644 code/crates/consensus-async/src/handlers/consensus.rs create mode 100644 code/crates/consensus-async/src/handlers/impls/mod.rs create mode 100644 code/crates/consensus-async/src/handlers/impls/tokio/mod.rs create mode 100644 code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs create mode 100644 code/crates/consensus-async/src/handlers/mod.rs create mode 100644 code/crates/consensus-async/src/handlers/signing.rs create mode 100644 code/crates/consensus-async/src/handlers/sync.rs create mode 100644 code/crates/consensus-async/src/handlers/timeout.rs create mode 100644 code/crates/consensus-async/src/handlers/wal.rs create mode 100644 code/crates/consensus-async/src/lib.rs diff --git a/code/Cargo.lock b/code/Cargo.lock index 34ded7422..3d73e076d 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2147,6 +2147,19 @@ dependencies = [ "toml", ] +[[package]] +name = "informalsystems-malachitebft-consensus-async" +version = "0.0.1" +dependencies = [ + "async-trait", + "informalsystems-malachitebft-core-consensus", + "informalsystems-malachitebft-core-types", + "informalsystems-malachitebft-sync", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "informalsystems-malachitebft-core-consensus" version = "0.0.1" diff --git a/code/Cargo.toml b/code/Cargo.toml index 7ba6c9cb8..0eb3d0798 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/core-state-machine", "crates/core-types", "crates/core-votekeeper", + "crates/consensus-async", "crates/engine", "crates/metrics", "crates/network", @@ -72,6 +73,7 @@ malachitebft-core-driver = { version = "0.0.1", package = "informalsystem malachitebft-core-state-machine = { version = "0.0.1", package = "informalsystems-malachitebft-core-state-machine", path = "crates/core-state-machine" } malachitebft-core-types = { version = "0.0.1", package = "informalsystems-malachitebft-core-types", path = "crates/core-types" } malachitebft-core-votekeeper = { version = "0.0.1", package = "informalsystems-malachitebft-core-votekeeper", path = "crates/core-votekeeper" } +malachitebft-consensus-async = { version = "0.0.1", package = "informalsystems-malachitebft-consensus-async", path = "crates/consensus-async" } malachitebft-discovery = { version = "0.0.1", package = "informalsystems-malachitebft-discovery", path = "crates/discovery" } malachitebft-network = { version = "0.0.1", package = "informalsystems-malachitebft-network", path = "crates/network" } malachitebft-metrics = { version = "0.0.1", package = "informalsystems-malachitebft-metrics", path = "crates/metrics" } diff --git a/code/crates/consensus-async/Cargo.toml b/code/crates/consensus-async/Cargo.toml new file mode 100644 index 000000000..3797a1db5 --- /dev/null +++ b/code/crates/consensus-async/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "informalsystems-malachitebft-consensus-async" +description = "High-level async consensus API for the Malachite BFT consensus engine" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true +rust-version.workspace = true +readme = "../../../README.md" + +[package.metadata.docs.rs] +all-features = true + +[features] +tokio = ["dep:tokio"] + +[dependencies] +malachitebft-core-types = { workspace = true } +malachitebft-core-consensus = { workspace = true } +malachitebft-sync = { workspace = true } + +async-trait = { workspace = true } +tokio = { workspace = true, optional = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/code/crates/consensus-async/src/handlers/consensus.rs b/code/crates/consensus-async/src/handlers/consensus.rs new file mode 100644 index 000000000..833d5d925 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/consensus.rs @@ -0,0 +1,48 @@ +use async_trait::async_trait; +use malachitebft_core_consensus::types::*; + +#[async_trait] +pub trait ConsensusHandler +where + Ctx: Context, +{ + type Error: core::error::Error; + + /// Consensus is starting a new round with the given proposer + async fn start_round( + &mut self, + height: Ctx::Height, + round: Round, + proposer: Ctx::Address, + ) -> Result<(), Self::Error>; + + /// Publish a message to peers + async fn publish(&mut self, msg: SignedConsensusMsg) -> Result<(), Self::Error>; + + /// Requests the application to build a value for consensus to run on. + async fn get_value( + &mut self, + height: Ctx::Height, + round: Round, + timeout: Timeout, + ) -> Result<(), Self::Error>; + + /// Get the validator set at the given height + async fn get_validator_set( + &mut self, + height: Ctx::Height, + ) -> Result, Self::Error>; + + /// Requests the application to re-stream a proposal that it has already seen. + async fn restream_value( + &mut self, + height: Ctx::Height, + round: Round, + valid_round: Round, + proposer: Ctx::Address, + value_id: ValueId, + ) -> Result<(), Self::Error>; + + /// Notifies the application that consensus has decided on a value. + async fn decide(&mut self, certificate: CommitCertificate) -> Result<(), Self::Error>; +} diff --git a/code/crates/consensus-async/src/handlers/impls/mod.rs b/code/crates/consensus-async/src/handlers/impls/mod.rs new file mode 100644 index 000000000..4e099cdaa --- /dev/null +++ b/code/crates/consensus-async/src/handlers/impls/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "tokio")] +pub mod tokio; diff --git a/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs b/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs new file mode 100644 index 000000000..6e354ef72 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs @@ -0,0 +1,2 @@ +mod timeout; +pub use timeout::TimeoutHandler; diff --git a/code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs b/code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs new file mode 100644 index 000000000..f2ba0a7a7 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs @@ -0,0 +1,355 @@ +use async_trait::async_trait; +use thiserror::Error; + +use malachitebft_core_consensus::types::Timeout; +use tokio::sync::broadcast; +use tracing::trace; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Error)] +pub enum Never {} + +pub struct TimeoutHandler { + scheduler: TimerScheduler, +} + +#[async_trait] +impl crate::handlers::timeout::TimeoutHandler for TimeoutHandler { + type Error = Never; + + /// Reset all timeouts to their initial values + async fn reset_timeouts(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + + /// Cancel all outstanding timeouts + async fn cancel_all_timeouts(&mut self) -> Result<(), Self::Error> { + self.scheduler.cancel_all(); + Ok(()) + } + + /// Cancel a given timeout + async fn cancel_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error> { + self.scheduler.cancel(&timeout); + Ok(()) + } + + /// Schedule a timeout + async fn schedule_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error> { + let duration = Duration::from_secs(1); + self.scheduler.start_timer(timeout, duration); + Ok(()) + } +} + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; +use std::ops::RangeFrom; +use std::sync::Arc; +use std::time::Duration; + +use tokio::task::JoinHandle; + +#[derive(Debug)] +struct Timer { + /// Message to give to the actor when the timer expires + key: Key, + + // Task that will notify the actor that the timer has elapsed + task: JoinHandle<()>, + + /// Generation counter to the timer to check if we received a timeout + /// message from an old timer that was enqueued in mailbox before canceled + generation: u64, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct TimeoutElapsed { + key: Key, + generation: u64, +} + +#[derive(Debug)] +pub struct TimerScheduler +where + Key: Clone + Eq + Hash + Send + 'static, +{ + tx: Arc>>, + timers: HashMap>, + generations: RangeFrom, +} + +impl TimerScheduler +where + Key: Clone + Eq + Hash + Send + 'static, +{ + pub fn new() -> Self { + let (tx, _) = broadcast::channel(1); + + Self { + tx: Arc::new(tx), + timers: HashMap::new(), + generations: 1.., + } + } + + /// Start a timer that will send `msg` once to the actor after the given `timeout`. + /// + /// Each timer has a key and if a new timer with same key is started + /// the previous is cancelled. + /// + /// # Warning + /// It is NOT guaranteed that a message from the previous timer is not received, + /// as it could already be enqueued in the mailbox when the new timer was started. + /// + /// When the actor receives a timeout message for timer from the scheduler, it should + /// check if the timer is still active by calling [`TimerScheduler::intercept_timer_msg`] + /// and ignore the message otherwise. + pub fn start_timer(&mut self, key: Key, timeout: Duration) + where + Key: Clone + Send + 'static, + { + self.cancel(&key); + + let generation = self + .generations + .next() + .expect("generation counter overflowed"); + + let task = { + let key = key.clone(); + let tx = Arc::clone(&self.tx); + + tokio::spawn(async move { + tokio::time::sleep(timeout).await; + let _ = tx.send(TimeoutElapsed { key, generation }); + }) + }; + + self.timers.insert( + key.clone(), + Timer { + key, + task, + generation, + }, + ); + } + + /// Check if a timer with a given `key` is active, ie. it hasn't been canceled nor has it elapsed yet. + pub fn is_timer_active(&self, key: &Key) -> bool { + self.timers.contains_key(key) + } + + /// Cancel a timer with a given `key`. + /// + /// If canceling a timer that was already canceled, or key never was used to start a timer + /// this operation will do nothing. + /// + /// # Warning + /// It is NOT guaranteed that a message from a canceled timer, including its previous incarnation + /// for the same key, will not be received by the actor, as the message might already + /// be enqueued in the mailbox when cancel is called. + /// + /// When the actor receives a timeout message for timer from the scheduler, it should + /// check if the timer is still active by calling [`TimerScheduler::intercept_timer_msg`] + /// and ignore the message otherwise. + pub fn cancel(&mut self, key: &Key) { + if let Some(timer) = self.timers.remove(key) { + timer.task.abort(); + } + } + + /// Cancel all timers. + pub fn cancel_all(&mut self) { + self.timers.drain().for_each(|(_, timer)| { + timer.task.abort(); + }); + } + + /// Intercepts a timer message and checks the state of the timer associated with the provided `timer_msg`: + /// + /// 1. If the timer message was from canceled timer that was already enqueued in mailbox, returns `None`. + /// 2. If the timer message was from an old timer that was enqueued in mailbox before being canceled, returns `None`. + /// 3. Otherwise it is a valid timer message, returns the associated `Key` wrapped in `Some`. + pub fn intercept_timer_msg(&mut self, timer_msg: TimeoutElapsed) -> Option + where + Key: Debug, + { + match self.timers.entry(timer_msg.key) { + // The timer message was from canceled timer that was already enqueued in mailbox + Entry::Vacant(entry) => { + let key = entry.key(); + trace!("Received timer {key:?} that has been removed, discarding"); + None + } + + // The timer message was from an old timer that was enqueued in mailbox before being canceled + Entry::Occupied(entry) if timer_msg.generation != entry.get().generation => { + let (key, timer) = (entry.key(), entry.get()); + + trace!( + "Received timer {key:?} from old generation {}, expected generation {}, discarding", + timer_msg.generation, + timer.generation, + ); + + None + } + + // Valid timer message + Entry::Occupied(entry) => { + let timer = entry.remove(); + Some(timer.key) + } + } + } +} + +impl Drop for TimerScheduler +where + Key: Clone + Eq + Hash + Send + 'static, +{ + fn drop(&mut self) { + self.cancel_all(); + } +} + +// #[cfg(test)] +// mod tests { +// use super::*; +// +// use ractor::{Actor, ActorRef}; +// use std::time::Duration; +// use tokio::time::sleep; +// +// #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] +// struct TestKey(&'static str); +// +// #[derive(Debug)] +// struct TestMsg(TimeoutElapsed); +// +// impl From> for TestMsg { +// fn from(timer_msg: TimeoutElapsed) -> Self { +// TestMsg(timer_msg) +// } +// } +// +// struct TestActor; +// +// #[async_trait::async_trait] +// impl Actor for TestActor { +// type State = (); +// type Arguments = (); +// type Msg = TestMsg; +// +// async fn pre_start( +// &self, +// _myself: ActorRef, +// _args: (), +// ) -> Result<(), ractor::ActorProcessingErr> { +// Ok(()) +// } +// +// async fn handle( +// &self, +// _myself: ActorRef, +// TestMsg(elapsed): TestMsg, +// _state: &mut (), +// ) -> Result<(), ractor::ActorProcessingErr> { +// println!("Received timer message: {elapsed:?}"); +// Ok(()) +// } +// } +// +// async fn spawn() -> TimerScheduler { +// let actor_ref = TestActor::spawn(None, TestActor, ()).await.unwrap().0; +// // let subscriber: OutputPortSubscriber> = ; +// TimerScheduler::new(Box::new(actor_ref)) +// } +// +// #[tokio::test] +// async fn test_start_timer() { +// let mut scheduler = spawn().await; +// let key = TestKey("timer1"); +// +// scheduler.start_timer(key, Duration::from_millis(100)); +// assert!(scheduler.is_timer_active(&key)); +// +// sleep(Duration::from_millis(150)).await; +// let elapsed_key = scheduler.intercept_timer_msg(TimeoutElapsed { key, generation: 1 }); +// assert_eq!(elapsed_key, Some(key)); +// +// assert!(!scheduler.is_timer_active(&key)); +// } +// +// #[tokio::test] +// async fn test_cancel_timer() { +// let mut scheduler = spawn().await; +// let key = TestKey("timer1"); +// +// scheduler.start_timer(key, Duration::from_millis(100)); +// scheduler.cancel(&key); +// +// assert!(!scheduler.is_timer_active(&key)); +// } +// +// #[tokio::test] +// async fn test_cancel_all_timers() { +// let mut scheduler = spawn().await; +// +// scheduler.start_timer(TestKey("timer1"), Duration::from_millis(100)); +// scheduler.start_timer(TestKey("timer2"), Duration::from_millis(200)); +// +// scheduler.cancel_all(); +// +// assert!(!scheduler.is_timer_active(&TestKey("timer1"))); +// assert!(!scheduler.is_timer_active(&TestKey("timer2"))); +// } +// +// #[tokio::test] +// async fn test_intercept_timer_msg_valid() { +// let mut scheduler = spawn().await; +// let key = TestKey("timer1"); +// +// scheduler.start_timer(key, Duration::from_millis(100)); +// sleep(Duration::from_millis(150)).await; +// +// let timer_msg = TimeoutElapsed { key, generation: 1 }; +// +// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); +// +// assert_eq!(intercepted_msg, Some(key)); +// } +// +// #[tokio::test] +// async fn test_intercept_timer_msg_invalid_generation() { +// let mut scheduler = spawn().await; +// let key = TestKey("timer1"); +// +// scheduler.start_timer(key, Duration::from_millis(100)); +// scheduler.start_timer(key, Duration::from_millis(200)); +// +// let timer_msg = TimeoutElapsed { key, generation: 1 }; +// +// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); +// +// assert_eq!(intercepted_msg, None); +// } +// +// #[tokio::test] +// async fn test_intercept_timer_msg_cancelled() { +// let mut scheduler = spawn().await; +// let key = TestKey("timer1"); +// +// scheduler.start_timer(key, Duration::from_millis(100)); +// scheduler.cancel(&key); +// +// let timer_msg = TimeoutElapsed { key, generation: 1 }; +// +// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); +// +// assert_eq!(intercepted_msg, None); +// } +// } diff --git a/code/crates/consensus-async/src/handlers/mod.rs b/code/crates/consensus-async/src/handlers/mod.rs new file mode 100644 index 000000000..ae0cf8162 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/mod.rs @@ -0,0 +1,6 @@ +pub mod consensus; +pub mod impls; +pub mod signing; +pub mod sync; +pub mod timeout; +pub mod wal; diff --git a/code/crates/consensus-async/src/handlers/signing.rs b/code/crates/consensus-async/src/handlers/signing.rs new file mode 100644 index 000000000..0173dc920 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/signing.rs @@ -0,0 +1,29 @@ +use async_trait::async_trait; +use malachitebft_core_consensus::types::*; + +#[async_trait] +pub trait SigningHandler +where + Ctx: Context, +{ + /// Sign a vote with this node's private key + async fn sign_vote(&mut self, vote: Ctx::Vote) -> SignedVote; + + /// Sign a proposal with this node's private key + async fn sign_proposal(&mut self, proposal: Ctx::Proposal) -> SignedProposal; + + /// Verify a signature + async fn verify_signature( + &mut self, + signed_msg: SignedMessage>, + public_key: PublicKey, + ) -> bool; + + /// Verify a commit certificate + async fn verify_certificate( + &mut self, + certificate: CommitCertificate, + validator_set: Ctx::ValidatorSet, + threshold_params: ThresholdParams, + ) -> Result<(), CertificateError>; +} diff --git a/code/crates/consensus-async/src/handlers/sync.rs b/code/crates/consensus-async/src/handlers/sync.rs new file mode 100644 index 000000000..b4a250836 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/sync.rs @@ -0,0 +1,22 @@ +use async_trait::async_trait; +use malachitebft_core_consensus::types::*; + +#[async_trait] +pub trait SyncHandler +where + Ctx: Context, +{ + type Error: core::error::Error; + + /// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers + async fn get_vote_set(&mut self, height: Ctx::Height, round: Round) -> Result<(), Self::Error>; + + /// A peer has required our vote set, send the response + async fn send_vote_set_response( + &mut self, + request_id: RequestId, + height: Ctx::Height, + round: Round, + vote_set: VoteSet, + ); +} diff --git a/code/crates/consensus-async/src/handlers/timeout.rs b/code/crates/consensus-async/src/handlers/timeout.rs new file mode 100644 index 000000000..495e56bcc --- /dev/null +++ b/code/crates/consensus-async/src/handlers/timeout.rs @@ -0,0 +1,19 @@ +use async_trait::async_trait; +use malachitebft_core_consensus::types::*; + +#[async_trait] +pub trait TimeoutHandler { + type Error: core::error::Error; + + /// Reset all timeouts to their initial values + async fn reset_timeouts(&mut self) -> Result<(), Self::Error>; + + /// Cancel all outstanding timeouts + async fn cancel_all_timeouts(&mut self) -> Result<(), Self::Error>; + + /// Cancel a given timeout + async fn cancel_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; + + /// Schedule a timeout + async fn schedule_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; +} diff --git a/code/crates/consensus-async/src/handlers/wal.rs b/code/crates/consensus-async/src/handlers/wal.rs new file mode 100644 index 000000000..deedf7b81 --- /dev/null +++ b/code/crates/consensus-async/src/handlers/wal.rs @@ -0,0 +1,16 @@ +use async_trait::async_trait; +use malachitebft_core_consensus::types::*; + +#[async_trait] +pub trait WalHandler +where + Ctx: Context, +{ + type Error: core::error::Error; + + /// Append a consensus message to the Write-Ahead Log for crash recovery + async fn append_msg(&mut self, msg: SignedConsensusMsg) -> Result<(), Self::Error>; + + /// Append a timeout to the Write-Ahead Log for crash recovery + async fn append_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; +} diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs new file mode 100644 index 000000000..c3d449565 --- /dev/null +++ b/code/crates/consensus-async/src/lib.rs @@ -0,0 +1 @@ +pub mod handlers; From 46b4cdf85c28ef580cd2c7efe798731335932966 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 23 Jan 2025 17:56:43 +0100 Subject: [PATCH 3/6] wip --- .../consensus-async/src/handlers/consensus.rs | 48 --- .../consensus-async/src/handlers/impls/mod.rs | 2 - .../src/handlers/impls/tokio/mod.rs | 2 - .../consensus-async/src/handlers/mod.rs | 6 - .../consensus-async/src/handlers/signing.rs | 29 -- .../consensus-async/src/handlers/sync.rs | 22 -- .../consensus-async/src/handlers/timeout.rs | 19 -- .../consensus-async/src/handlers/wal.rs | 16 - code/crates/consensus-async/src/lib.rs | 3 +- .../src/{handlers/impls/tokio => }/timeout.rs | 293 +++++++----------- 10 files changed, 115 insertions(+), 325 deletions(-) delete mode 100644 code/crates/consensus-async/src/handlers/consensus.rs delete mode 100644 code/crates/consensus-async/src/handlers/impls/mod.rs delete mode 100644 code/crates/consensus-async/src/handlers/impls/tokio/mod.rs delete mode 100644 code/crates/consensus-async/src/handlers/mod.rs delete mode 100644 code/crates/consensus-async/src/handlers/signing.rs delete mode 100644 code/crates/consensus-async/src/handlers/sync.rs delete mode 100644 code/crates/consensus-async/src/handlers/timeout.rs delete mode 100644 code/crates/consensus-async/src/handlers/wal.rs rename code/crates/consensus-async/src/{handlers/impls/tokio => }/timeout.rs (51%) diff --git a/code/crates/consensus-async/src/handlers/consensus.rs b/code/crates/consensus-async/src/handlers/consensus.rs deleted file mode 100644 index 833d5d925..000000000 --- a/code/crates/consensus-async/src/handlers/consensus.rs +++ /dev/null @@ -1,48 +0,0 @@ -use async_trait::async_trait; -use malachitebft_core_consensus::types::*; - -#[async_trait] -pub trait ConsensusHandler -where - Ctx: Context, -{ - type Error: core::error::Error; - - /// Consensus is starting a new round with the given proposer - async fn start_round( - &mut self, - height: Ctx::Height, - round: Round, - proposer: Ctx::Address, - ) -> Result<(), Self::Error>; - - /// Publish a message to peers - async fn publish(&mut self, msg: SignedConsensusMsg) -> Result<(), Self::Error>; - - /// Requests the application to build a value for consensus to run on. - async fn get_value( - &mut self, - height: Ctx::Height, - round: Round, - timeout: Timeout, - ) -> Result<(), Self::Error>; - - /// Get the validator set at the given height - async fn get_validator_set( - &mut self, - height: Ctx::Height, - ) -> Result, Self::Error>; - - /// Requests the application to re-stream a proposal that it has already seen. - async fn restream_value( - &mut self, - height: Ctx::Height, - round: Round, - valid_round: Round, - proposer: Ctx::Address, - value_id: ValueId, - ) -> Result<(), Self::Error>; - - /// Notifies the application that consensus has decided on a value. - async fn decide(&mut self, certificate: CommitCertificate) -> Result<(), Self::Error>; -} diff --git a/code/crates/consensus-async/src/handlers/impls/mod.rs b/code/crates/consensus-async/src/handlers/impls/mod.rs deleted file mode 100644 index 4e099cdaa..000000000 --- a/code/crates/consensus-async/src/handlers/impls/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(feature = "tokio")] -pub mod tokio; diff --git a/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs b/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs deleted file mode 100644 index 6e354ef72..000000000 --- a/code/crates/consensus-async/src/handlers/impls/tokio/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod timeout; -pub use timeout::TimeoutHandler; diff --git a/code/crates/consensus-async/src/handlers/mod.rs b/code/crates/consensus-async/src/handlers/mod.rs deleted file mode 100644 index ae0cf8162..000000000 --- a/code/crates/consensus-async/src/handlers/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod consensus; -pub mod impls; -pub mod signing; -pub mod sync; -pub mod timeout; -pub mod wal; diff --git a/code/crates/consensus-async/src/handlers/signing.rs b/code/crates/consensus-async/src/handlers/signing.rs deleted file mode 100644 index 0173dc920..000000000 --- a/code/crates/consensus-async/src/handlers/signing.rs +++ /dev/null @@ -1,29 +0,0 @@ -use async_trait::async_trait; -use malachitebft_core_consensus::types::*; - -#[async_trait] -pub trait SigningHandler -where - Ctx: Context, -{ - /// Sign a vote with this node's private key - async fn sign_vote(&mut self, vote: Ctx::Vote) -> SignedVote; - - /// Sign a proposal with this node's private key - async fn sign_proposal(&mut self, proposal: Ctx::Proposal) -> SignedProposal; - - /// Verify a signature - async fn verify_signature( - &mut self, - signed_msg: SignedMessage>, - public_key: PublicKey, - ) -> bool; - - /// Verify a commit certificate - async fn verify_certificate( - &mut self, - certificate: CommitCertificate, - validator_set: Ctx::ValidatorSet, - threshold_params: ThresholdParams, - ) -> Result<(), CertificateError>; -} diff --git a/code/crates/consensus-async/src/handlers/sync.rs b/code/crates/consensus-async/src/handlers/sync.rs deleted file mode 100644 index b4a250836..000000000 --- a/code/crates/consensus-async/src/handlers/sync.rs +++ /dev/null @@ -1,22 +0,0 @@ -use async_trait::async_trait; -use malachitebft_core_consensus::types::*; - -#[async_trait] -pub trait SyncHandler -where - Ctx: Context, -{ - type Error: core::error::Error; - - /// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers - async fn get_vote_set(&mut self, height: Ctx::Height, round: Round) -> Result<(), Self::Error>; - - /// A peer has required our vote set, send the response - async fn send_vote_set_response( - &mut self, - request_id: RequestId, - height: Ctx::Height, - round: Round, - vote_set: VoteSet, - ); -} diff --git a/code/crates/consensus-async/src/handlers/timeout.rs b/code/crates/consensus-async/src/handlers/timeout.rs deleted file mode 100644 index 495e56bcc..000000000 --- a/code/crates/consensus-async/src/handlers/timeout.rs +++ /dev/null @@ -1,19 +0,0 @@ -use async_trait::async_trait; -use malachitebft_core_consensus::types::*; - -#[async_trait] -pub trait TimeoutHandler { - type Error: core::error::Error; - - /// Reset all timeouts to their initial values - async fn reset_timeouts(&mut self) -> Result<(), Self::Error>; - - /// Cancel all outstanding timeouts - async fn cancel_all_timeouts(&mut self) -> Result<(), Self::Error>; - - /// Cancel a given timeout - async fn cancel_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; - - /// Schedule a timeout - async fn schedule_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; -} diff --git a/code/crates/consensus-async/src/handlers/wal.rs b/code/crates/consensus-async/src/handlers/wal.rs deleted file mode 100644 index deedf7b81..000000000 --- a/code/crates/consensus-async/src/handlers/wal.rs +++ /dev/null @@ -1,16 +0,0 @@ -use async_trait::async_trait; -use malachitebft_core_consensus::types::*; - -#[async_trait] -pub trait WalHandler -where - Ctx: Context, -{ - type Error: core::error::Error; - - /// Append a consensus message to the Write-Ahead Log for crash recovery - async fn append_msg(&mut self, msg: SignedConsensusMsg) -> Result<(), Self::Error>; - - /// Append a timeout to the Write-Ahead Log for crash recovery - async fn append_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error>; -} diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs index c3d449565..582f60dbd 100644 --- a/code/crates/consensus-async/src/lib.rs +++ b/code/crates/consensus-async/src/lib.rs @@ -1 +1,2 @@ -pub mod handlers; +#[cfg(feature = "tokio")] +pub mod timeout; diff --git a/code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs b/code/crates/consensus-async/src/timeout.rs similarity index 51% rename from code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs rename to code/crates/consensus-async/src/timeout.rs index f2ba0a7a7..7f3f43ab8 100644 --- a/code/crates/consensus-async/src/handlers/impls/tokio/timeout.rs +++ b/code/crates/consensus-async/src/timeout.rs @@ -1,46 +1,3 @@ -use async_trait::async_trait; -use thiserror::Error; - -use malachitebft_core_consensus::types::Timeout; -use tokio::sync::broadcast; -use tracing::trace; - -#[derive(Copy, Clone, Debug, PartialEq, Eq, Error)] -pub enum Never {} - -pub struct TimeoutHandler { - scheduler: TimerScheduler, -} - -#[async_trait] -impl crate::handlers::timeout::TimeoutHandler for TimeoutHandler { - type Error = Never; - - /// Reset all timeouts to their initial values - async fn reset_timeouts(&mut self) -> Result<(), Self::Error> { - Ok(()) - } - - /// Cancel all outstanding timeouts - async fn cancel_all_timeouts(&mut self) -> Result<(), Self::Error> { - self.scheduler.cancel_all(); - Ok(()) - } - - /// Cancel a given timeout - async fn cancel_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error> { - self.scheduler.cancel(&timeout); - Ok(()) - } - - /// Schedule a timeout - async fn schedule_timeout(&mut self, timeout: Timeout) -> Result<(), Self::Error> { - let duration = Duration::from_secs(1); - self.scheduler.start_timer(timeout, duration); - Ok(()) - } -} - use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::Debug; @@ -49,7 +6,9 @@ use std::ops::RangeFrom; use std::sync::Arc; use std::time::Duration; +use tokio::sync::broadcast; use tokio::task::JoinHandle; +use tracing::trace; #[derive(Debug)] struct Timer { @@ -94,6 +53,10 @@ where } } + pub fn subscribe(&self) -> broadcast::Receiver> { + self.tx.subscribe() + } + /// Start a timer that will send `msg` once to the actor after the given `timeout`. /// /// Each timer has a key and if a new timer with same key is started @@ -207,6 +170,15 @@ where } } +impl Default for TimerScheduler +where + Key: Clone + Eq + Hash + Send + 'static, +{ + fn default() -> Self { + Self::new() + } +} + impl Drop for TimerScheduler where Key: Clone + Eq + Hash + Send + 'static, @@ -216,140 +188,101 @@ where } } -// #[cfg(test)] -// mod tests { -// use super::*; -// -// use ractor::{Actor, ActorRef}; -// use std::time::Duration; -// use tokio::time::sleep; -// -// #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] -// struct TestKey(&'static str); -// -// #[derive(Debug)] -// struct TestMsg(TimeoutElapsed); -// -// impl From> for TestMsg { -// fn from(timer_msg: TimeoutElapsed) -> Self { -// TestMsg(timer_msg) -// } -// } -// -// struct TestActor; -// -// #[async_trait::async_trait] -// impl Actor for TestActor { -// type State = (); -// type Arguments = (); -// type Msg = TestMsg; -// -// async fn pre_start( -// &self, -// _myself: ActorRef, -// _args: (), -// ) -> Result<(), ractor::ActorProcessingErr> { -// Ok(()) -// } -// -// async fn handle( -// &self, -// _myself: ActorRef, -// TestMsg(elapsed): TestMsg, -// _state: &mut (), -// ) -> Result<(), ractor::ActorProcessingErr> { -// println!("Received timer message: {elapsed:?}"); -// Ok(()) -// } -// } -// -// async fn spawn() -> TimerScheduler { -// let actor_ref = TestActor::spawn(None, TestActor, ()).await.unwrap().0; -// // let subscriber: OutputPortSubscriber> = ; -// TimerScheduler::new(Box::new(actor_ref)) -// } -// -// #[tokio::test] -// async fn test_start_timer() { -// let mut scheduler = spawn().await; -// let key = TestKey("timer1"); -// -// scheduler.start_timer(key, Duration::from_millis(100)); -// assert!(scheduler.is_timer_active(&key)); -// -// sleep(Duration::from_millis(150)).await; -// let elapsed_key = scheduler.intercept_timer_msg(TimeoutElapsed { key, generation: 1 }); -// assert_eq!(elapsed_key, Some(key)); -// -// assert!(!scheduler.is_timer_active(&key)); -// } -// -// #[tokio::test] -// async fn test_cancel_timer() { -// let mut scheduler = spawn().await; -// let key = TestKey("timer1"); -// -// scheduler.start_timer(key, Duration::from_millis(100)); -// scheduler.cancel(&key); -// -// assert!(!scheduler.is_timer_active(&key)); -// } -// -// #[tokio::test] -// async fn test_cancel_all_timers() { -// let mut scheduler = spawn().await; -// -// scheduler.start_timer(TestKey("timer1"), Duration::from_millis(100)); -// scheduler.start_timer(TestKey("timer2"), Duration::from_millis(200)); -// -// scheduler.cancel_all(); -// -// assert!(!scheduler.is_timer_active(&TestKey("timer1"))); -// assert!(!scheduler.is_timer_active(&TestKey("timer2"))); -// } -// -// #[tokio::test] -// async fn test_intercept_timer_msg_valid() { -// let mut scheduler = spawn().await; -// let key = TestKey("timer1"); -// -// scheduler.start_timer(key, Duration::from_millis(100)); -// sleep(Duration::from_millis(150)).await; -// -// let timer_msg = TimeoutElapsed { key, generation: 1 }; -// -// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); -// -// assert_eq!(intercepted_msg, Some(key)); -// } -// -// #[tokio::test] -// async fn test_intercept_timer_msg_invalid_generation() { -// let mut scheduler = spawn().await; -// let key = TestKey("timer1"); -// -// scheduler.start_timer(key, Duration::from_millis(100)); -// scheduler.start_timer(key, Duration::from_millis(200)); -// -// let timer_msg = TimeoutElapsed { key, generation: 1 }; -// -// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); -// -// assert_eq!(intercepted_msg, None); -// } -// -// #[tokio::test] -// async fn test_intercept_timer_msg_cancelled() { -// let mut scheduler = spawn().await; -// let key = TestKey("timer1"); -// -// scheduler.start_timer(key, Duration::from_millis(100)); -// scheduler.cancel(&key); -// -// let timer_msg = TimeoutElapsed { key, generation: 1 }; -// -// let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); -// -// assert_eq!(intercepted_msg, None); -// } -// } +#[cfg(test)] +mod tests { + use super::*; + + use std::time::Duration; + use tokio::time::sleep; + + #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] + struct TestKey(&'static str); + + async fn scheduler() -> TimerScheduler { + TimerScheduler::new() + } + + #[tokio::test] + async fn test_start_timer() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start_timer(key, Duration::from_millis(100)); + assert!(scheduler.is_timer_active(&key)); + + sleep(Duration::from_millis(150)).await; + let elapsed_key = scheduler.intercept_timer_msg(TimeoutElapsed { key, generation: 1 }); + assert_eq!(elapsed_key, Some(key)); + + assert!(!scheduler.is_timer_active(&key)); + } + + #[tokio::test] + async fn test_cancel_timer() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.cancel(&key); + + assert!(!scheduler.is_timer_active(&key)); + } + + #[tokio::test] + async fn test_cancel_all_timers() { + let mut scheduler = scheduler().await; + + scheduler.start_timer(TestKey("timer1"), Duration::from_millis(100)); + scheduler.start_timer(TestKey("timer2"), Duration::from_millis(200)); + + scheduler.cancel_all(); + + assert!(!scheduler.is_timer_active(&TestKey("timer1"))); + assert!(!scheduler.is_timer_active(&TestKey("timer2"))); + } + + #[tokio::test] + async fn test_intercept_timer_msg_valid() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start_timer(key, Duration::from_millis(100)); + sleep(Duration::from_millis(150)).await; + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, Some(key)); + } + + #[tokio::test] + async fn test_intercept_timer_msg_invalid_generation() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.start_timer(key, Duration::from_millis(200)); + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, None); + } + + #[tokio::test] + async fn test_intercept_timer_msg_cancelled() { + let mut scheduler = scheduler().await; + let key = TestKey("timer1"); + + scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.cancel(&key); + + let timer_msg = TimeoutElapsed { key, generation: 1 }; + + let intercepted_msg = scheduler.intercept_timer_msg(timer_msg); + + assert_eq!(intercepted_msg, None); + } +} From e684374a42e5325c05e27c07b249cee92be91b85 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 24 Jan 2025 14:54:52 +0100 Subject: [PATCH 4/6] wip --- code/Cargo.lock | 1 + code/crates/consensus-async/Cargo.toml | 5 +- code/crates/consensus-async/src/lib.rs | 218 +++++++++++++++++- code/crates/consensus-async/src/query.rs | 146 ++++++++++++ .../src/{timeout.rs => timers.rs} | 30 +-- code/crates/core-consensus/src/macros.rs | 6 +- code/crates/core-consensus/src/prelude.rs | 8 +- code/crates/core-consensus/src/types.rs | 6 + 8 files changed, 391 insertions(+), 29 deletions(-) create mode 100644 code/crates/consensus-async/src/query.rs rename code/crates/consensus-async/src/{timeout.rs => timers.rs} (90%) diff --git a/code/Cargo.lock b/code/Cargo.lock index 3d73e076d..882f8cf04 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2152,6 +2152,7 @@ name = "informalsystems-malachitebft-consensus-async" version = "0.0.1" dependencies = [ "async-trait", + "derive-where", "informalsystems-malachitebft-core-consensus", "informalsystems-malachitebft-core-types", "informalsystems-malachitebft-sync", diff --git a/code/crates/consensus-async/Cargo.toml b/code/crates/consensus-async/Cargo.toml index 3797a1db5..ff257355c 100644 --- a/code/crates/consensus-async/Cargo.toml +++ b/code/crates/consensus-async/Cargo.toml @@ -13,7 +13,7 @@ readme = "../../../README.md" all-features = true [features] -tokio = ["dep:tokio"] +timers = [] [dependencies] malachitebft-core-types = { workspace = true } @@ -21,7 +21,8 @@ malachitebft-core-consensus = { workspace = true } malachitebft-sync = { workspace = true } async-trait = { workspace = true } -tokio = { workspace = true, optional = true } +derive-where = { workspace = true } +tokio = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs index 582f60dbd..d13fd1752 100644 --- a/code/crates/consensus-async/src/lib.rs +++ b/code/crates/consensus-async/src/lib.rs @@ -1,2 +1,216 @@ -#[cfg(feature = "tokio")] -pub mod timeout; +use std::ops::ControlFlow; +use std::time::Duration; + +use derive_where::derive_where; +use tokio::sync::mpsc; + +use malachitebft_core_consensus::{Effect, Params, Resumable, Resume}; + +pub use malachitebft_core_consensus::types::*; +pub use malachitebft_core_consensus::{process, Error as ConsensusError, Input, State}; + +mod query; +pub use query::{Query, Reply}; + +#[cfg(feature = "timers")] +mod timers; + +#[cfg(feature = "timers")] +use timers::Timers; + +#[derive(thiserror::Error)] +#[derive_where(Debug)] +pub enum Error +where + Ctx: Context, +{ + #[error("Consensus error: {0}")] + Consensus(#[from] ConsensusError), + + #[error("Send error")] + SendError(Input), +} + +pub struct Handle +where + Ctx: Context, +{ + capacity: usize, + tx_input: mpsc::Sender<(Input, mpsc::Sender>)>, +} + +impl Handle +where + Ctx: Context, +{ + pub async fn process( + &self, + input: Input, + ) -> Result>, Error> { + let (tx_query, rx_query) = mpsc::channel(self.capacity); + + self.tx_input + .send((input, tx_query)) + .await + .map_err(|e| Error::SendError(e.0 .0))?; + + Ok(rx_query) + } +} + +pub struct Consensus +where + Ctx: Context, +{ + state: State, + #[cfg(feature = "timers")] + timers: Timers, + metrics: Metrics, + rx_input: mpsc::Receiver<(Input, mpsc::Sender>)>, +} + +impl Consensus +where + Ctx: Context, +{ + pub fn new(ctx: Ctx, params: Params, capacity: usize) -> (Self, Handle) { + let (tx_input, rx_input) = mpsc::channel(capacity); + + ( + Self { + state: State::new(ctx, params), + #[cfg(feature = "timers")] + timers: Timers::new(), + metrics: Metrics::default(), + rx_input, + }, + Handle { capacity, tx_input }, + ) + } + + pub async fn run(mut self) { + loop { + match self.run_inner().await { + Ok(ControlFlow::Continue(())) => continue, + Ok(ControlFlow::Break(())) => break, + Err(e) => { + tracing::error!("Error: {e}"); + } + } + } + } + + pub async fn run_inner(&mut self) -> Result, Error> { + match self.rx_input.recv().await { + Some((input, tx_query)) => { + self.process(input, tx_query).await?; + Ok(ControlFlow::Continue(())) + } + None => Ok(ControlFlow::Break(())), + } + } + + pub async fn process( + &mut self, + input: Input, + tx_query: mpsc::Sender>, + ) -> Result<(), Error> { + process!( + input: input, + state: &mut self.state, + metrics: &self.metrics, + with: effect => handle_effect( + effect, + #[cfg(feature = "timers")] + &mut self.timers, + &tx_query + ).await + ) + } +} + +async fn handle_effect( + effect: Effect, + #[cfg(feature = "timers")] timers: &mut Timers, + tx_query: &mpsc::Sender>, +) -> Result, Error> +where + Ctx: Context, +{ + match effect { + // Timers + #[cfg(feature = "timers")] + Effect::ResetTimeouts(resume) => { + // TODO + Ok(resume.resume_with(())) + } + + #[cfg(not(feature = "timers"))] + Effect::ResetTimeouts(resume) => { + // TODO + Ok(resume.resume_with(())) + } + + #[cfg(feature = "timers")] + Effect::CancelAllTimeouts(resume) => { + timers.cancel_all(); + Ok(resume.resume_with(())) + } + + #[cfg(not(feature = "timers"))] + Effect::CancelAllTimeouts(resume) => { + // TODO + Ok(resume.resume_with(())) + } + + #[cfg(feature = "timers")] + Effect::CancelTimeout(timeout, resume) => { + timers.cancel(&timeout); + Ok(resume.resume_with(())) + } + + #[cfg(not(feature = "timers"))] + Effect::CancelTimeout(timeout, resume) => { + // TODO + Ok(resume.resume_with(())) + } + + #[cfg(feature = "timers")] + Effect::ScheduleTimeout(timeout, resume) => { + let duration = Duration::from_secs(1); + timers.start(timeout, duration); + + Ok(resume.resume_with(())) + } + + #[cfg(not(feature = "timers"))] + Effect::ScheduleTimeout(timeout, resume) => { + // TODO + Ok(resume.resume_with(())) + } + + // Consensus + Effect::StartRound(height, round, proposer, resume) => todo!(), + Effect::Publish(signed_consensus_msg, resume) => todo!(), + Effect::GetValue(height, round, timeout, resume) => todo!(), + Effect::RestreamValue(height, round, pol_round, proposer, id, resume) => todo!(), + Effect::GetValidatorSet(height, resume) => todo!(), + Effect::Decide(certificate, resume) => todo!(), + + // Vote Sync + Effect::GetVoteSet(height, round, resume) => todo!(), + Effect::SendVoteSetResponse(request_id, height, round, vote_set, resume) => todo!(), + + // WAL + Effect::WalAppendMessage(msg, resume) => todo!(), + Effect::WalAppendTimeout(timeout, resume) => todo!(), + + // Signing + Effect::SignVote(vote, resume) => todo!(), + Effect::SignProposal(proposal, resume) => todo!(), + Effect::VerifySignature(msg, public_key, resume) => todo!(), + Effect::VerifyCertificate(certificate, validator_set, threshold_params, resume) => { + todo!() + } + } +} diff --git a/code/crates/consensus-async/src/query.rs b/code/crates/consensus-async/src/query.rs new file mode 100644 index 000000000..25050cbfc --- /dev/null +++ b/code/crates/consensus-async/src/query.rs @@ -0,0 +1,146 @@ +use derive_where::derive_where; +use tokio::sync::oneshot; + +pub use malachitebft_core_consensus::types::*; + +pub type Reply = oneshot::Sender; + +#[cfg(not(feature = "timers"))] +#[must_use] +#[derive(Debug)] +pub enum TimersQuery { + /// Reset all timeouts to their initial values + ResetTimeouts(Reply<()>), + + /// Cancel all outstanding timeouts + CancelAllTimeouts(Reply<()>), + + /// Cancel a given timeout + CancelTimeout(Timeout, Reply<()>), + + /// Schedule a timeout + ScheduleTimeout(Timeout, Reply<()>), +} + +#[must_use] +#[derive_where(Debug)] +pub enum Query +where + Ctx: Context, +{ + #[cfg(not(feature = "timers"))] + Timers(TimersQuery), + + Consensus(ConsensusQuery), + + Signing(SigningQuery), + + Sync(SyncQuery), + + Wal(WalQuery), +} + +#[must_use] +#[derive_where(Debug)] +pub enum ConsensusQuery +where + Ctx: Context, +{ + /// Consensus is starting a new round with the given proposer + StartRound(Ctx::Height, Round, Ctx::Address, Reply<()>), + + /// Get the validator set at the given height + GetValidatorSet(Ctx::Height, Reply>), + + /// Publish a message to peers + Publish(SignedConsensusMsg, Reply<()>), + + /// Requests the application to build a value for consensus to run on. + /// + /// Because this operation may be asynchronous, this effect does not expect a resumption + /// with a value, rather the application is expected to propose a value within the timeout duration. + /// + /// The application MUST eventually feed a [`ProposeValue`][crate::input::Input::ProposeValue] + /// input to consensus within the specified timeout duration. + GetValue(Ctx::Height, Round, Timeout, Reply<()>), + + /// Requests the application to re-stream a proposal that it has already seen. + /// + /// The application MUST re-publish again to its pwers all + /// the proposal parts pertaining to that value. + RestreamValue( + /// Height of the value + Ctx::Height, + /// Round of the value + Round, + /// Valid round of the value + Round, + /// Address of the proposer for that value + Ctx::Address, + /// Value ID of the value to restream + ValueId, + /// For resumption + Reply<()>, + ), + + /// Notifies the application that consensus has decided on a value. + /// + /// This message includes a commit certificate containing the ID of + /// the value that was decided on, the height and round at which it was decided, + /// and the aggregated signatures of the validators that committed to it. + Decide(CommitCertificate, Reply<()>), +} + +#[must_use] +#[derive_where(Debug)] +pub enum SigningQuery +where + Ctx: Context, +{ + /// Sign a vote with this node's private key + SignVote(Ctx::Vote, Reply>), + + /// Sign a proposal with this node's private key + SignProposal(Ctx::Proposal, Reply>), + + /// Verify a signature + VerifySignature( + SignedMessage>, + PublicKey, + Reply, + ), + + /// Verify a commit certificate + VerifyCertificate( + CommitCertificate, + Ctx::ValidatorSet, + ThresholdParams, + Reply>>, + ), +} + +#[must_use] +#[derive_where(Debug)] +pub enum SyncQuery +where + Ctx: Context, +{ + /// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers + GetVoteSet(Ctx::Height, Round, Reply<()>), + + /// A peer has required our vote set, send the response + SendVoteSetResponse(RequestId, Ctx::Height, Round, VoteSet, Reply<()>), +} + +#[must_use] +#[derive_where(Debug)] +pub enum WalQuery +where + Ctx: Context, +{ + /// Append a consensus message to the Write-Ahead Log for crash recovery + WalAppendMessage(SignedConsensusMsg, Reply<()>), + + /// Append a timeout to the Write-Ahead Log for crash recovery + WalAppendTimeout(Timeout, Reply<()>), +} diff --git a/code/crates/consensus-async/src/timeout.rs b/code/crates/consensus-async/src/timers.rs similarity index 90% rename from code/crates/consensus-async/src/timeout.rs rename to code/crates/consensus-async/src/timers.rs index 7f3f43ab8..3dda40a14 100644 --- a/code/crates/consensus-async/src/timeout.rs +++ b/code/crates/consensus-async/src/timers.rs @@ -30,7 +30,7 @@ pub struct TimeoutElapsed { } #[derive(Debug)] -pub struct TimerScheduler +pub struct Timers where Key: Clone + Eq + Hash + Send + 'static, { @@ -39,7 +39,7 @@ where generations: RangeFrom, } -impl TimerScheduler +impl Timers where Key: Clone + Eq + Hash + Send + 'static, { @@ -69,7 +69,7 @@ where /// When the actor receives a timeout message for timer from the scheduler, it should /// check if the timer is still active by calling [`TimerScheduler::intercept_timer_msg`] /// and ignore the message otherwise. - pub fn start_timer(&mut self, key: Key, timeout: Duration) + pub fn start(&mut self, key: Key, timeout: Duration) where Key: Clone + Send + 'static, { @@ -170,7 +170,7 @@ where } } -impl Default for TimerScheduler +impl Default for Timers where Key: Clone + Eq + Hash + Send + 'static, { @@ -179,7 +179,7 @@ where } } -impl Drop for TimerScheduler +impl Drop for Timers where Key: Clone + Eq + Hash + Send + 'static, { @@ -198,8 +198,8 @@ mod tests { #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] struct TestKey(&'static str); - async fn scheduler() -> TimerScheduler { - TimerScheduler::new() + async fn scheduler() -> Timers { + Timers::new() } #[tokio::test] @@ -207,7 +207,7 @@ mod tests { let mut scheduler = scheduler().await; let key = TestKey("timer1"); - scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(100)); assert!(scheduler.is_timer_active(&key)); sleep(Duration::from_millis(150)).await; @@ -222,7 +222,7 @@ mod tests { let mut scheduler = scheduler().await; let key = TestKey("timer1"); - scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(100)); scheduler.cancel(&key); assert!(!scheduler.is_timer_active(&key)); @@ -232,8 +232,8 @@ mod tests { async fn test_cancel_all_timers() { let mut scheduler = scheduler().await; - scheduler.start_timer(TestKey("timer1"), Duration::from_millis(100)); - scheduler.start_timer(TestKey("timer2"), Duration::from_millis(200)); + scheduler.start(TestKey("timer1"), Duration::from_millis(100)); + scheduler.start(TestKey("timer2"), Duration::from_millis(200)); scheduler.cancel_all(); @@ -246,7 +246,7 @@ mod tests { let mut scheduler = scheduler().await; let key = TestKey("timer1"); - scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(100)); sleep(Duration::from_millis(150)).await; let timer_msg = TimeoutElapsed { key, generation: 1 }; @@ -261,8 +261,8 @@ mod tests { let mut scheduler = scheduler().await; let key = TestKey("timer1"); - scheduler.start_timer(key, Duration::from_millis(100)); - scheduler.start_timer(key, Duration::from_millis(200)); + scheduler.start(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(200)); let timer_msg = TimeoutElapsed { key, generation: 1 }; @@ -276,7 +276,7 @@ mod tests { let mut scheduler = scheduler().await; let key = TestKey("timer1"); - scheduler.start_timer(key, Duration::from_millis(100)); + scheduler.start(key, Duration::from_millis(100)); scheduler.cancel(&key); let timer_msg = TimeoutElapsed { key, generation: 1 }; diff --git a/code/crates/core-consensus/src/macros.rs b/code/crates/core-consensus/src/macros.rs index afe9e4d99..387f4c19b 100644 --- a/code/crates/core-consensus/src/macros.rs +++ b/code/crates/core-consensus/src/macros.rs @@ -14,7 +14,7 @@ /// // Metrics /// metrics: &metrics, /// // Effect handler -/// on: effect => handle_effect(effect).await +/// with: effect => handle_effect(effect).await /// ) /// ``` #[macro_export] @@ -23,7 +23,7 @@ macro_rules! process { let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $input)); let mut co_result = gen.resume_with($crate::Resume::Start); - loop { + 'process: loop { match co_result { $crate::gen::CoResult::Yielded($effect) => { let resume = match $handle { @@ -36,7 +36,7 @@ macro_rules! process { co_result = gen.resume_with(resume) } $crate::gen::CoResult::Complete(result) => { - return result.map_err(Into::into); + break 'process result.map_err(Into::into); } } } diff --git a/code/crates/core-consensus/src/prelude.rs b/code/crates/core-consensus/src/prelude.rs index 0e114e0f7..0a23adb62 100644 --- a/code/crates/core-consensus/src/prelude.rs +++ b/code/crates/core-consensus/src/prelude.rs @@ -2,7 +2,6 @@ pub use async_recursion::async_recursion; pub use tracing::{debug, info, warn}; pub use malachitebft_core_driver::Input as DriverInput; -pub use malachitebft_core_types::*; pub use crate::effect::{Effect, Resume}; pub use crate::error::Error; @@ -10,9 +9,4 @@ pub use crate::gen::Co; pub use crate::input::Input; pub use crate::perform; pub use crate::state::State; - -#[cfg(feature = "metrics")] -pub use malachitebft_metrics::Metrics; - -#[cfg(not(feature = "metrics"))] -pub type Metrics = (); +pub use crate::types::*; diff --git a/code/crates/core-consensus/src/types.rs b/code/crates/core-consensus/src/types.rs index 50840b2a4..ed8cb36c4 100644 --- a/code/crates/core-consensus/src/types.rs +++ b/code/crates/core-consensus/src/types.rs @@ -6,6 +6,12 @@ pub use multiaddr::Multiaddr; pub type RequestId = String; +#[cfg(feature = "metrics")] +pub use malachitebft_metrics::Metrics; + +#[cfg(not(feature = "metrics"))] +pub type Metrics = (); + /// A signed consensus message, ie. a signed vote or a signed proposal. #[derive_where(Clone, Debug, PartialEq, Eq)] pub enum SignedConsensusMsg { From ea9004ad19862d2f8a7f4b028626088edbc7c7a0 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 24 Jan 2025 16:22:00 +0100 Subject: [PATCH 5/6] wip --- code/Cargo.lock | 2 + code/crates/consensus-async/Cargo.toml | 7 +- code/crates/consensus-async/src/config.rs | 82 +++++++ code/crates/consensus-async/src/lib.rs | 226 ++++++++++++------ code/crates/consensus-async/src/query.rs | 134 ++++++----- code/crates/consensus-async/src/timers.rs | 16 +- code/crates/core-consensus/src/effect.rs | 29 ++- .../core-consensus/src/handle/proposal.rs | 2 +- .../core-consensus/src/handle/signature.rs | 6 +- code/crates/core-consensus/src/handle/vote.rs | 5 +- code/crates/core-types/src/proposal.rs | 6 + code/crates/engine/src/consensus.rs | 4 +- 12 files changed, 366 insertions(+), 153 deletions(-) create mode 100644 code/crates/consensus-async/src/config.rs diff --git a/code/Cargo.lock b/code/Cargo.lock index 882f8cf04..12ff4d43c 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2153,9 +2153,11 @@ version = "0.0.1" dependencies = [ "async-trait", "derive-where", + "humantime-serde", "informalsystems-malachitebft-core-consensus", "informalsystems-malachitebft-core-types", "informalsystems-malachitebft-sync", + "serde", "thiserror 2.0.11", "tokio", "tracing", diff --git a/code/crates/consensus-async/Cargo.toml b/code/crates/consensus-async/Cargo.toml index ff257355c..fb31a3fd9 100644 --- a/code/crates/consensus-async/Cargo.toml +++ b/code/crates/consensus-async/Cargo.toml @@ -10,10 +10,11 @@ rust-version.workspace = true readme = "../../../README.md" [package.metadata.docs.rs] -all-features = true +all-features = false +rustdoc-args = ["--cfg", "docsrs"] [features] -timers = [] +serde = ["dep:serde", "dep:humantime-serde"] [dependencies] malachitebft-core-types = { workspace = true } @@ -23,6 +24,8 @@ malachitebft-sync = { workspace = true } async-trait = { workspace = true } derive-where = { workspace = true } tokio = { workspace = true } +serde = { workspace = true, optional = true } +humantime-serde = { workspace = true, optional = true } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/code/crates/consensus-async/src/config.rs b/code/crates/consensus-async/src/config.rs new file mode 100644 index 000000000..3fdb7e624 --- /dev/null +++ b/code/crates/consensus-async/src/config.rs @@ -0,0 +1,82 @@ +use std::time::Duration; + +use malachitebft_core_types::TimeoutKind; + +/// Timeouts +#[derive(Copy, Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Timeouts { + /// How long we wait for a proposal block before prevoting nil + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub propose: Duration, + + /// How much timeout_propose increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub propose_delta: Duration, + + /// How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub prevote: Duration, + + /// How much the timeout_prevote increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub prevote_delta: Duration, + + /// How long we wait after receiving +2/3 precommits for “anything” (ie. not a single block or nil) + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub precommit: Duration, + + /// How much the timeout_precommit increases with each round + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub precommit_delta: Duration, + + /// How long we wait after committing a block, before starting on the new + /// height (this gives us a chance to receive some more precommits, even + /// though we already have +2/3). + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub commit: Duration, + + /// How long we stay in preovte or precommit steps before starting + /// the vote synchronization protocol. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub step: Duration, +} + +impl Timeouts { + pub fn timeout_duration(&self, kind: TimeoutKind) -> Duration { + match kind { + TimeoutKind::Propose => self.propose, + TimeoutKind::Prevote => self.prevote, + TimeoutKind::Precommit => self.precommit, + TimeoutKind::Commit => self.commit, + TimeoutKind::PrevoteTimeLimit => self.step, + TimeoutKind::PrecommitTimeLimit => self.step, + } + } + + pub fn delta_duration(&self, step: TimeoutKind) -> Option { + match step { + TimeoutKind::Propose => Some(self.propose_delta), + TimeoutKind::Prevote => Some(self.prevote_delta), + TimeoutKind::Precommit => Some(self.precommit_delta), + TimeoutKind::Commit => None, + TimeoutKind::PrevoteTimeLimit => None, + TimeoutKind::PrecommitTimeLimit => None, + } + } +} + +impl Default for Timeouts { + fn default() -> Self { + Self { + propose: Duration::from_secs(3), + propose_delta: Duration::from_millis(500), + prevote: Duration::from_secs(1), + prevote_delta: Duration::from_millis(500), + precommit: Duration::from_secs(1), + precommit_delta: Duration::from_millis(500), + commit: Duration::from_secs(0), + step: Duration::from_secs(30), + } + } +} diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs index d13fd1752..803122400 100644 --- a/code/crates/consensus-async/src/lib.rs +++ b/code/crates/consensus-async/src/lib.rs @@ -1,21 +1,21 @@ -use std::ops::ControlFlow; -use std::time::Duration; +#![cfg_attr(docsrs, feature(doc_cfg))] +use config::Timeouts; use derive_where::derive_where; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use malachitebft_core_consensus::{Effect, Params, Resumable, Resume}; -pub use malachitebft_core_consensus::types::*; +pub use malachitebft_core_consensus::types; pub use malachitebft_core_consensus::{process, Error as ConsensusError, Input, State}; -mod query; -pub use query::{Query, Reply}; +pub mod config; +pub mod query; -#[cfg(feature = "timers")] -mod timers; +use query::*; +use types::*; -#[cfg(feature = "timers")] +mod timers; use timers::Timers; #[derive(thiserror::Error)] @@ -62,9 +62,10 @@ pub struct Consensus where Ctx: Context, { + ctx: Ctx, state: State, - #[cfg(feature = "timers")] timers: Timers, + timeouts: Timeouts, metrics: Metrics, rx_input: mpsc::Receiver<(Input, mpsc::Sender>)>, } @@ -73,14 +74,20 @@ impl Consensus where Ctx: Context, { - pub fn new(ctx: Ctx, params: Params, capacity: usize) -> (Self, Handle) { + pub fn new( + ctx: Ctx, + params: Params, + timeouts: Timeouts, + capacity: usize, + ) -> (Self, Handle) { let (tx_input, rx_input) = mpsc::channel(capacity); ( Self { + ctx: ctx.clone(), state: State::new(ctx, params), - #[cfg(feature = "timers")] timers: Timers::new(), + timeouts, metrics: Metrics::default(), rx_input, }, @@ -89,24 +96,25 @@ where } pub async fn run(mut self) { + let mut timer_elapsed = self.timers.subscribe(); + loop { - match self.run_inner().await { - Ok(ControlFlow::Continue(())) => continue, - Ok(ControlFlow::Break(())) => break, - Err(e) => { - tracing::error!("Error: {e}"); + tokio::select! { + _ = timer_elapsed.recv() => { + // TODO } - } - } - } + input = self.rx_input.recv() => { + match input { + Some((input, tx_query)) => { + if let Err(e) = self.process(input, tx_query).await { + tracing::error!("Error: {e}"); + } + } + None => break, + } - pub async fn run_inner(&mut self) -> Result, Error> { - match self.rx_input.recv().await { - Some((input, tx_query)) => { - self.process(input, tx_query).await?; - Ok(ControlFlow::Continue(())) + } } - None => Ok(ControlFlow::Break(())), } } @@ -120,18 +128,34 @@ where state: &mut self.state, metrics: &self.metrics, with: effect => handle_effect( + &self.ctx, effect, - #[cfg(feature = "timers")] &mut self.timers, + &self.timeouts, &tx_query ).await ) } } +macro_rules! query { + ($tx_query:expr, $resume:expr, $map:expr, $query:expr) => {{ + let (reply, rx) = oneshot::channel(); + $tx_query.send($query(reply).into()).await.unwrap(); + let result = rx.await.unwrap(); + Ok($resume.resume_with($map(result))) + }}; + + ($tx_query:expr, $resume:expr, $query:expr) => {{ + query!($tx_query, $resume, |x| x, $query) + }}; +} + async fn handle_effect( + ctx: &Ctx, effect: Effect, - #[cfg(feature = "timers")] timers: &mut Timers, + timers: &mut Timers, + timeouts: &Timeouts, tx_query: &mpsc::Sender>, ) -> Result, Error> where @@ -139,78 +163,138 @@ where { match effect { // Timers - #[cfg(feature = "timers")] - Effect::ResetTimeouts(resume) => { - // TODO - Ok(resume.resume_with(())) - } - - #[cfg(not(feature = "timers"))] Effect::ResetTimeouts(resume) => { // TODO Ok(resume.resume_with(())) } - #[cfg(feature = "timers")] Effect::CancelAllTimeouts(resume) => { timers.cancel_all(); Ok(resume.resume_with(())) } - #[cfg(not(feature = "timers"))] - Effect::CancelAllTimeouts(resume) => { - // TODO - Ok(resume.resume_with(())) - } - - #[cfg(feature = "timers")] Effect::CancelTimeout(timeout, resume) => { timers.cancel(&timeout); Ok(resume.resume_with(())) } - #[cfg(not(feature = "timers"))] - Effect::CancelTimeout(timeout, resume) => { - // TODO - Ok(resume.resume_with(())) - } - - #[cfg(feature = "timers")] Effect::ScheduleTimeout(timeout, resume) => { - let duration = Duration::from_secs(1); + let duration = timeouts.timeout_duration(timeout.kind); timers.start(timeout, duration); Ok(resume.resume_with(())) } - #[cfg(not(feature = "timers"))] - Effect::ScheduleTimeout(timeout, resume) => { - // TODO - Ok(resume.resume_with(())) - } - // Consensus - Effect::StartRound(height, round, proposer, resume) => todo!(), - Effect::Publish(signed_consensus_msg, resume) => todo!(), - Effect::GetValue(height, round, timeout, resume) => todo!(), - Effect::RestreamValue(height, round, pol_round, proposer, id, resume) => todo!(), - Effect::GetValidatorSet(height, resume) => todo!(), - Effect::Decide(certificate, resume) => todo!(), + Effect::StartRound(height, round, proposer, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::StartRound( + height, round, proposer, reply + )) + } + Effect::Publish(signed_consensus_msg, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::Publish( + signed_consensus_msg, + reply + )) + } + Effect::GetValue(height, round, timeout, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::GetValue( + height, round, timeout, reply + )) + } + Effect::RestreamValue(height, round, pol_round, proposer, id, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::RestreamValue( + height, round, pol_round, proposer, id, reply + )) + } + Effect::GetValidatorSet(height, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::GetValidatorSet( + height, reply + )) + } + Effect::Decide(certificate, resume) => { + query!(tx_query, resume, |reply| ConsensusQuery::Decide( + certificate, + reply + )) + } // Vote Sync - Effect::GetVoteSet(height, round, resume) => todo!(), - Effect::SendVoteSetResponse(request_id, height, round, vote_set, resume) => todo!(), + Effect::GetVoteSet(height, round, resume) => { + query!(tx_query, resume, |reply| SyncQuery::GetVoteSet( + height, round, reply + )) + } + Effect::SendVoteSetResponse(request_id, height, round, vote_set, resume) => { + query!(tx_query, resume, |reply| { + SyncQuery::SendVoteSetResponse(request_id, height, round, vote_set, reply) + }) + } // WAL - Effect::WalAppendMessage(msg, resume) => todo!(), - Effect::WalAppendTimeout(timeout, resume) => todo!(), + Effect::WalAppendMessage(msg, resume) => { + query!(tx_query, resume, |reply| WalQuery::AppendMessage( + msg, reply + )) + } + Effect::WalAppendTimeout(timeout, resume) => { + query!(tx_query, resume, |reply| WalQuery::AppendTimeout( + timeout, reply + )) + } // Signing - Effect::SignVote(vote, resume) => todo!(), - Effect::SignProposal(proposal, resume) => todo!(), - Effect::VerifySignature(msg, public_key, resume) => todo!(), + Effect::SignVote(vote, resume) => { + // query!(tx_query, resume, |reply| SigningQuery::SignVote( + // vote, reply + // )) + Ok(resume.resume_with(ctx.signing_provider().sign_vote(vote))) + } + Effect::SignProposal(proposal, resume) => { + // query!(tx_query, resume, |reply| SigningQuery::SignProposal( + // proposal, reply + // )) + Ok(resume.resume_with(ctx.signing_provider().sign_proposal(proposal))) + } + Effect::VerifySignature(msg, public_key, resume) => { + // query!(tx_query, resume, |reply| { + // SigningQuery::VerifySignature(msg, public_key, reply) + // }) + match msg.message { + ConsensusMsg::Vote(vote) => { + let valid = ctx.signing_provider().verify_signed_vote( + &vote, + &msg.signature, + &public_key, + ); + + Ok(resume.resume_with(valid.into())) + } + ConsensusMsg::Proposal(proposal) => { + let valid = ctx.signing_provider().verify_signed_proposal( + &proposal, + &msg.signature, + &public_key, + ); + + Ok(resume.resume_with(valid.into())) + } + } + } Effect::VerifyCertificate(certificate, validator_set, threshold_params, resume) => { - todo!() + // query!(tx_query, resume, |reply| SigningQuery::VerifyCertificate( + // certificate, + // validator_set, + // threshold_params, + // reply + // )) + Ok( + resume.resume_with(ctx.signing_provider().verify_certificate( + &certificate, + &validator_set, + threshold_params, + )), + ) } } } diff --git a/code/crates/consensus-async/src/query.rs b/code/crates/consensus-async/src/query.rs index 25050cbfc..36f24438c 100644 --- a/code/crates/consensus-async/src/query.rs +++ b/code/crates/consensus-async/src/query.rs @@ -1,45 +1,53 @@ use derive_where::derive_where; use tokio::sync::oneshot; -pub use malachitebft_core_consensus::types::*; +use malachitebft_core_consensus::types::*; pub type Reply = oneshot::Sender; -#[cfg(not(feature = "timers"))] -#[must_use] -#[derive(Debug)] -pub enum TimersQuery { - /// Reset all timeouts to their initial values - ResetTimeouts(Reply<()>), - - /// Cancel all outstanding timeouts - CancelAllTimeouts(Reply<()>), - - /// Cancel a given timeout - CancelTimeout(Timeout, Reply<()>), - - /// Schedule a timeout - ScheduleTimeout(Timeout, Reply<()>), -} - #[must_use] #[derive_where(Debug)] pub enum Query where Ctx: Context, { - #[cfg(not(feature = "timers"))] - Timers(TimersQuery), - Consensus(ConsensusQuery), + Sync(SyncQuery), + Wal(WalQuery), + // Timers(TimersQuery), + // Signing(SigningQuery), +} - Signing(SigningQuery), +impl From> for Query { + fn from(query: ConsensusQuery) -> Self { + Self::Consensus(query) + } +} - Sync(SyncQuery), +impl From> for Query { + fn from(query: SyncQuery) -> Self { + Self::Sync(query) + } +} - Wal(WalQuery), +impl From> for Query { + fn from(query: WalQuery) -> Self { + Self::Wal(query) + } } +// impl From for Query { +// fn from(query: TimersQuery) -> Self { +// Self::Timers(query) +// } +// } +// +// impl From> for Query { +// fn from(query: SigningQuery) -> Self { +// Self::Signing(query) +// } +// } + #[must_use] #[derive_where(Debug)] pub enum ConsensusQuery @@ -60,7 +68,7 @@ where /// Because this operation may be asynchronous, this effect does not expect a resumption /// with a value, rather the application is expected to propose a value within the timeout duration. /// - /// The application MUST eventually feed a [`ProposeValue`][crate::input::Input::ProposeValue] + /// The application MUST eventually feed a [`Propose`][crate::Input::Propose] /// input to consensus within the specified timeout duration. GetValue(Ctx::Height, Round, Timeout, Reply<()>), @@ -91,34 +99,6 @@ where Decide(CommitCertificate, Reply<()>), } -#[must_use] -#[derive_where(Debug)] -pub enum SigningQuery -where - Ctx: Context, -{ - /// Sign a vote with this node's private key - SignVote(Ctx::Vote, Reply>), - - /// Sign a proposal with this node's private key - SignProposal(Ctx::Proposal, Reply>), - - /// Verify a signature - VerifySignature( - SignedMessage>, - PublicKey, - Reply, - ), - - /// Verify a commit certificate - VerifyCertificate( - CommitCertificate, - Ctx::ValidatorSet, - ThresholdParams, - Reply>>, - ), -} - #[must_use] #[derive_where(Debug)] pub enum SyncQuery @@ -139,8 +119,52 @@ where Ctx: Context, { /// Append a consensus message to the Write-Ahead Log for crash recovery - WalAppendMessage(SignedConsensusMsg, Reply<()>), + AppendMessage(SignedConsensusMsg, Reply<()>), /// Append a timeout to the Write-Ahead Log for crash recovery - WalAppendTimeout(Timeout, Reply<()>), + AppendTimeout(Timeout, Reply<()>), } + +// #[must_use] +// #[derive(Debug)] +// pub enum TimersQuery { +// /// Reset all timeouts to their initial values +// ResetTimeouts(Reply<()>), +// +// /// Cancel all outstanding timeouts +// CancelAllTimeouts(Reply<()>), +// +// /// Cancel a given timeout +// CancelTimeout(Timeout, Reply<()>), +// +// /// Schedule a timeout +// ScheduleTimeout(Timeout, Reply<()>), +// } + +// #[must_use] +// #[derive_where(Debug)] +// pub enum SigningQuery +// where +// Ctx: Context, +// { +// /// Sign a vote with this node's private key +// SignVote(Ctx::Vote, Reply>), +// +// /// Sign a proposal with this node's private key +// SignProposal(Ctx::Proposal, Reply>), +// +// /// Verify a signature +// VerifySignature( +// SignedMessage>, +// PublicKey, +// Reply, +// ), +// +// /// Verify a commit certificate +// VerifyCertificate( +// CommitCertificate, +// Ctx::ValidatorSet, +// ThresholdParams, +// Reply>>, +// ), +// } diff --git a/code/crates/consensus-async/src/timers.rs b/code/crates/consensus-async/src/timers.rs index 3dda40a14..b00a3ddd3 100644 --- a/code/crates/consensus-async/src/timers.rs +++ b/code/crates/consensus-async/src/timers.rs @@ -100,11 +100,6 @@ where ); } - /// Check if a timer with a given `key` is active, ie. it hasn't been canceled nor has it elapsed yet. - pub fn is_timer_active(&self, key: &Key) -> bool { - self.timers.contains_key(key) - } - /// Cancel a timer with a given `key`. /// /// If canceling a timer that was already canceled, or key never was used to start a timer @@ -188,6 +183,17 @@ where } } +#[cfg(test)] +impl Timers +where + Key: Clone + Eq + Hash + Send + 'static, +{ + /// Check if a timer with a given `key` is active, ie. it hasn't been canceled nor has it elapsed yet. + fn is_timer_active(&self, key: &Key) -> bool { + self.timers.contains_key(key) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 2e6edbde2..29a41044c 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -1,3 +1,5 @@ +#![allow(rustdoc::private_intra_doc_links)] + use derive_where::derive_where; use crate::types::*; @@ -12,16 +14,17 @@ use crate::types::*; /// /// ```rust,ignore /// fn effect_handler(effect: Effect) -> Result, Error> { -/// match effect { -/// Effect::ResetTimeouts(r) => { -/// reset_timeouts(); -/// Ok(r.resume_with(())) -/// } -/// Effect::GetValidatorSet(height, r) => {) -/// let validator_set = get_validator_set(height); -/// Ok(r.resume_with(validator_set)) -/// } -/// // ... +/// match effect { +/// Effect::ResetTimeouts(r) => { +/// reset_timeouts(); +/// Ok(r.resume_with(())) +/// } +/// Effect::GetValidatorSet(height, r) => { +/// let validator_set = get_validator_set(height); +/// Ok(r.resume_with(validator_set)) +/// } +/// // ... +/// } /// } /// ``` pub trait Resumable { @@ -80,7 +83,7 @@ where /// Because this operation may be asynchronous, this effect does not expect a resumption /// with a value, rather the application is expected to propose a value within the timeout duration. /// - /// The application MUST eventually feed a [`ProposeValue`][crate::input::Input::ProposeValue] + /// The application MUST eventually feed a [`Propose`][crate::input::Input::Propose] /// input to consensus within the specified timeout duration. /// /// Resume with: [`resume::Continue`] @@ -197,7 +200,7 @@ where ValidatorSet(Option), /// Resume execution with the validity of the signature - SignatureValidity(bool), + SignatureValidity(Validity), /// Resume execution with the signed vote SignedVote(SignedVote), @@ -238,7 +241,7 @@ pub mod resume { pub struct SignatureValidity; impl Resumable for SignatureValidity { - type Value = bool; + type Value = Validity; fn resume_with(self, value: Self::Value) -> Resume { Resume::SignatureValidity(value) diff --git a/code/crates/core-consensus/src/handle/proposal.rs b/code/crates/core-consensus/src/handle/proposal.rs index 4ae9f4a55..c861cb71d 100644 --- a/code/crates/core-consensus/src/handle/proposal.rs +++ b/code/crates/core-consensus/src/handle/proposal.rs @@ -167,7 +167,7 @@ where }; let signed_msg = signed_proposal.clone().map(ConsensusMsg::Proposal); - if !verify_signature(co, signed_msg, proposer).await? { + if !verify_signature(co, signed_msg, proposer).await?.is_valid() { warn!( consensus.height = %consensus_height, proposal.height = %proposal_height, diff --git a/code/crates/core-consensus/src/handle/signature.rs b/code/crates/core-consensus/src/handle/signature.rs index a1fdf35f4..1006cf778 100644 --- a/code/crates/core-consensus/src/handle/signature.rs +++ b/code/crates/core-consensus/src/handle/signature.rs @@ -6,16 +6,16 @@ pub async fn verify_signature( co: &Co, signed_msg: SignedMessage>, validator: &Ctx::Validator, -) -> Result> +) -> Result> where Ctx: Context, { - let valid = perform!(co, + let validity = perform!(co, Effect::VerifySignature(signed_msg, validator.public_key().clone(), Default::default()), Resume::SignatureValidity(valid) => valid ); - Ok(valid) + Ok(validity) } pub async fn sign_vote(co: &Co, vote: Ctx::Vote) -> Result, Error> diff --git a/code/crates/core-consensus/src/handle/vote.rs b/code/crates/core-consensus/src/handle/vote.rs index 7a0fee744..a89631f66 100644 --- a/code/crates/core-consensus/src/handle/vote.rs +++ b/code/crates/core-consensus/src/handle/vote.rs @@ -131,7 +131,10 @@ where }; let signed_msg = signed_vote.clone().map(ConsensusMsg::Vote); - if !verify_signature(co, signed_msg, validator).await? { + if !verify_signature(co, signed_msg, validator) + .await? + .is_valid() + { warn!( consensus.height = %consensus_height, vote.height = %vote_height, diff --git a/code/crates/core-types/src/proposal.rs b/code/crates/core-types/src/proposal.rs index 0c9c001c3..660d082fb 100644 --- a/code/crates/core-types/src/proposal.rs +++ b/code/crates/core-types/src/proposal.rs @@ -57,3 +57,9 @@ impl Validity { } } } + +impl From for Validity { + fn from(valid: bool) -> Self { + Validity::from_bool(valid) + } +} diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index e2fdf9b5b..7cf5de197 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -13,7 +13,7 @@ use malachitebft_core_consensus::types::{PeerId, SignedConsensusMsg}; use malachitebft_core_consensus::{Effect, Resumable, Resume}; use malachitebft_core_types::{ Context, Round, SignedExtension, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, - ValidatorSet, ValueOrigin, + ValidatorSet, Validity, ValueOrigin, }; use malachitebft_metrics::Metrics; use malachitebft_sync::{ @@ -855,7 +855,7 @@ where .signature_verification_time .observe(start.elapsed().as_secs_f64()); - Ok(r.resume_with(valid)) + Ok(r.resume_with(Validity::from_bool(valid))) } Effect::VerifyCertificate(certificate, validator_set, thresholds, r) => { From ec5e163b3127a619a4f393f61d0874bab6418a39 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 24 Jan 2025 16:33:50 +0100 Subject: [PATCH 6/6] wip --- code/crates/consensus-async/src/lib.rs | 31 ++++++++++++++++++-------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/code/crates/consensus-async/src/lib.rs b/code/crates/consensus-async/src/lib.rs index 803122400..7335abecd 100644 --- a/code/crates/consensus-async/src/lib.rs +++ b/code/crates/consensus-async/src/lib.rs @@ -100,9 +100,18 @@ where loop { tokio::select! { - _ = timer_elapsed.recv() => { - // TODO + Ok(elapsed) = timer_elapsed.recv() => { + let Some(timeout) = self.timers.intercept_timer_msg(elapsed) else { + continue; + }; + + let (tx_query, _) = mpsc::channel(1); + let input = Input::TimeoutElapsed(timeout); + if let Err(e) = self.process(input, tx_query).await { + tracing::error!("Error: {e}"); + } } + input = self.rx_input.recv() => { match input { Some((input, tx_query)) => { @@ -248,18 +257,21 @@ where // query!(tx_query, resume, |reply| SigningQuery::SignVote( // vote, reply // )) + Ok(resume.resume_with(ctx.signing_provider().sign_vote(vote))) } Effect::SignProposal(proposal, resume) => { // query!(tx_query, resume, |reply| SigningQuery::SignProposal( // proposal, reply // )) + Ok(resume.resume_with(ctx.signing_provider().sign_proposal(proposal))) } Effect::VerifySignature(msg, public_key, resume) => { // query!(tx_query, resume, |reply| { // SigningQuery::VerifySignature(msg, public_key, reply) // }) + match msg.message { ConsensusMsg::Vote(vote) => { let valid = ctx.signing_provider().verify_signed_vote( @@ -288,13 +300,14 @@ where // threshold_params, // reply // )) - Ok( - resume.resume_with(ctx.signing_provider().verify_certificate( - &certificate, - &validator_set, - threshold_params, - )), - ) + + let result = ctx.signing_provider().verify_certificate( + &certificate, + &validator_set, + threshold_params, + ); + + Ok(resume.resume_with(result)) } } }