Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"crates/core-types",
"crates/core-votekeeper",
"crates/engine",
"crates/host",
"crates/metrics",
"crates/network",
"crates/peer",
Expand Down Expand Up @@ -82,6 +83,7 @@ malachitebft-core-state-machine = { version = "0.6.0-pre", package = "informalsy
malachitebft-core-types = { version = "0.6.0-pre", package = "informalsystems-malachitebft-core-types", path = "crates/core-types" }
malachitebft-core-votekeeper = { version = "0.6.0-pre", package = "informalsystems-malachitebft-core-votekeeper", path = "crates/core-votekeeper" }
malachitebft-discovery = { version = "0.6.0-pre", package = "informalsystems-malachitebft-discovery", path = "crates/discovery" }
malachitebft-host = { version = "0.6.0-pre", package = "informalsystems-malachitebft-host", path = "crates/host" }
malachitebft-network = { version = "0.6.0-pre", package = "informalsystems-malachitebft-network", path = "crates/network" }
malachitebft-metrics = { version = "0.6.0-pre", package = "informalsystems-malachitebft-metrics", path = "crates/metrics" }
malachitebft-peer = { version = "0.6.0-pre", package = "informalsystems-malachitebft-peer", path = "crates/peer", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions code/crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ malachitebft-core-driver.workspace = true
malachitebft-core-state-machine.workspace = true
malachitebft-core-votekeeper.workspace = true
malachitebft-core-types.workspace = true
malachitebft-host.workspace = true
malachitebft-network.workspace = true
malachitebft-metrics.workspace = true
malachitebft-signing.workspace = true
Expand Down
194 changes: 1 addition & 193 deletions code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,193 +1 @@
use bytes::Bytes;
use std::time::Duration;

use derive_where::derive_where;
use ractor::{ActorRef, RpcReplyPort};

use malachitebft_core_consensus::{Role, VoteExtensionError};
use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId, VoteExtensions};
use malachitebft_sync::{PeerId, RawDecidedValue};

use crate::util::streaming::StreamMessage;

pub use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue};

/// A reference to the host actor.
pub type HostRef<Ctx> = ActorRef<HostMsg<Ctx>>;

/// What to do next after a decision.
#[derive_where(Debug)]
pub enum Next<Ctx: Context> {
/// Start at the given height with the given validator set.
Start(Ctx::Height, Ctx::ValidatorSet),

/// Restart at the given height with the given validator set.
Restart(Ctx::Height, Ctx::ValidatorSet),
}

/// Messages that need to be handled by the host actor.
#[derive_where(Debug)]
pub enum HostMsg<Ctx: Context> {
/// Notifies the application that consensus is ready.
///
/// The application MUST reply with a message to instruct
/// consensus to start at a given height.
ConsensusReady {
/// Use this reply port to instruct consensus to start the first height.
reply_to: RpcReplyPort<(Ctx::Height, Ctx::ValidatorSet)>,
},

/// Consensus has started a new round.
StartedRound {
/// The height at which the round started.
height: Ctx::Height,
/// The round number that started.
round: Round,
/// The address of the proposer for this round.
proposer: Ctx::Address,
/// The role of the node in this round.
role: Role,
/// Use this reply port to send the undecided values that were already seen for this
/// round. This is needed when recovering from a crash.
///
/// The application MUST reply immediately with the values it has, or with an empty vector.
reply_to: RpcReplyPort<Vec<ProposedValue<Ctx>>>,
},

/// Request to build a local value to propose
///
/// The application MUST reply to this message with the requested value
/// within the specified timeout duration.
GetValue {
/// The height at which the value should be proposed.
height: Ctx::Height,
/// The round in which the value should be proposed.
round: Round,
/// The amount of time the application has to build the value.
timeout: Duration,
/// Use this reply port to send the value that was built.
reply_to: RpcReplyPort<LocallyProposedValue<Ctx>>,
},

/// ExtendVote allows the application to extend the pre-commit vote with arbitrary data.
///
/// When consensus is preparing to send a pre-commit vote, it first calls `ExtendVote`.
/// The application then returns a blob of data called a vote extension.
/// This data is opaque to the consensus algorithm but can contain application-specific information.
/// The proposer of the next block will receive all vote extensions along with the commit certificate.
ExtendVote {
/// The height at which the vote is being extended.
height: Ctx::Height,
/// The round in which the vote is being extended.
round: Round,
/// The ID of the value that is being voted on.
value_id: ValueId<Ctx>,
/// The vote extension to be added to the vote, if any.
reply_to: RpcReplyPort<Option<Ctx::Extension>>,
},

/// Verify a vote extension
///
/// If the vote extension is deemed invalid, the vote it was part of
/// will be discarded altogether.
VerifyVoteExtension {
/// The height for which the vote is.
height: Ctx::Height,
/// The round for which the vote is.
round: Round,
/// The ID of the value that the vote extension is for.
value_id: ValueId<Ctx>,
/// The vote extension to verify.
extension: Ctx::Extension,
/// Use this reply port to send the result of the verification.
reply_to: RpcReplyPort<Result<(), VoteExtensionError>>,
},

/// Requests the application to re-stream a proposal that it has already seen.
///
/// The application MUST re-publish again all the proposal parts pertaining
/// to that value by sending [`NetworkMsg::PublishProposalPart`] messages through
/// the [`Channels::network`] channel.
RestreamValue {
/// The height at which the value was proposed.
height: Ctx::Height,
/// The round in which the value was proposed.
round: Round,
/// The round in which the value was valid.
valid_round: Round,
/// The address of the proposer of the value.
address: Ctx::Address,
/// The ID of the value to restream.
value_id: ValueId<Ctx>,
},

/// Requests the earliest height available in the history maintained by the application.
///
/// The application MUST respond with its earliest available height.
GetHistoryMinHeight { reply_to: RpcReplyPort<Ctx::Height> },

/// Notifies the application that consensus has received a proposal part over the network.
///
/// If this part completes the full proposal, the application MUST respond
/// with the complete proposed value. Otherwise, it MUST respond with `None`.
ReceivedProposalPart {
from: PeerId,
part: StreamMessage<Ctx::ProposalPart>,
reply_to: RpcReplyPort<ProposedValue<Ctx>>,
},

/// Notifies the application that consensus has decided on a value.
///
/// This message includes a commit certificate containing the ID of
/// the value that was decided on, the height and round at which it was decided,
/// and the aggregated signatures of the validators that committed to it.
/// It also includes to the vote extensions received for that height.
///
/// In response to this message, the application MUST send a [`Next`]
/// message back to consensus, instructing it to either start the next height if
/// the application was able to commit the decided value, or to restart the current height
/// otherwise.
///
/// If the application does not reply, consensus will stall.
Decided {
/// The commit certificate containing the ID of the value that was decided on,
/// the the height and round at which it was decided, and the aggregated signatures
/// of the validators that committed to it.
certificate: CommitCertificate<Ctx>,

/// Vote extensions that were received for this height.
extensions: VoteExtensions<Ctx>,

/// Use this reply port to instruct consensus to start the next height.
reply_to: RpcReplyPort<Next<Ctx>>,
},

/// Requests a previously decided value from the application's storage.
///
/// The application MUST respond with that value if available, or `None` otherwise.
GetDecidedValue {
/// Height of the decided value to retrieve
height: Ctx::Height,
/// Channel for sending back the decided value
reply_to: RpcReplyPort<Option<RawDecidedValue<Ctx>>>,
},

/// Notifies the application that a value has been synced from the network.
/// This may happen when the node is catching up with the network.
///
/// If a value can be decoded from the bytes provided, then the application MUST reply
/// to this message with the decoded value. Otherwise, it MUST reply with `None`.
ProcessSyncedValue {
/// Height of the synced value
height: Ctx::Height,
/// Round of the synced value
round: Round,
/// Address of the original proposer
proposer: Ctx::Address,
/// Raw encoded value data
value_bytes: Bytes,
/// Channel for sending back the proposed value, if successfully decoded
/// or `None` if the value could not be decoded
reply_to: RpcReplyPort<ProposedValue<Ctx>>,
},
}
pub use malachitebft_host::host::*;
1 change: 0 additions & 1 deletion code/crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod consensus;
pub mod host;
pub mod network;
pub mod node;
mod ser;
pub mod sync;
pub mod util;
pub mod wal;
97 changes: 1 addition & 96 deletions code/crates/engine/src/util/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1 @@
use core::fmt;

use bytes::Bytes;

pub type Sequence = u64;

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct StreamId(pub(crate) Bytes);

impl StreamId {
pub fn new(bytes: Bytes) -> Self {
Self(bytes)
}

pub fn to_bytes(&self) -> Bytes {
self.0.clone()
}
}

impl fmt::Display for StreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for byte in &self.0 {
write!(f, "{byte:02x}")?;
}
Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "borsh",
derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize)
)]
pub struct StreamMessage<T> {
/// Receivers identify streams by (sender, stream_id).
/// This means each node can allocate stream_ids independently
/// and that many streams can be sent on a single network topic.
pub stream_id: StreamId,

/// Identifies the sequence of each message in the stream starting from 0.
pub sequence: Sequence,

/// The content of this stream message
pub content: StreamContent<T>,
}

impl<T> StreamMessage<T> {
pub fn new(stream_id: StreamId, sequence: Sequence, content: StreamContent<T>) -> Self {
Self {
stream_id,
sequence,
content,
}
}

pub fn is_first(&self) -> bool {
self.sequence == 0
}

pub fn is_fin(&self) -> bool {
self.content.is_fin()
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "borsh",
derive(borsh::BorshSerialize, borsh::BorshDeserialize)
)]
pub enum StreamContent<T> {
/// Serialized content.
Data(T),

/// Indicates the end of the stream.
Fin,
}

impl<T> StreamContent<T> {
pub fn as_data(&self) -> Option<&T> {
match self {
Self::Data(data) => Some(data),
_ => None,
}
}

pub fn into_data(self) -> Option<T> {
match self {
Self::Data(data) => Some(data),
_ => None,
}
}

pub fn is_fin(&self) -> bool {
matches!(self, Self::Fin)
}
}
pub use malachitebft_host::streaming::*;
29 changes: 29 additions & 0 deletions code/crates/host/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "informalsystems-malachitebft-host"
description = "Implementation of the Malachite BFT consensus engine"
version.workspace = true
edition.workspace = true
repository.workspace = true
license.workspace = true
rust-version.workspace = true
publish.workspace = true
readme = "../../../README.md"

[package.metadata.docs.rs]
all-features = true

[features]
borsh = ["dep:borsh"]

[lints]
workspace = true

[dependencies]
malachitebft-core-consensus.workspace = true
malachitebft-core-types.workspace = true
malachitebft-sync.workspace = true

borsh = { workspace = true, optional = true }
bytes = { workspace = true, features = ["serde"] }
derive-where = { workspace = true }
ractor = { workspace = true }
Loading