diff --git a/code/Cargo.lock b/code/Cargo.lock index d39f77665..4c379bcf5 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2329,6 +2329,7 @@ dependencies = [ "informalsystems-malachitebft-core-state-machine", "informalsystems-malachitebft-core-types", "informalsystems-malachitebft-core-votekeeper", + "informalsystems-malachitebft-host", "informalsystems-malachitebft-metrics", "informalsystems-malachitebft-network", "informalsystems-malachitebft-signing", @@ -2368,6 +2369,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "informalsystems-malachitebft-host" +version = "0.6.0-pre" +dependencies = [ + "borsh", + "bytes", + "derive-where", + "informalsystems-malachitebft-core-consensus", + "informalsystems-malachitebft-core-types", + "informalsystems-malachitebft-sync", + "ractor", +] + [[package]] name = "informalsystems-malachitebft-metrics" version = "0.6.0-pre" diff --git a/code/Cargo.toml b/code/Cargo.toml index 4f2c8bd36..eddb9fb69 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -12,6 +12,7 @@ members = [ "crates/core-types", "crates/core-votekeeper", "crates/engine", + "crates/host", "crates/metrics", "crates/network", "crates/peer", @@ -82,6 +83,7 @@ malachitebft-core-state-machine = { version = "0.6.0-pre", package = "informalsy malachitebft-core-types = { version = "0.6.0-pre", package = "informalsystems-malachitebft-core-types", path = "crates/core-types" } malachitebft-core-votekeeper = { version = "0.6.0-pre", package = "informalsystems-malachitebft-core-votekeeper", path = "crates/core-votekeeper" } malachitebft-discovery = { version = "0.6.0-pre", package = "informalsystems-malachitebft-discovery", path = "crates/discovery" } +malachitebft-host = { version = "0.6.0-pre", package = "informalsystems-malachitebft-host", path = "crates/host" } malachitebft-network = { version = "0.6.0-pre", package = "informalsystems-malachitebft-network", path = "crates/network" } malachitebft-metrics = { version = "0.6.0-pre", package = "informalsystems-malachitebft-metrics", path = "crates/metrics" } malachitebft-peer = { version = "0.6.0-pre", package = "informalsystems-malachitebft-peer", path = "crates/peer", default-features = false } diff --git a/code/crates/engine/Cargo.toml b/code/crates/engine/Cargo.toml index b1c43c2f8..04f9e1356 100644 --- a/code/crates/engine/Cargo.toml +++ b/code/crates/engine/Cargo.toml @@ -26,6 +26,7 @@ malachitebft-core-driver.workspace = true malachitebft-core-state-machine.workspace = true malachitebft-core-votekeeper.workspace = true malachitebft-core-types.workspace = true +malachitebft-host.workspace = true malachitebft-network.workspace = true malachitebft-metrics.workspace = true malachitebft-signing.workspace = true diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index d9c6e66f9..f721149c5 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -1,193 +1 @@ -use bytes::Bytes; -use std::time::Duration; - -use derive_where::derive_where; -use ractor::{ActorRef, RpcReplyPort}; - -use malachitebft_core_consensus::{Role, VoteExtensionError}; -use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId, VoteExtensions}; -use malachitebft_sync::{PeerId, RawDecidedValue}; - -use crate::util::streaming::StreamMessage; - -pub use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue}; - -/// A reference to the host actor. -pub type HostRef = ActorRef>; - -/// What to do next after a decision. -#[derive_where(Debug)] -pub enum Next { - /// Start at the given height with the given validator set. - Start(Ctx::Height, Ctx::ValidatorSet), - - /// Restart at the given height with the given validator set. - Restart(Ctx::Height, Ctx::ValidatorSet), -} - -/// Messages that need to be handled by the host actor. -#[derive_where(Debug)] -pub enum HostMsg { - /// Notifies the application that consensus is ready. - /// - /// The application MUST reply with a message to instruct - /// consensus to start at a given height. - ConsensusReady { - /// Use this reply port to instruct consensus to start the first height. - reply_to: RpcReplyPort<(Ctx::Height, Ctx::ValidatorSet)>, - }, - - /// Consensus has started a new round. - StartedRound { - /// The height at which the round started. - height: Ctx::Height, - /// The round number that started. - round: Round, - /// The address of the proposer for this round. - proposer: Ctx::Address, - /// The role of the node in this round. - role: Role, - /// Use this reply port to send the undecided values that were already seen for this - /// round. This is needed when recovering from a crash. - /// - /// The application MUST reply immediately with the values it has, or with an empty vector. - reply_to: RpcReplyPort>>, - }, - - /// Request to build a local value to propose - /// - /// The application MUST reply to this message with the requested value - /// within the specified timeout duration. - GetValue { - /// The height at which the value should be proposed. - height: Ctx::Height, - /// The round in which the value should be proposed. - round: Round, - /// The amount of time the application has to build the value. - timeout: Duration, - /// Use this reply port to send the value that was built. - reply_to: RpcReplyPort>, - }, - - /// ExtendVote allows the application to extend the pre-commit vote with arbitrary data. - /// - /// When consensus is preparing to send a pre-commit vote, it first calls `ExtendVote`. - /// The application then returns a blob of data called a vote extension. - /// This data is opaque to the consensus algorithm but can contain application-specific information. - /// The proposer of the next block will receive all vote extensions along with the commit certificate. - ExtendVote { - /// The height at which the vote is being extended. - height: Ctx::Height, - /// The round in which the vote is being extended. - round: Round, - /// The ID of the value that is being voted on. - value_id: ValueId, - /// The vote extension to be added to the vote, if any. - reply_to: RpcReplyPort>, - }, - - /// Verify a vote extension - /// - /// If the vote extension is deemed invalid, the vote it was part of - /// will be discarded altogether. - VerifyVoteExtension { - /// The height for which the vote is. - height: Ctx::Height, - /// The round for which the vote is. - round: Round, - /// The ID of the value that the vote extension is for. - value_id: ValueId, - /// The vote extension to verify. - extension: Ctx::Extension, - /// Use this reply port to send the result of the verification. - reply_to: RpcReplyPort>, - }, - - /// Requests the application to re-stream a proposal that it has already seen. - /// - /// The application MUST re-publish again all the proposal parts pertaining - /// to that value by sending [`NetworkMsg::PublishProposalPart`] messages through - /// the [`Channels::network`] channel. - RestreamValue { - /// The height at which the value was proposed. - height: Ctx::Height, - /// The round in which the value was proposed. - round: Round, - /// The round in which the value was valid. - valid_round: Round, - /// The address of the proposer of the value. - address: Ctx::Address, - /// The ID of the value to restream. - value_id: ValueId, - }, - - /// Requests the earliest height available in the history maintained by the application. - /// - /// The application MUST respond with its earliest available height. - GetHistoryMinHeight { reply_to: RpcReplyPort }, - - /// Notifies the application that consensus has received a proposal part over the network. - /// - /// If this part completes the full proposal, the application MUST respond - /// with the complete proposed value. Otherwise, it MUST respond with `None`. - ReceivedProposalPart { - from: PeerId, - part: StreamMessage, - reply_to: RpcReplyPort>, - }, - - /// Notifies the application that consensus has decided on a value. - /// - /// This message includes a commit certificate containing the ID of - /// the value that was decided on, the height and round at which it was decided, - /// and the aggregated signatures of the validators that committed to it. - /// It also includes to the vote extensions received for that height. - /// - /// In response to this message, the application MUST send a [`Next`] - /// message back to consensus, instructing it to either start the next height if - /// the application was able to commit the decided value, or to restart the current height - /// otherwise. - /// - /// If the application does not reply, consensus will stall. - Decided { - /// The commit certificate containing the ID of the value that was decided on, - /// the the height and round at which it was decided, and the aggregated signatures - /// of the validators that committed to it. - certificate: CommitCertificate, - - /// Vote extensions that were received for this height. - extensions: VoteExtensions, - - /// Use this reply port to instruct consensus to start the next height. - reply_to: RpcReplyPort>, - }, - - /// Requests a previously decided value from the application's storage. - /// - /// The application MUST respond with that value if available, or `None` otherwise. - GetDecidedValue { - /// Height of the decided value to retrieve - height: Ctx::Height, - /// Channel for sending back the decided value - reply_to: RpcReplyPort>>, - }, - - /// Notifies the application that a value has been synced from the network. - /// This may happen when the node is catching up with the network. - /// - /// If a value can be decoded from the bytes provided, then the application MUST reply - /// to this message with the decoded value. Otherwise, it MUST reply with `None`. - ProcessSyncedValue { - /// Height of the synced value - height: Ctx::Height, - /// Round of the synced value - round: Round, - /// Address of the original proposer - proposer: Ctx::Address, - /// Raw encoded value data - value_bytes: Bytes, - /// Channel for sending back the proposed value, if successfully decoded - /// or `None` if the value could not be decoded - reply_to: RpcReplyPort>, - }, -} +pub use malachitebft_host::host::*; diff --git a/code/crates/engine/src/lib.rs b/code/crates/engine/src/lib.rs index 96be00af8..5d0384ddb 100644 --- a/code/crates/engine/src/lib.rs +++ b/code/crates/engine/src/lib.rs @@ -2,7 +2,6 @@ pub mod consensus; pub mod host; pub mod network; pub mod node; -mod ser; pub mod sync; pub mod util; pub mod wal; diff --git a/code/crates/engine/src/util/streaming.rs b/code/crates/engine/src/util/streaming.rs index 1eca8f1d3..780bb9980 100644 --- a/code/crates/engine/src/util/streaming.rs +++ b/code/crates/engine/src/util/streaming.rs @@ -1,96 +1 @@ -use core::fmt; - -use bytes::Bytes; - -pub type Sequence = u64; - -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct StreamId(pub(crate) Bytes); - -impl StreamId { - pub fn new(bytes: Bytes) -> Self { - Self(bytes) - } - - pub fn to_bytes(&self) -> Bytes { - self.0.clone() - } -} - -impl fmt::Display for StreamId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for byte in &self.0 { - write!(f, "{byte:02x}")?; - } - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq, Eq)] -#[cfg_attr( - feature = "borsh", - derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize) -)] -pub struct StreamMessage { - /// Receivers identify streams by (sender, stream_id). - /// This means each node can allocate stream_ids independently - /// and that many streams can be sent on a single network topic. - pub stream_id: StreamId, - - /// Identifies the sequence of each message in the stream starting from 0. - pub sequence: Sequence, - - /// The content of this stream message - pub content: StreamContent, -} - -impl StreamMessage { - pub fn new(stream_id: StreamId, sequence: Sequence, content: StreamContent) -> Self { - Self { - stream_id, - sequence, - content, - } - } - - pub fn is_first(&self) -> bool { - self.sequence == 0 - } - - pub fn is_fin(&self) -> bool { - self.content.is_fin() - } -} - -#[derive(Clone, Debug, PartialEq, Eq)] -#[cfg_attr( - feature = "borsh", - derive(borsh::BorshSerialize, borsh::BorshDeserialize) -)] -pub enum StreamContent { - /// Serialized content. - Data(T), - - /// Indicates the end of the stream. - Fin, -} - -impl StreamContent { - pub fn as_data(&self) -> Option<&T> { - match self { - Self::Data(data) => Some(data), - _ => None, - } - } - - pub fn into_data(self) -> Option { - match self { - Self::Data(data) => Some(data), - _ => None, - } - } - - pub fn is_fin(&self) -> bool { - matches!(self, Self::Fin) - } -} +pub use malachitebft_host::streaming::*; diff --git a/code/crates/host/Cargo.toml b/code/crates/host/Cargo.toml new file mode 100644 index 000000000..8981033ea --- /dev/null +++ b/code/crates/host/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "informalsystems-malachitebft-host" +description = "Implementation of the Malachite BFT consensus engine" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true +publish.workspace = true +readme = "../../../README.md" + +[package.metadata.docs.rs] +all-features = true + +[features] +borsh = ["dep:borsh"] + +[lints] +workspace = true + +[dependencies] +malachitebft-core-consensus.workspace = true +malachitebft-core-types.workspace = true +malachitebft-sync.workspace = true + +borsh = { workspace = true, optional = true } +bytes = { workspace = true, features = ["serde"] } +derive-where = { workspace = true } +ractor = { workspace = true } diff --git a/code/crates/host/src/host.rs b/code/crates/host/src/host.rs new file mode 100644 index 000000000..44102721c --- /dev/null +++ b/code/crates/host/src/host.rs @@ -0,0 +1,193 @@ +use bytes::Bytes; +use std::time::Duration; + +use derive_where::derive_where; +use ractor::{ActorRef, RpcReplyPort}; + +use malachitebft_core_consensus::{Role, VoteExtensionError}; +use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId, VoteExtensions}; +use malachitebft_sync::{PeerId, RawDecidedValue}; + +pub use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue}; + +use crate::streaming::StreamMessage; + +/// A reference to the host actor. +pub type HostRef = ActorRef>; + +/// What to do next after a decision. +#[derive_where(Debug)] +pub enum Next { + /// Start at the given height with the given validator set. + Start(Ctx::Height, Ctx::ValidatorSet), + + /// Restart at the given height with the given validator set. + Restart(Ctx::Height, Ctx::ValidatorSet), +} + +/// Messages that need to be handled by the host actor. +#[derive_where(Debug)] +pub enum HostMsg { + /// Notifies the application that consensus is ready. + /// + /// The application MUST reply with a message to instruct + /// consensus to start at a given height. + ConsensusReady { + /// Use this reply port to instruct consensus to start the first height. + reply_to: RpcReplyPort<(Ctx::Height, Ctx::ValidatorSet)>, + }, + + /// Consensus has started a new round. + StartedRound { + /// The height at which the round started. + height: Ctx::Height, + /// The round number that started. + round: Round, + /// The address of the proposer for this round. + proposer: Ctx::Address, + /// The role of the node in this round. + role: Role, + /// Use this reply port to send the undecided values that were already seen for this + /// round. This is needed when recovering from a crash. + /// + /// The application MUST reply immediately with the values it has, or with an empty vector. + reply_to: RpcReplyPort>>, + }, + + /// Request to build a local value to propose + /// + /// The application MUST reply to this message with the requested value + /// within the specified timeout duration. + GetValue { + /// The height at which the value should be proposed. + height: Ctx::Height, + /// The round in which the value should be proposed. + round: Round, + /// The amount of time the application has to build the value. + timeout: Duration, + /// Use this reply port to send the value that was built. + reply_to: RpcReplyPort>, + }, + + /// ExtendVote allows the application to extend the pre-commit vote with arbitrary data. + /// + /// When consensus is preparing to send a pre-commit vote, it first calls `ExtendVote`. + /// The application then returns a blob of data called a vote extension. + /// This data is opaque to the consensus algorithm but can contain application-specific information. + /// The proposer of the next block will receive all vote extensions along with the commit certificate. + ExtendVote { + /// The height at which the vote is being extended. + height: Ctx::Height, + /// The round in which the vote is being extended. + round: Round, + /// The ID of the value that is being voted on. + value_id: ValueId, + /// The vote extension to be added to the vote, if any. + reply_to: RpcReplyPort>, + }, + + /// Verify a vote extension + /// + /// If the vote extension is deemed invalid, the vote it was part of + /// will be discarded altogether. + VerifyVoteExtension { + /// The height for which the vote is. + height: Ctx::Height, + /// The round for which the vote is. + round: Round, + /// The ID of the value that the vote extension is for. + value_id: ValueId, + /// The vote extension to verify. + extension: Ctx::Extension, + /// Use this reply port to send the result of the verification. + reply_to: RpcReplyPort>, + }, + + /// Requests the application to re-stream a proposal that it has already seen. + /// + /// The application MUST re-publish again all the proposal parts pertaining + /// to that value by sending [`NetworkMsg::PublishProposalPart`] messages through + /// the [`Channels::network`] channel. + RestreamValue { + /// The height at which the value was proposed. + height: Ctx::Height, + /// The round in which the value was proposed. + round: Round, + /// The round in which the value was valid. + valid_round: Round, + /// The address of the proposer of the value. + address: Ctx::Address, + /// The ID of the value to restream. + value_id: ValueId, + }, + + /// Requests the earliest height available in the history maintained by the application. + /// + /// The application MUST respond with its earliest available height. + GetHistoryMinHeight { reply_to: RpcReplyPort }, + + /// Notifies the application that consensus has received a proposal part over the network. + /// + /// If this part completes the full proposal, the application MUST respond + /// with the complete proposed value. Otherwise, it MUST respond with `None`. + ReceivedProposalPart { + from: PeerId, + part: StreamMessage, + reply_to: RpcReplyPort>, + }, + + /// Notifies the application that consensus has decided on a value. + /// + /// This message includes a commit certificate containing the ID of + /// the value that was decided on, the height and round at which it was decided, + /// and the aggregated signatures of the validators that committed to it. + /// It also includes to the vote extensions received for that height. + /// + /// In response to this message, the application MUST send a [`Next`] + /// message back to consensus, instructing it to either start the next height if + /// the application was able to commit the decided value, or to restart the current height + /// otherwise. + /// + /// If the application does not reply, consensus will stall. + Decided { + /// The commit certificate containing the ID of the value that was decided on, + /// the the height and round at which it was decided, and the aggregated signatures + /// of the validators that committed to it. + certificate: CommitCertificate, + + /// Vote extensions that were received for this height. + extensions: VoteExtensions, + + /// Use this reply port to instruct consensus to start the next height. + reply_to: RpcReplyPort>, + }, + + /// Requests a previously decided value from the application's storage. + /// + /// The application MUST respond with that value if available, or `None` otherwise. + GetDecidedValue { + /// Height of the decided value to retrieve + height: Ctx::Height, + /// Channel for sending back the decided value + reply_to: RpcReplyPort>>, + }, + + /// Notifies the application that a value has been synced from the network. + /// This may happen when the node is catching up with the network. + /// + /// If a value can be decoded from the bytes provided, then the application MUST reply + /// to this message with the decoded value. Otherwise, it MUST reply with `None`. + ProcessSyncedValue { + /// Height of the synced value + height: Ctx::Height, + /// Round of the synced value + round: Round, + /// Address of the original proposer + proposer: Ctx::Address, + /// Raw encoded value data + value_bytes: Bytes, + /// Channel for sending back the proposed value, if successfully decoded + /// or `None` if the value could not be decoded + reply_to: RpcReplyPort>, + }, +} diff --git a/code/crates/host/src/lib.rs b/code/crates/host/src/lib.rs new file mode 100644 index 000000000..511bd9945 --- /dev/null +++ b/code/crates/host/src/lib.rs @@ -0,0 +1,3 @@ +pub mod host; +mod ser; +pub mod streaming; diff --git a/code/crates/engine/src/ser.rs b/code/crates/host/src/ser.rs similarity index 100% rename from code/crates/engine/src/ser.rs rename to code/crates/host/src/ser.rs diff --git a/code/crates/engine/src/ser/borsh.rs b/code/crates/host/src/ser/borsh.rs similarity index 91% rename from code/crates/engine/src/ser/borsh.rs rename to code/crates/host/src/ser/borsh.rs index 8b56f5a66..657e21309 100644 --- a/code/crates/engine/src/ser/borsh.rs +++ b/code/crates/host/src/ser/borsh.rs @@ -1,4 +1,4 @@ -use crate::util::streaming::StreamId; +use crate::streaming::StreamId; impl borsh::BorshSerialize for StreamId { fn serialize(&self, writer: &mut W) -> borsh::io::Result<()> { diff --git a/code/crates/host/src/streaming.rs b/code/crates/host/src/streaming.rs new file mode 100644 index 000000000..1eca8f1d3 --- /dev/null +++ b/code/crates/host/src/streaming.rs @@ -0,0 +1,96 @@ +use core::fmt; + +use bytes::Bytes; + +pub type Sequence = u64; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct StreamId(pub(crate) Bytes); + +impl StreamId { + pub fn new(bytes: Bytes) -> Self { + Self(bytes) + } + + pub fn to_bytes(&self) -> Bytes { + self.0.clone() + } +} + +impl fmt::Display for StreamId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for byte in &self.0 { + write!(f, "{byte:02x}")?; + } + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr( + feature = "borsh", + derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize) +)] +pub struct StreamMessage { + /// Receivers identify streams by (sender, stream_id). + /// This means each node can allocate stream_ids independently + /// and that many streams can be sent on a single network topic. + pub stream_id: StreamId, + + /// Identifies the sequence of each message in the stream starting from 0. + pub sequence: Sequence, + + /// The content of this stream message + pub content: StreamContent, +} + +impl StreamMessage { + pub fn new(stream_id: StreamId, sequence: Sequence, content: StreamContent) -> Self { + Self { + stream_id, + sequence, + content, + } + } + + pub fn is_first(&self) -> bool { + self.sequence == 0 + } + + pub fn is_fin(&self) -> bool { + self.content.is_fin() + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] +pub enum StreamContent { + /// Serialized content. + Data(T), + + /// Indicates the end of the stream. + Fin, +} + +impl StreamContent { + pub fn as_data(&self) -> Option<&T> { + match self { + Self::Data(data) => Some(data), + _ => None, + } + } + + pub fn into_data(self) -> Option { + match self { + Self::Data(data) => Some(data), + _ => None, + } + } + + pub fn is_fin(&self) -> bool { + matches!(self, Self::Fin) + } +}