diff --git a/Cargo.lock b/Cargo.lock index 189c65e8..0d88a33a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.45.3" +version = "0.45.4" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 3e0e708b..c2a8be1d 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.45.3" +version = "0.45.4" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/consensus/handler.rs b/consensus/src/consensus/handler.rs new file mode 100644 index 00000000..5a22b526 --- /dev/null +++ b/consensus/src/consensus/handler.rs @@ -0,0 +1,221 @@ +use crate::{ + alerts::{Alert, ForkingNotification}, + collection::Salt, + consensus::LOG_TARGET, + dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, + dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus}, + extension::Ordering, + units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator}, + Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler, +}; +use log::{debug, trace}; +use std::{ + cmp::max, + fmt::{Display, Formatter, Result as FmtResult}, + time::Duration, +}; + +/// The main logic of the consensus, minus all the asynchronous components. +pub struct Consensus +where + UFH: UnitFinalizationHandler, + MK: MultiKeychain, +{ + store: UnitStore>, + dag: Dag, + responder: Responder, + ordering: Ordering, + task_manager: TaskManager, +} + +/// The status of the consensus, for logging purposes. +pub struct Status { + task_manager_status: TaskManagerStatus, + dag_status: DagStatus, + store_status: UnitStoreStatus, +} + +impl Status { + fn short_report(&self) -> String { + let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round()) + - self.store_status.top_round(); + match rounds_behind { + (0..=2) => "healthy".to_string(), + (3..) => format!("behind by {rounds_behind} rounds"), + } + } +} + +impl Display for Status { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{}", self.short_report())?; + write!(f, ";reconstructed DAG: {}", self.store_status)?; + write!(f, ";additional information: {}", self.dag_status)?; + write!(f, ";task manager: {}", self.task_manager_status)?; + Ok(()) + } +} + +type AddressedDisseminationMessage = Addressed>; + +/// The result of some operation within the consensus, requiring either other components should get +/// informed about it, or messages should be sent to the network. +pub struct ConsensusResult { + /// Units that should be sent for backup saving. + pub units: Vec>, + /// Alerts that should be sent to the alerting component. + pub alerts: Vec>, + /// Messages that should be sent to other committee members. + pub messages: Vec>, +} + +impl ConsensusResult { + fn noop() -> Self { + ConsensusResult { + units: Vec::new(), + alerts: Vec::new(), + messages: Vec::new(), + } + } +} + +impl Consensus +where + UFH: UnitFinalizationHandler, + MK: MultiKeychain, +{ + /// Create a new Consensus. + pub fn new( + keychain: MK, + validator: Validator, + finalization_handler: UFH, + delay_config: DelayConfig, + ) -> Self { + let n_members = keychain.node_count(); + let index = keychain.index(); + Consensus { + store: UnitStore::new(n_members), + dag: Dag::new(validator), + responder: Responder::new(keychain), + ordering: Ordering::new(finalization_handler), + task_manager: TaskManager::new(index, n_members, delay_config), + } + } + + fn handle_dag_result( + &mut self, + result: DagResult, + ) -> ConsensusResult { + let DagResult { + units, + alerts, + requests, + } = result; + for request in requests { + self.task_manager.add_request(request); + } + let messages = self.trigger_tasks(); + ConsensusResult { + units, + alerts, + messages, + } + } + + /// Process a unit received (usually) from the network. + pub fn process_incoming_unit( + &mut self, + unit: UncheckedSignedUnit, + ) -> ConsensusResult { + let result = self.dag.add_unit(unit, &self.store); + self.handle_dag_result(result) + } + + /// Process a request received from the network. + pub fn process_request( + &mut self, + request: ReconstructionRequest, + node_id: NodeIndex, + ) -> Option> { + match self.responder.handle_request(request, &self.store) { + Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)), + Err(err) => { + debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err); + None + } + } + } + + /// Process a parents response. + pub fn process_parents( + &mut self, + u_hash: ::Hash, + parents: Vec>, + ) -> ConsensusResult { + if self.store.unit(&u_hash).is_some() { + trace!(target: LOG_TARGET, "We got parents response but already imported the unit."); + return ConsensusResult::noop(); + } + let result = self.dag.add_parents(u_hash, parents, &self.store); + self.handle_dag_result(result) + } + + /// Process a newest unit request. + pub fn process_newest_unit_request( + &mut self, + salt: Salt, + node_id: NodeIndex, + ) -> AddressedDisseminationMessage { + Addressed::addressed_to( + self.responder + .handle_newest_unit_request(node_id, salt, &self.store) + .into(), + node_id, + ) + } + + /// Process a forking notification. + pub fn process_forking_notification( + &mut self, + notification: ForkingNotification, + ) -> ConsensusResult { + let result = self + .dag + .process_forking_notification(notification, &self.store); + self.handle_dag_result(result) + } + + /// What to do once a unit has been securely backed up on disk. + pub fn on_unit_backup_saved( + &mut self, + unit: DagUnit, + ) -> Option> { + let unit_hash = unit.hash(); + self.store.insert(unit.clone()); + self.dag.finished_processing(&unit_hash); + self.ordering.add_unit(unit.clone()); + self.task_manager.add_unit(&unit) + } + + /// When should `trigger_tasks` be called next. + pub fn next_tick(&self) -> Duration { + self.task_manager.next_tick() + } + + /// Trigger all the ready tasks and get all the messages that should be sent now. + pub fn trigger_tasks( + &mut self, + ) -> Vec> { + self.task_manager + .trigger_tasks(&self.store, self.dag.processing_units()) + } + + /// The status of the consensus handler, for logging purposes. + pub fn status(&self) -> Status { + Status { + dag_status: self.dag.status(), + store_status: self.store.status(), + task_manager_status: self.task_manager.status(), + } + } +} diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 108b26d9..5cc00145 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -2,7 +2,10 @@ use crate::{ alerts::{Handler as AlertHandler, Service as AlertService, IO as AlertIO}, backup::{BackupLoader, BackupSaver}, collection::initial_unit_collection, - consensus::service::{Config as ServiceConfig, Service}, + consensus::{ + handler::Consensus, + service::{Service, IO as ConsensusIO}, + }, creation, handle_task_termination, interface::LocalIO, network::{Hub as NetworkHub, NetworkData}, @@ -16,6 +19,7 @@ use futures::{ }; use log::{debug, error, info}; +mod handler; mod service; const LOG_TARGET: &str = "AlephBFT-consensus"; @@ -185,13 +189,16 @@ pub async fn run_session< pin_mut!(starting_round_handle); debug!(target: LOG_TARGET, "Initial unit collection spawned."); - //TODO: just put the Service here debug!(target: LOG_TARGET, "Spawning consensus service."); + let consensus = Consensus::new( + keychain.clone(), + validator.clone(), + finalization_handler, + config.delay_config().clone(), + ); let service_handle = spawn_handle .spawn_essential("consensus/service", { - let service_config = ServiceConfig { - delay_config: config.delay_config().clone(), - finalization_handler, + let consensus_io = ConsensusIO { backup_units_for_saver, backup_units_from_saver, alerts_for_alerter, @@ -203,9 +210,7 @@ pub async fn run_session< new_units_from_creator, }; let service_terminator = terminator.add_offspring_connection("service"); - let validator = validator.clone(); - let keychain = keychain.clone(); - let service = Service::new(service_config, keychain, validator); + let service = Service::new(consensus, consensus_io); async move { service.run(loaded_units, service_terminator).await } }) diff --git a/consensus/src/consensus/service.rs b/consensus/src/consensus/service.rs index 2206f155..3d0bb95d 100644 --- a/consensus/src/consensus/service.rs +++ b/consensus/src/consensus/service.rs @@ -1,34 +1,27 @@ use crate::{ alerts::{Alert, ForkingNotification}, collection::CollectionResponse, - config::DelayConfig, - consensus::LOG_TARGET, - dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, - dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus}, - extension::Ordering, + consensus::{ + handler::{Consensus, ConsensusResult}, + LOG_TARGET, + }, + dag::DagUnit, + dissemination::{Addressed, DisseminationMessage}, network::{UnitMessage, UnitMessageTo}, - units::{SignedUnit, UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator}, - Hasher, Index, MultiKeychain, Receiver, Sender, Terminator, UnitFinalizationHandler, + units::{SignedUnit, UncheckedSignedUnit, Unit}, + Data, Hasher, Index, MultiKeychain, Receiver, Sender, Terminator, UnitFinalizationHandler, }; use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use log::{debug, error, info, trace, warn}; -use std::{ - cmp::max, - fmt::{Display, Formatter, Result as FmtResult}, - time::Duration, -}; +use std::time::Duration; pub struct Service where UFH: UnitFinalizationHandler, MK: MultiKeychain, { - store: UnitStore>, - dag: Dag, - ordering: Ordering, - responder: Responder, - task_manager: TaskManager, + handler: Consensus, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, @@ -42,47 +35,16 @@ where exiting: bool, } -struct Status { - task_manager_status: TaskManagerStatus, - dag_status: DagStatus, - store_status: UnitStoreStatus, -} - -impl Status { - fn short_report(&self) -> String { - let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round()) - - self.store_status.top_round(); - match rounds_behind { - (0..=2) => "healthy".to_string(), - (3..) => format!("behind by {rounds_behind} rounds"), - } - } -} - -impl Display for Status { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - write!(f, "{}", self.short_report())?; - write!(f, ";task manager: {}", self.task_manager_status)?; - write!(f, ";reconstructed DAG: {}", self.store_status)?; - write!(f, ";additional information: {}", self.dag_status)?; - write!(f, ".")?; - Ok(()) - } -} - -pub struct Config { - pub delay_config: DelayConfig, - pub finalization_handler: UFH, - pub backup_units_for_saver: Sender>, - pub backup_units_from_saver: Receiver>, - pub alerts_for_alerter: Sender>, - pub notifications_from_alerter: - Receiver>, - pub unit_messages_for_network: Sender>, - pub unit_messages_from_network: Receiver>, - pub responses_for_collection: Sender>, - pub parents_for_creator: Sender>, - pub new_units_from_creator: Receiver>, +pub struct IO { + pub backup_units_for_saver: Sender>, + pub backup_units_from_saver: Receiver>, + pub alerts_for_alerter: Sender>, + pub notifications_from_alerter: Receiver>, + pub unit_messages_for_network: Sender>, + pub unit_messages_from_network: Receiver>, + pub responses_for_collection: Sender>, + pub parents_for_creator: Sender>, + pub new_units_from_creator: Receiver>, } impl Service @@ -90,12 +52,8 @@ where UFH: UnitFinalizationHandler, MK: MultiKeychain, { - pub fn new(config: Config, keychain: MK, validator: Validator) -> Self { - let n_members = keychain.node_count(); - let own_id = keychain.index(); - let Config { - delay_config, - finalization_handler, + pub fn new(handler: Consensus, io: IO) -> Self { + let IO { backup_units_for_saver, backup_units_from_saver, alerts_for_alerter, @@ -105,18 +63,10 @@ where responses_for_collection, parents_for_creator, new_units_from_creator, - } = config; - let store = UnitStore::new(n_members); - let dag = Dag::new(validator); - let ordering = Ordering::new(finalization_handler); - let task_manager = TaskManager::new(own_id, n_members, delay_config); + } = io; Service { - store, - dag, - ordering, - task_manager, - responder: Responder::new(keychain), + handler, alerts_for_alerter, notifications_from_alerter, unit_messages_from_network, @@ -135,31 +85,31 @@ where self.exiting = true; } - fn handle_dag_result(&mut self, result: DagResult) { - let DagResult { + fn handle_result(&mut self, result: ConsensusResult) { + let ConsensusResult { units, - requests, alerts, + messages, } = result; for unit in units { self.on_unit_reconstructed(unit); } - for request in requests { - self.on_reconstruction_request(request); - } for alert in alerts { if self.alerts_for_alerter.unbounded_send(alert).is_err() { self.crucial_channel_closed("Alerter"); } } + for message in messages { + self.send_message_for_network(message.into()) + } } fn on_unit_received( &mut self, unit: UncheckedSignedUnit, ) { - let result = self.dag.add_unit(unit, &self.store); - self.handle_dag_result(result); + let result = self.handler.process_incoming_unit(unit); + self.handle_result(result); } fn on_unit_message( @@ -172,29 +122,21 @@ where trace!(target: LOG_TARGET, "New unit received {:?}.", &u); self.on_unit_received(u) } - Request(node_id, request) => { trace!(target: LOG_TARGET, "Request {:?} received from {:?}.", request, node_id); - match self.responder.handle_request(request, &self.store) { - Ok(response) => self.send_message_for_network( - Addressed::addressed_to(response, node_id).into(), - ), - Err(err) => { - debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err) - } + if let Some(message) = self.handler.process_request(request, node_id) { + self.send_message_for_network(message.into()); } } - ParentsResponse(u_hash, parents) => { trace!(target: LOG_TARGET, "Response parents received for unit {:?}.", u_hash); - self.on_parents_response(u_hash, parents) + let result = self.handler.process_parents(u_hash, parents); + self.handle_result(result); } NewestUnitRequest(node_id, salt) => { trace!(target: LOG_TARGET, "Newest unit request received from {:?}.", node_id); - let response = - self.responder - .handle_newest_unit_request(node_id, salt, &self.store); - self.send_message_for_network(Addressed::addressed_to(response, node_id).into()) + let message = self.handler.process_newest_unit_request(salt, node_id); + self.send_message_for_network(message.into()) } NewestUnitResponse(response) => { trace!(target: LOG_TARGET, "Response newest unit received from {:?}.", response.index()); @@ -209,39 +151,16 @@ where } } - fn on_parents_response( - &mut self, - u_hash: ::Hash, - parents: Vec>, - ) { - if self.store.unit(&u_hash).is_some() { - trace!(target: LOG_TARGET, "We got parents response but already imported the unit."); - return; - } - let result = self.dag.add_parents(u_hash, parents, &self.store); - self.handle_dag_result(result); - } - fn on_forking_notification( &mut self, notification: ForkingNotification, ) { - let result = self - .dag - .process_forking_notification(notification, &self.store); - self.handle_dag_result(result); - } - - fn on_reconstruction_request(&mut self, request: ReconstructionRequest) { - self.task_manager.add_request(request); - self.trigger_tasks(); + let result = self.handler.process_forking_notification(notification); + self.handle_result(result); } fn trigger_tasks(&mut self) { - for message in self - .task_manager - .trigger_tasks(&self.store, self.dag.processing_units()) - { + for message in self.handler.trigger_tasks() { self.send_message_for_network(message); } } @@ -254,12 +173,6 @@ where } fn on_unit_backup_saved(&mut self, unit: DagUnit) { - let unit_hash = unit.hash(); - self.store.insert(unit.clone()); - self.dag.finished_processing(&unit_hash); - if let Some(message) = self.task_manager.add_unit(&unit) { - self.send_message_for_network(message); - } if self .parents_for_creator .unbounded_send(unit.clone()) @@ -267,7 +180,9 @@ where { self.crucial_channel_closed("Creator"); } - self.ordering.add_unit(unit.clone()); + if let Some(message) = self.handler.on_unit_backup_saved(unit) { + self.send_message_for_network(message); + } } fn send_message_for_network( @@ -285,16 +200,8 @@ where } } - fn status(&self) -> Status { - Status { - task_manager_status: self.task_manager.status(), - dag_status: self.dag.status(), - store_status: self.store.status(), - } - } - fn status_report(&self) { - info!(target: LOG_TARGET, "Consensus status report: {}", self.status()); + info!(target: LOG_TARGET, "Consensus status report: {}.", self.handler.status()); } pub async fn run( @@ -304,7 +211,7 @@ where ) { let status_ticker_delay = Duration::from_secs(10); let mut status_ticker = Delay::new(status_ticker_delay).fuse(); - let mut task_ticker = Delay::new(self.task_manager.next_tick()).fuse(); + let mut task_ticker = Delay::new(self.handler.next_tick()).fuse(); for unit in data_from_backup { self.on_unit_received(unit); @@ -314,7 +221,9 @@ where loop { futures::select! { signed_unit = self.new_units_from_creator.next() => match signed_unit { - Some(signed_unit) => self.on_unit_received(signed_unit.into()), + Some(signed_unit) => { + self.on_unit_received(signed_unit.into()) + }, None => { error!(target: LOG_TARGET, "Creation stream closed."); break; @@ -349,7 +258,7 @@ where _ = &mut task_ticker => { self.trigger_tasks(); - task_ticker = Delay::new(self.task_manager.next_tick()).fuse(); + task_ticker = Delay::new(self.handler.next_tick()).fuse(); }, _ = &mut status_ticker => {