diff --git a/lib/src/sync.rs b/lib/src/sync.rs index bd160153cf..e1c4fef32e 100644 --- a/lib/src/sync.rs +++ b/lib/src/sync.rs @@ -21,13 +21,10 @@ //! > **Note**: While the above summary is a bit abstract, in practice it is almost always //! > done by exchanging messages through a peer-to-peer network. //! -//! Multiple strategies exist for syncing, one for each sub-module, and which one to employ -//! depends on the amount of information that is desired (e.g. is it required to know the header -//! and/or body of every single block, or can some blocks be skipped?) and the distance between -//! the highest block of the local chain and the highest block available on the remotes. -//! -//! The [`all`] module represents a good combination of all syncing strategies and should be the -//! default choice for most clients. +//! Multiple syncing strategies exist, depending on the amount of information that is +//! desired and the distance between the highest block of the local chain and the highest block +//! available from the syncing sources. +//! The [`all`] module provides a state machine that combines several syncing strategies into one. //! //! # Security considerations //! @@ -70,6 +67,5 @@ pub mod all; pub mod all_forks; -pub mod optimistic; pub mod para; pub mod warp_sync; diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index a0be364405..520a616177 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -15,10 +15,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! All syncing strategies (optimistic, warp sync, all forks) grouped together. +//! All syncing strategies grouped together. //! -//! This state machine combines GrandPa warp syncing, optimistic syncing, and all forks syncing -//! into one state machine. +//! This state machine combines GrandPa warp syncing and all forks syncing into one state machine. //! //! # Overview //! @@ -35,7 +34,7 @@ use crate::{ executor::host, finality::decode, header, - sync::{all_forks, optimistic, warp_sync}, + sync::{all_forks, warp_sync}, trie::Nibble, verify, }; @@ -55,6 +54,8 @@ pub use warp_sync::{ WarpSyncFragment, }; +use super::all_forks::AllForksSync; + /// Configuration for the [`AllSync`]. // TODO: review these fields #[derive(Debug)] @@ -203,16 +204,16 @@ impl AllSync { )) => { // On error, `warp_sync` returns back the chain information that was // provided in its configuration. - AllSyncInner::Optimistic { - inner: optimistic::OptimisticSync::new(optimistic::Config { - chain_information, - block_number_bytes: config.block_number_bytes, - sources_capacity: config.sources_capacity, - blocks_capacity: config.blocks_capacity, - download_ahead_blocks: config.download_ahead_blocks, - download_bodies: false, - }), - } + AllSyncInner::AllForks(AllForksSync::new(all_forks::Config { + chain_information, + block_number_bytes: config.block_number_bytes, + sources_capacity: config.sources_capacity, + blocks_capacity: config.blocks_capacity, + download_bodies: false, + allow_unknown_consensus_engines: config.allow_unknown_consensus_engines, + max_disjoint_headers: config.max_disjoint_headers, + max_requests_per_block: config.max_requests_per_block, + })) } }, shared: Shared { @@ -240,7 +241,6 @@ impl AllSync { match &self.inner { AllSyncInner::AllForks(sync) => sync.as_chain_information(), AllSyncInner::WarpSync { inner, ..
} => inner.as_chain_information(), - AllSyncInner::Optimistic { inner } => inner.as_chain_information(), AllSyncInner::Poisoned => unreachable!(), } } @@ -276,7 +276,6 @@ impl AllSync { finalized_block_number, }, }, - AllSyncInner::Optimistic { .. } => Status::Sync, // TODO: right now we don't differentiate between AllForks and Optimistic, as they're kind of similar anyway AllSyncInner::Poisoned => unreachable!(), } } @@ -285,7 +284,6 @@ impl AllSync { pub fn finalized_block_header(&self) -> header::HeaderRef { match &self.inner { AllSyncInner::AllForks(sync) => sync.finalized_block_header(), - AllSyncInner::Optimistic { inner } => inner.finalized_block_header(), AllSyncInner::WarpSync { inner, .. } => { inner.as_chain_information().as_ref().finalized_block_header } @@ -300,7 +298,6 @@ impl AllSync { pub fn best_block_header(&self) -> header::HeaderRef { match &self.inner { AllSyncInner::AllForks(sync) => sync.best_block_header(), - AllSyncInner::Optimistic { inner } => inner.best_block_header(), AllSyncInner::WarpSync { .. } => self.finalized_block_header(), AllSyncInner::Poisoned => unreachable!(), } @@ -313,7 +310,6 @@ impl AllSync { pub fn best_block_number(&self) -> u64 { match &self.inner { AllSyncInner::AllForks(sync) => sync.best_block_number(), - AllSyncInner::Optimistic { inner } => inner.best_block_number(), AllSyncInner::WarpSync { .. } => self.best_block_header().number, AllSyncInner::Poisoned => unreachable!(), } @@ -326,7 +322,6 @@ impl AllSync { pub fn best_block_hash(&self) -> [u8; 32] { match &self.inner { AllSyncInner::AllForks(sync) => sync.best_block_hash(), - AllSyncInner::Optimistic { inner } => inner.best_block_hash(), AllSyncInner::WarpSync { .. } => self .best_block_header() .hash(self.shared.block_number_bytes), @@ -337,8 +332,7 @@ impl AllSync { /// Returns consensus information about the current best block of the chain. pub fn best_block_consensus(&self) -> chain_information::ChainInformationConsensusRef { match &self.inner { - AllSyncInner::AllForks(_) => todo!(), // TODO: - AllSyncInner::Optimistic { inner } => inner.best_block_consensus(), + AllSyncInner::AllForks(_) => todo!(), // TODO: AllSyncInner::WarpSync { .. } => todo!(), // TODO: ?! AllSyncInner::Poisoned => unreachable!(), } @@ -352,11 +346,7 @@ impl AllSync { let iter = sync.non_finalized_blocks_unordered(); either::Left(iter) } - AllSyncInner::Optimistic { inner } => { - let iter = inner.non_finalized_blocks_unordered(); - either::Right(either::Left(iter)) - } - AllSyncInner::WarpSync { .. } => either::Right(either::Right(iter::empty())), + AllSyncInner::WarpSync { .. } => either::Right(iter::empty()), AllSyncInner::Poisoned => unreachable!(), } } @@ -371,11 +361,7 @@ impl AllSync { let iter = sync.non_finalized_blocks_ancestry_order(); either::Left(iter) } - AllSyncInner::Optimistic { inner } => { - let iter = inner.non_finalized_blocks_ancestry_order(); - either::Right(either::Left(iter)) - } - AllSyncInner::WarpSync { .. } => either::Right(either::Right(iter::empty())), + AllSyncInner::WarpSync { .. } => either::Right(iter::empty()), AllSyncInner::Poisoned => unreachable!(), } } @@ -387,7 +373,6 @@ impl AllSync { pub fn is_near_head_of_chain_heuristic(&self) -> bool { match &self.inner { AllSyncInner::AllForks(_) => true, - AllSyncInner::Optimistic { .. } => false, AllSyncInner::WarpSync { .. 
} => false, AllSyncInner::Poisoned => unreachable!(), } @@ -457,23 +442,6 @@ impl AllSync { self.inner = AllSyncInner::AllForks(all_forks); outer_source_id } - AllSyncInner::Optimistic { mut inner } => { - let outer_source_id_entry = self.shared.sources.vacant_entry(); - let outer_source_id = SourceId(outer_source_id_entry.key()); - - let source_id = inner.add_source( - OptimisticSourceExtra { - user_data, - outer_source_id, - best_block_hash, - }, - best_block_number, - ); - outer_source_id_entry.insert(SourceMapping::Optimistic(source_id)); - - self.inner = AllSyncInner::Optimistic { inner }; - outer_source_id - } AllSyncInner::Poisoned => unreachable!(), } } @@ -522,33 +490,6 @@ impl AllSync { (user_data.user_data, requests) } - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(source_id)) => { - let (user_data, requests) = inner.remove_source(source_id); - // TODO: do properly - let self_requests = &mut self.shared.requests; - let requests = requests - .map(move |(_inner_request_id, request_inner_user_data)| { - debug_assert!( - self_requests.contains(request_inner_user_data.outer_request_id.0) - ); - let _removed = - self_requests.remove(request_inner_user_data.outer_request_id.0); - debug_assert!(matches!( - _removed, - RequestMapping::Optimistic(_inner_request_id) - )); - ( - request_inner_user_data.outer_request_id, - request_inner_user_data.user_data, - ) - }) - .collect::>() - .into_iter(); - - // TODO: also handle the "inline" requests - - (user_data.user_data, requests) - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(source_id)) => { let (user_data, requests) = inner.remove_source(source_id); @@ -586,10 +527,6 @@ impl AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::WarpSync(_)) => unreachable!(), } } @@ -600,13 +537,9 @@ impl AllSync { let iter = inner.sources().map(move |id| inner[id].outer_source_id); either::Left(iter) } - AllSyncInner::Optimistic { inner: sync } => { - let iter = sync.sources().map(move |id| sync[id].outer_source_id); - either::Right(either::Left(iter)) - } AllSyncInner::AllForks(sync) => { let iter = sync.sources().map(move |id| sync[id].outer_source_id); - either::Right(either::Right(iter)) + either::Right(iter) } AllSyncInner::Poisoned => unreachable!(), } @@ -633,9 +566,6 @@ impl AllSync { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => { sync.source_num_ongoing_requests(*src) } - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { - inner.source_num_ongoing_requests(*src) - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(src)) => { inner.source_num_ongoing_requests(*src) } @@ -646,10 +576,6 @@ impl AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. 
}, SourceMapping::WarpSync(_)) => unreachable!(), }; num_inline + num_inner @@ -670,11 +596,6 @@ impl AllSync { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => { sync.source_best_block(*src) } - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { - let height = inner.source_best_block(*src); - let hash = &inner[*src].best_block_hash; - (height, hash) - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(src)) => { let ud = &inner[*src]; (ud.best_block_number, &ud.best_block_hash) @@ -686,10 +607,6 @@ impl AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::WarpSync(_)) => unreachable!(), } } @@ -715,10 +632,6 @@ impl AllSync { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => { sync.source_knows_non_finalized_block(*src, height, hash) } - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { - // TODO: is this correct? - inner.source_best_block(*src) >= height - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(src)) => { assert!( height @@ -739,10 +652,6 @@ impl AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::WarpSync(_)) => unreachable!(), } } @@ -786,15 +695,7 @@ impl AllSync { let iter = sync .knows_non_finalized_block(height, hash) .map(move |id| sync[id].outer_source_id); - either::Left(either::Left(iter)) - } - AllSyncInner::Optimistic { inner } => { - // TODO: is this correct? - let iter = inner - .sources() - .filter(move |source_id| inner.source_best_block(*source_id) >= height) - .map(move |source_id| inner[source_id].outer_source_id); - either::Left(either::Right(iter)) + either::Left(iter) } AllSyncInner::Poisoned => unreachable!(), } @@ -850,17 +751,6 @@ impl AllSync { }, ); - either::Left(either::Right(iter)) - } - AllSyncInner::Optimistic { inner } => { - let iter = inner.desired_requests().map(move |rq_detail| { - ( - inner[rq_detail.source_id].outer_source_id, - &inner[rq_detail.source_id].user_data, - optimistic_request_convert(rq_detail, self.shared.download_bodies), - ) - }); - either::Right(iter) } AllSyncInner::WarpSync { inner, .. } => { @@ -913,7 +803,7 @@ impl AllSync { ) }); - either::Left(either::Left(iter)) + either::Left(iter) } AllSyncInner::Poisoned => unreachable!(), } @@ -969,39 +859,6 @@ impl AllSync { request_mapping_entry.insert(RequestMapping::AllForks(inner_request_id)); return outer_request_id; } - ( - AllSyncInner::Optimistic { inner }, - RequestDetail::BlocksRequest { - ascending: true, // TODO: ? - first_block_height, - num_blocks, - .. 
- }, - ) => { - let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() { - SourceMapping::Optimistic(inner_source_id) => *inner_source_id, - _ => unreachable!(), - }; - - let request_mapping_entry = self.shared.requests.vacant_entry(); - let outer_request_id = RequestId(request_mapping_entry.key()); - - let inner_request_id = inner.insert_request( - optimistic::RequestDetail { - source_id: inner_source_id, - block_height: NonZeroU64::new(*first_block_height).unwrap(), // TODO: correct to unwrap? - num_blocks: NonZeroU32::new(u32::try_from(num_blocks.get()).unwrap()) - .unwrap(), // TODO: don't unwrap - }, - OptimisticRequestExtra { - outer_request_id, - user_data, - }, - ); - - request_mapping_entry.insert(RequestMapping::Optimistic(inner_request_id)); - return outer_request_id; - } ( AllSyncInner::WarpSync { inner, .. }, RequestDetail::WarpSync { @@ -1122,7 +979,6 @@ impl AllSync { return outer_request_id; } (AllSyncInner::AllForks { .. }, _) => {} - (AllSyncInner::Optimistic { .. }, _) => {} (AllSyncInner::WarpSync { .. }, _) => {} (AllSyncInner::Poisoned, _) => unreachable!(), } @@ -1157,20 +1013,7 @@ impl AllSync { ); either::Left(iter) } - AllSyncInner::Optimistic { inner } => { - let iter = inner - .obsolete_requests() - .map(move |(_, rq)| rq.outer_request_id) - .chain( - self.shared - .requests - .iter() - .filter(|(_, rq)| matches!(rq, RequestMapping::Inline(..))) - .map(|(id, _)| RequestId(id)), - ); - either::Right(either::Left(iter)) - } - AllSyncInner::WarpSync { .. } => either::Right(either::Right(iter::empty())), // TODO: not implemented properly + AllSyncInner::WarpSync { .. } => either::Right(iter::empty()), // TODO: not implemented properly AllSyncInner::Poisoned => unreachable!(), } } @@ -1186,9 +1029,6 @@ impl AllSync { (AllSyncInner::AllForks(inner), RequestMapping::AllForks(rq)) => { inner[inner.request_source_id(*rq)].outer_source_id } - (AllSyncInner::Optimistic { inner }, RequestMapping::Optimistic(rq)) => { - inner[inner.request_source_id(*rq)].outer_source_id - } (AllSyncInner::WarpSync { inner, .. }, RequestMapping::WarpSync(rq)) => { inner[inner.request_source_id(*rq)].outer_source_id } @@ -1281,24 +1121,6 @@ impl AllSync { }) } }, - AllSyncInner::Optimistic { inner } => match inner.process_one() { - optimistic::ProcessOne::Idle { sync } => { - self.inner = AllSyncInner::Optimistic { inner: sync }; - ProcessOne::AllSync(self) - } - optimistic::ProcessOne::VerifyBlock(inner) => { - ProcessOne::VerifyBlock(BlockVerify { - inner: BlockVerifyInner::Optimistic(inner), - shared: self.shared, - }) - } - optimistic::ProcessOne::VerifyJustification(inner) => { - ProcessOne::VerifyFinalityProof(FinalityProofVerify { - inner: FinalityProofVerifyInner::Optimistic(inner), - shared: self.shared, - }) - } - }, AllSyncInner::Poisoned => unreachable!(), } } @@ -1336,21 +1158,6 @@ impl AllSync { } } } - (AllSyncInner::Optimistic { inner }, &SourceMapping::Optimistic(source_id)) => { - match header::decode(&announced_scale_encoded_header, inner.block_number_bytes()) { - Ok(header) => { - if is_best { - inner.raise_source_best_block(source_id, header.number); - inner[source_id].best_block_hash = - header::hash_from_scale_encoded_header( - &announced_scale_encoded_header, - ); - } - BlockAnnounceOutcome::Discarded - } - Err(err) => BlockAnnounceOutcome::InvalidHeader(err), - } - } (AllSyncInner::WarpSync { inner, .. }, &SourceMapping::WarpSync(source_id)) => { match header::decode( &announced_scale_encoded_header, @@ -1378,10 +1185,6 @@ impl AllSync { // other. 
(AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::WarpSync(_)) => unreachable!(), } } @@ -1402,7 +1205,6 @@ impl AllSync { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(source_id)) => { sync.update_source_finality_state(*source_id, finalized_block_height) } - (AllSyncInner::Optimistic { .. }, _) => {} // TODO: the optimistic sync could get some help from the finalized block (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(source_id)) => { inner.set_source_finality_state(*source_id, finalized_block_height); } @@ -1448,7 +1250,6 @@ impl AllSync { inner.set_source_finality_state(*source_id, block_number); GrandpaCommitMessageOutcome::Discarded } - (AllSyncInner::Optimistic { .. }, _) => GrandpaCommitMessageOutcome::Discarded, // Invalid internal states. (AllSyncInner::AllForks(_), _) => unreachable!(), @@ -1577,41 +1378,6 @@ impl AllSync { debug_assert_eq!(request_user_data.outer_request_id, request_id); (request_user_data.user_data.unwrap(), outcome) } - (AllSyncInner::Optimistic { inner }, RequestMapping::Optimistic(inner_request_id)) => { - let (request_user_data, outcome) = if let Ok(blocks) = blocks { - let (request_user_data, outcome) = inner.finish_request_success( - inner_request_id, - blocks.map(|block| optimistic::RequestSuccessBlock { - scale_encoded_header: block.scale_encoded_header, - scale_encoded_justifications: block - .scale_encoded_justifications - .into_iter() - .map(|j| (j.engine_id, j.justification)) - .collect(), - scale_encoded_extrinsics: block.scale_encoded_extrinsics, - user_data: block.user_data, - }), - ); - - match outcome { - optimistic::FinishRequestOutcome::Obsolete => { - (request_user_data, ResponseOutcome::Outdated) - } - optimistic::FinishRequestOutcome::Queued => { - (request_user_data, ResponseOutcome::Queued) - } - } - } else { - // TODO: `ResponseOutcome::Queued` is a hack - ( - inner.finish_request_failed(inner_request_id), - ResponseOutcome::Queued, - ) - }; - - debug_assert_eq!(request_user_data.outer_request_id, request_id); - (request_user_data.user_data, outcome) - } _ => unreachable!(), } } @@ -1814,9 +1580,6 @@ impl ops::Index for AllSync { debug_assert!(self.shared.sources.contains(source_id.0)); match (&self.inner, self.shared.sources.get(source_id.0).unwrap()) { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => &sync[*src].user_data, - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { - &inner[*src].user_data - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(src)) => { &inner[*src].user_data } @@ -1827,10 +1590,6 @@ impl ops::Index for AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. 
}, SourceMapping::WarpSync(_)) => unreachable!(), } } } @@ -1846,9 +1605,6 @@ impl ops::IndexMut for AllSync { (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => { &mut sync[*src].user_data } - (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { - &mut inner[*src].user_data - } (AllSyncInner::WarpSync { inner, .. }, SourceMapping::WarpSync(src)) => { &mut inner[*src].user_data } @@ -1859,10 +1615,6 @@ impl ops::IndexMut for AllSync { // other. (AllSyncInner::WarpSync { .. }, SourceMapping::AllForks(_)) => unreachable!(), (AllSyncInner::AllForks(_), SourceMapping::WarpSync(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::AllForks(_)) => unreachable!(), - (AllSyncInner::AllForks(_), SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::WarpSync { .. }, SourceMapping::Optimistic(_)) => unreachable!(), - (AllSyncInner::Optimistic { .. }, SourceMapping::WarpSync(_)) => unreachable!(), } } } @@ -1874,7 +1626,6 @@ impl<'a, TRq, TSrc, TBl> ops::Index<(u64, &'a [u8; 32])> for AllSync &TBl { match &self.inner { AllSyncInner::AllForks(inner) => inner[(block_height, block_hash)].as_ref().unwrap(), - AllSyncInner::Optimistic { inner, .. } => &inner[block_hash], AllSyncInner::WarpSync { .. } => panic!("unknown block"), // No block is ever stored during the warp syncing. AllSyncInner::Poisoned => unreachable!(), } @@ -1886,7 +1637,6 @@ impl<'a, TRq, TSrc, TBl> ops::IndexMut<(u64, &'a [u8; 32])> for AllSync &mut TBl { match &mut self.inner { AllSyncInner::AllForks(inner) => inner[(block_height, block_hash)].as_mut().unwrap(), - AllSyncInner::Optimistic { inner, .. } => &mut inner[block_hash], AllSyncInner::WarpSync { .. } => panic!("unknown block"), // No block is ever stored during the warp syncing. AllSyncInner::Poisoned => unreachable!(), } @@ -2224,9 +1974,6 @@ enum BlockVerifyInner { AllForks( all_forks::BlockVerify, AllForksRequestExtra, AllForksSourceExtra>, ), - Optimistic( - optimistic::BlockVerify, OptimisticSourceExtra, TBl>, - ), } impl BlockVerify { @@ -2234,7 +1981,6 @@ impl BlockVerify { pub fn hash(&self) -> [u8; 32] { match &self.inner { BlockVerifyInner::AllForks(verify) => *verify.hash(), - BlockVerifyInner::Optimistic(verify) => verify.hash(), } } @@ -2245,12 +1991,7 @@ impl BlockVerify { &'_ self, ) -> Option + Clone + '_> + Clone + '_> { match &self.inner { - BlockVerifyInner::AllForks(verify) => verify - .scale_encoded_extrinsics() - .map(|iter| either::Left(iter.map(either::Left))), - BlockVerifyInner::Optimistic(verify) => verify - .scale_encoded_extrinsics() - .map(|iter| either::Right(iter.map(either::Right))), + BlockVerifyInner::AllForks(verify) => verify.scale_encoded_extrinsics(), } } @@ -2258,7 +1999,6 @@ impl BlockVerify { pub fn scale_encoded_header(&self) -> Vec { match &self.inner { BlockVerifyInner::AllForks(verify) => verify.scale_encoded_header(), - BlockVerifyInner::Optimistic(verify) => verify.scale_encoded_header().to_vec(), } } @@ -2304,31 +2044,6 @@ impl BlockVerify { } } } - BlockVerifyInner::Optimistic(verify) => { - let verified_block_hash = verify.hash(); - - match verify.verify_header(now_from_unix_epoch) { - optimistic::BlockVerification::NewBest { success, .. } => { - HeaderVerifyOutcome::Success { - is_new_best: true, - success: HeaderVerifySuccess { - inner: HeaderVerifySuccessInner::Optimistic(success), - shared: self.shared, - verified_block_hash, - }, - } - } - optimistic::BlockVerification::Reset { sync, .. 
} => { - HeaderVerifyOutcome::Error { - sync: AllSync { - inner: AllSyncInner::Optimistic { inner: sync }, - shared: self.shared, - }, - error: HeaderVerifyError::ConsensusMismatch, // TODO: dummy error cause /!\ - } - } - } - } } } } @@ -2377,13 +2092,6 @@ enum HeaderVerifySuccessInner { AllForksSourceExtra, >, ), - Optimistic( - optimistic::BlockVerifySuccess< - OptimisticRequestExtra, - OptimisticSourceExtra, - TBl, - >, - ), } impl HeaderVerifySuccess { @@ -2391,7 +2099,6 @@ impl HeaderVerifySuccess { pub fn height(&self) -> u64 { match &self.inner { HeaderVerifySuccessInner::AllForks(verify) => verify.height(), - HeaderVerifySuccessInner::Optimistic(verify) => verify.height(), } } @@ -2399,7 +2106,6 @@ impl HeaderVerifySuccess { pub fn hash(&self) -> [u8; 32] { match &self.inner { HeaderVerifySuccessInner::AllForks(verify) => *verify.hash(), - HeaderVerifySuccessInner::Optimistic(verify) => verify.hash(), } } @@ -2410,12 +2116,7 @@ impl HeaderVerifySuccess { &'_ self, ) -> Option + Clone + '_> + Clone + '_> { match &self.inner { - HeaderVerifySuccessInner::AllForks(verify) => verify - .scale_encoded_extrinsics() - .map(|iter| either::Left(iter.map(either::Left))), - HeaderVerifySuccessInner::Optimistic(verify) => verify - .scale_encoded_extrinsics() - .map(|iter| either::Right(iter.map(either::Right))), + HeaderVerifySuccessInner::AllForks(verify) => verify.scale_encoded_extrinsics(), } } @@ -2423,7 +2124,6 @@ impl HeaderVerifySuccess { pub fn parent_hash(&self) -> &[u8; 32] { match &self.inner { HeaderVerifySuccessInner::AllForks(verify) => verify.parent_hash(), - HeaderVerifySuccessInner::Optimistic(verify) => verify.parent_hash(), } } @@ -2434,7 +2134,6 @@ impl HeaderVerifySuccess { HeaderVerifySuccessInner::AllForks(verify) => { verify.parent_user_data().map(|ud| ud.as_ref().unwrap()) // TODO: don't unwrap } - HeaderVerifySuccessInner::Optimistic(verify) => verify.parent_user_data(), } } @@ -2442,7 +2141,6 @@ impl HeaderVerifySuccess { pub fn scale_encoded_header(&self) -> &[u8] { match &self.inner { HeaderVerifySuccessInner::AllForks(verify) => verify.scale_encoded_header(), - HeaderVerifySuccessInner::Optimistic(verify) => verify.scale_encoded_header(), } } @@ -2450,7 +2148,6 @@ impl HeaderVerifySuccess { pub fn parent_scale_encoded_header(&self) -> Vec { match &self.inner { HeaderVerifySuccessInner::AllForks(inner) => inner.parent_scale_encoded_header(), - HeaderVerifySuccessInner::Optimistic(inner) => inner.parent_scale_encoded_header(), } } @@ -2464,13 +2161,6 @@ impl HeaderVerifySuccess { shared: self.shared, } } - HeaderVerifySuccessInner::Optimistic(inner) => { - let sync = inner.cancel(); - AllSync { - inner: AllSyncInner::Optimistic { inner: sync }, - shared: self.shared, - } - } } } @@ -2484,13 +2174,6 @@ impl HeaderVerifySuccess { shared: self.shared, } } - HeaderVerifySuccessInner::Optimistic(inner) => { - let sync = inner.reject_bad_block(); - AllSync { - inner: AllSyncInner::Optimistic { inner: sync }, - shared: self.shared, - } - } } } @@ -2506,18 +2189,10 @@ impl HeaderVerifySuccess { shared: self.shared, } } - HeaderVerifySuccessInner::Optimistic(inner) => { - let sync = inner.finish(user_data); - AllSync { - inner: AllSyncInner::Optimistic { inner: sync }, - shared: self.shared, - } - } } } } -// TODO: should be used by the optimistic syncing as well pub struct FinalityProofVerify { inner: FinalityProofVerifyInner, shared: Shared, @@ -2531,13 +2206,6 @@ enum FinalityProofVerifyInner { AllForksSourceExtra, >, ), - Optimistic( - optimistic::JustificationVerify< - 
OptimisticRequestExtra, - OptimisticSourceExtra, - TBl, - >, - ), } impl FinalityProofVerify { @@ -2548,10 +2216,6 @@ impl FinalityProofVerify { let sender = inner.sender().1; (sender.outer_source_id, &sender.user_data) } - FinalityProofVerifyInner::Optimistic(inner) => { - let sender = inner.sender().1; - (sender.outer_source_id, &sender.user_data) - } } } @@ -2614,40 +2278,6 @@ impl FinalityProofVerify { outcome, ) } - FinalityProofVerifyInner::Optimistic(verify) => match verify.perform(randomness_seed) { - ( - inner, - optimistic::JustificationVerification::Finalized { - finalized_blocks_newest_to_oldest: finalized_blocks, - }, - ) => ( - // TODO: transition to all_forks - AllSync { - inner: AllSyncInner::Optimistic { inner }, - shared: self.shared, - }, - FinalityProofVerifyOutcome::NewFinalized { - finalized_blocks_newest_to_oldest: finalized_blocks - .into_iter() - .map(|b| Block { - header: b.header, - justifications: b.justifications, - user_data: b.user_data, - full: b.full.map(|b| BlockFull { body: b.body }), - }) - .collect(), - pruned_blocks: Vec::new(), - updates_best_block: false, - }, - ), - (inner, optimistic::JustificationVerification::Reset { error, .. }) => ( - AllSync { - inner: AllSyncInner::Optimistic { inner }, - shared: self.shared, - }, - FinalityProofVerifyOutcome::JustificationError(error), - ), - }, } } } @@ -2802,13 +2432,6 @@ enum AllSyncInner { inner: warp_sync::WarpSync, WarpSyncRequestExtra>, ready_to_transition: Option, }, - Optimistic { - inner: optimistic::OptimisticSync< - OptimisticRequestExtra, - OptimisticSourceExtra, - TBl, - >, - }, // TODO: we store an `Option` instead of `TBl` due to API issues; the all.rs doesn't let you insert user datas for pending blocks while the AllForksSync lets you; `None` is stored while a block is pending AllForks( all_forks::AllForksSync, AllForksRequestExtra, AllForksSourceExtra>, @@ -2826,17 +2449,6 @@ struct AllForksRequestExtra { user_data: Option, // TODO: why option? } -struct OptimisticSourceExtra { - user_data: TSrc, - best_block_hash: [u8; 32], - outer_source_id: SourceId, -} - -struct OptimisticRequestExtra { - outer_request_id: RequestId, - user_data: TRq, -} - struct WarpSyncSourceExtra { outer_source_id: SourceId, user_data: TSrc, @@ -3011,7 +2623,6 @@ impl Shared { enum RequestMapping { Inline(SourceId, RequestDetail, TRq), AllForks(all_forks::RequestId), - Optimistic(optimistic::RequestId), WarpSync(warp_sync::RequestId), } @@ -3019,7 +2630,6 @@ enum RequestMapping { enum SourceMapping { WarpSync(warp_sync::SourceId), AllForks(all_forks::SourceId), - Optimistic(optimistic::SourceId), } fn all_forks_request_convert( @@ -3036,18 +2646,3 @@ fn all_forks_request_convert( request_justification: true, } } - -fn optimistic_request_convert( - rq_params: optimistic::RequestDetail, - download_body: bool, -) -> DesiredRequest { - DesiredRequest::BlocksRequest { - ascending: true, // Hardcoded based on the logic of the optimistic syncing. - first_block_hash: None, - first_block_height: rq_params.block_height.get(), - num_blocks: rq_params.num_blocks.into(), - request_bodies: download_body, - request_headers: true, - request_justification: true, - } -} diff --git a/lib/src/sync/optimistic.rs b/lib/src/sync/optimistic.rs deleted file mode 100644 index 3164189a99..0000000000 --- a/lib/src/sync/optimistic.rs +++ /dev/null @@ -1,1220 +0,0 @@ -// Smoldot -// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. 
-// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Optimistic header and body syncing. -//! -//! This state machine builds, from a set of sources, a fully verified chain of blocks headers -//! and bodies. -//! -//! # Overview -//! -//! The algorithm used by this state machine is called "optimistic syncing". It consists in -//! sending requests for blocks to a certain list of sources, aggregating the answers, and -//! verifying them. -//! -//! The [`OptimisticSync`] struct holds a list of sources, a list of pending block requests, -//! a chain, and a list of blocks received as answers and waiting to be verified. -//! -//! The requests are emitted ahead of time, so that they can be answered asynchronously while -//! blocks in the verification queue are being processed. -//! -//! The syncing is said to be *optimistic* because it is assumed that all sources will provide -//! correct blocks. -//! In the case where the verification of a block fails, the state machine jumps back to the -//! latest known finalized block and resumes syncing from there, possibly using different sources -//! this time. -//! -//! The *optimism* aspect comes from the fact that, while a bad source can't corrupt the state of -//! the local chain, and can't stall the syncing process (unless there isn't any other source -//! available), it can still slow it down. - -// TODO: document better -// TODO: this entire module needs clean up - -use crate::{ - chain::{blocks_tree, chain_information}, - header, -}; - -use alloc::{ - boxed::Box, - collections::BTreeSet, - vec::{self, Vec}, -}; -use core::{ - cmp, fmt, iter, mem, - num::{NonZeroU32, NonZeroU64}, - ops, - time::Duration, -}; -use hashbrown::HashMap; - -mod verification_queue; - -/// Configuration for the [`OptimisticSync`]. -#[derive(Debug)] -pub struct Config { - /// Information about the latest finalized block and its ancestors. - pub chain_information: chain_information::ValidChainInformation, - - /// Number of bytes used when encoding/decoding the block number. Influences how various data - /// structures should be parsed. - pub block_number_bytes: usize, - - /// Pre-allocated capacity for the number of block sources. - pub sources_capacity: usize, - - /// Pre-allocated capacity for the number of blocks between the finalized block and the head - /// of the chain. - /// - /// Should be set to the maximum number of block between two consecutive justifications. - pub blocks_capacity: usize, - - /// Number of blocks to download ahead of the best block. - /// - /// Whenever the latest best block is updated, the state machine will start block - /// requests for the block `best_block_height + download_ahead_blocks` and all its - /// ancestors. Considering that requesting blocks has some latency, downloading blocks ahead - /// of time ensures that verification isn't blocked waiting for a request to be finished. 
- /// - /// The ideal value here depends on the speed of blocks verification speed and latency of - /// block requests. - pub download_ahead_blocks: NonZeroU32, - - /// If `true`, the downloaded block bodies are stored in the state machine. - pub download_bodies: bool, -} - -/// Identifier for an ongoing request in the [`OptimisticSync`]. -#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] -pub struct RequestId(u64); - -impl RequestId { - /// Returns a value that compares inferior or equal to any other [`RequestId`]. - pub fn min_value() -> Self { - Self(u64::min_value()) - } - - /// Returns a value that compares superior or equal to any other [`RequestId`]. - pub fn max_value() -> Self { - Self(u64::max_value()) - } -} - -/// Identifier for a source in the [`OptimisticSync`]. -#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] -pub struct SourceId(u64); - -/// Optimistic headers-only syncing. -pub struct OptimisticSync { - /// Data structure containing the blocks. - /// - /// The user data, [`Block`], isn't used internally but stores information later reported - /// to the user. - chain: blocks_tree::NonFinalizedTree>, - - /// Extra fields. In a separate structure in order to be moved around. - /// - /// A `Box` is used in order to minimize the impact of moving the value around, and to reduce - /// the size of the [`OptimisticSync`]. - inner: Box>, -} - -/// Extra fields. In a separate structure in order to be moved around. -struct OptimisticSyncInner { - /// Configuration for the actual finalized block of the chain. - /// Used if the `chain` field needs to be recreated. - finalized_chain_information: blocks_tree::Config, - - /// See [`Config::download_bodies`]. - download_bodies: bool, - - /// See [`Config::download_ahead_blocks`]. - download_ahead_blocks: NonZeroU32, - - /// List of sources of blocks. - sources: HashMap, fnv::FnvBuildHasher>, - - /// Next [`SourceId`] to allocate. - /// `SourceIds` are unique so that the source in the [`verification_queue::VerificationQueue`] - /// doesn't accidentally collide with a new source. - next_source_id: SourceId, - - /// Queue of block requests, either waiting to be started, in progress, or completed. - verification_queue: - verification_queue::VerificationQueue<(RequestId, TRq), RequestSuccessBlock>, - - /// Justifications, if any, of the block that has just been verified. - // TODO: clean up when a source is removed - pending_encoded_justifications: vec::IntoIter<([u8; 4], Vec, SourceId)>, - - /// Identifier to assign to the next request. - next_request_id: RequestId, - - /// Requests that have been started but whose answers are no longer desired. - obsolete_requests: HashMap, - - /// Same as [`OptimisticSyncInner::obsolete_requests`], but ordered differently. 
- obsolete_requests_by_source: BTreeSet<(SourceId, RequestId)>, -} - -impl OptimisticSyncInner { - fn make_requests_obsolete(&mut self, chain: &blocks_tree::NonFinalizedTree>) { - let former_queue = mem::replace( - &mut self.verification_queue, - verification_queue::VerificationQueue::new(chain.best_block_header().number + 1), - ); - - for ((request_id, user_data), source) in former_queue.into_requests() { - let _was_in = self - .obsolete_requests - .insert(request_id, (source, user_data)); - debug_assert!(_was_in.is_none()); - let _was_inserted = self - .obsolete_requests_by_source - .insert((source, request_id)); - debug_assert!(_was_inserted); - debug_assert_eq!( - self.obsolete_requests.len(), - self.obsolete_requests_by_source.len() - ); - } - } - - fn with_requests_obsoleted( - mut self: Box, - chain: &blocks_tree::NonFinalizedTree>, - ) -> Box { - self.make_requests_obsolete(chain); - self - } -} - -struct Source { - /// Opaque value passed to [`OptimisticSync::add_source`]. - user_data: TSrc, - - /// Best block that the source has reported having. - best_block_number: u64, - - /// Number of requests that use this source. - num_ongoing_requests: u32, -} - -// TODO: doc -pub struct Block { - /// Header of the block. - pub header: header::Header, - - /// SCALE-encoded justification of this block, if any. - pub justifications: Vec<([u8; 4], Vec)>, - - /// User data associated to the block. - pub user_data: TBl, - - /// Extra fields for full block verifications. - pub full: Option, -} - -// TODO: doc -pub struct BlockFull { - /// List of SCALE-encoded extrinsics that form the block's body. - pub body: Vec>, -} - -impl OptimisticSync { - /// Builds a new [`OptimisticSync`]. - pub fn new(config: Config) -> Self { - let blocks_tree_config = blocks_tree::Config { - chain_information: config.chain_information, - block_number_bytes: config.block_number_bytes, - blocks_capacity: config.blocks_capacity, - // Considering that we rely on justifications to sync, there is no drawback in - // accepting blocks with unrecognized consensus engines. While this could lead to - // accepting blocks that wouldn't otherwise be accepted, it is already the case that - // a malicious node could send non-finalized blocks. Accepting blocks with an - // unrecognized consensus engine doesn't add any additional risk. - allow_unknown_consensus_engines: true, - }; - - let chain = blocks_tree::NonFinalizedTree::new(blocks_tree_config.clone()); - let best_block_header_num = chain.best_block_header().number; - - OptimisticSync { - chain, - inner: Box::new(OptimisticSyncInner { - finalized_chain_information: blocks_tree_config, - download_bodies: config.download_bodies, - sources: HashMap::with_capacity_and_hasher( - config.sources_capacity, - Default::default(), - ), - next_source_id: SourceId(0), - verification_queue: verification_queue::VerificationQueue::new( - best_block_header_num + 1, - ), - pending_encoded_justifications: Vec::new().into_iter(), - download_ahead_blocks: config.download_ahead_blocks, - next_request_id: RequestId(0), - obsolete_requests: HashMap::with_capacity_and_hasher(0, Default::default()), - obsolete_requests_by_source: BTreeSet::new(), - }), - } - } - - /// Returns the value that was initially passed in [`Config::block_number_bytes`]. - pub fn block_number_bytes(&self) -> usize { - self.chain.block_number_bytes() - } - - /// Builds a [`chain_information::ChainInformationRef`] struct corresponding to the current - /// latest finalized block. Can later be used to reconstruct a chain. 
- pub fn as_chain_information(&self) -> chain_information::ValidChainInformationRef { - self.chain.as_chain_information() - } - - /// Returns the header of the finalized block. - pub fn finalized_block_header(&self) -> header::HeaderRef { - self.inner - .finalized_chain_information - .chain_information - .as_ref() - .finalized_block_header - } - - /// Returns the header of the best block. - /// - /// > **Note**: This value is provided only for informative purposes. Keep in mind that this - /// > best block might be reverted in the future. - pub fn best_block_header(&self) -> header::HeaderRef { - self.chain.best_block_header() - } - - /// Returns the number of the best block. - /// - /// > **Note**: This value is provided only for informative purposes. Keep in mind that this - /// > best block might be reverted in the future. - pub fn best_block_number(&self) -> u64 { - self.chain.best_block_header().number - } - - /// Returns the hash of the best block. - /// - /// > **Note**: This value is provided only for informative purposes. Keep in mind that this - /// > best block might be reverted in the future. - pub fn best_block_hash(&self) -> [u8; 32] { - self.chain.best_block_hash() - } - - /// Returns consensus information about the current best block of the chain. - pub fn best_block_consensus(&self) -> chain_information::ChainInformationConsensusRef { - self.chain.best_block_consensus() - } - - /// Returns the header of all known non-finalized blocks in the chain without any specific - /// order. - pub fn non_finalized_blocks_unordered( - &'_ self, - ) -> impl Iterator> + '_ { - self.chain.iter_unordered() - } - - /// Returns the header of all known non-finalized blocks in the chain. - /// - /// The returned items are guaranteed to be in an order in which the parents are found before - /// their children. - pub fn non_finalized_blocks_ancestry_order( - &'_ self, - ) -> impl Iterator> + '_ { - self.chain.iter_ancestry_order() - } - - /// Disassembles the state machine into its raw components. - pub fn disassemble(self) -> Disassemble { - Disassemble { - chain_information: self.inner.finalized_chain_information.chain_information, - sources: self - .inner - .sources - .into_iter() - .map(|(id, source)| DisassembleSource { - id, - user_data: source.user_data, - best_block_number: source.best_block_number, - }) - .collect(), - requests: self - .inner - .verification_queue - .into_requests() - .map(|((request_id, user_data), _)| (request_id, user_data)) - .collect(), - } - } - - /// Inform the [`OptimisticSync`] of a new potential source of blocks. - pub fn add_source(&mut self, source: TSrc, best_block_number: u64) -> SourceId { - let new_id = { - let id = self.inner.next_source_id; - self.inner.next_source_id.0 += 1; - id - }; - - self.inner.sources.insert( - new_id, - Source { - user_data: source, - best_block_number, - num_ongoing_requests: 0, - }, - ); - - new_id - } - - /// Returns the current best block of the given source. - /// - /// This corresponds either the latest call to [`OptimisticSync::raise_source_best_block`], - /// or to the parameter passed to [`OptimisticSync::add_source`]. - /// - /// # Panic - /// - /// Panics if the [`SourceId`] is invalid. - /// - pub fn source_best_block(&self, source_id: SourceId) -> u64 { - self.inner - .sources - .get(&source_id) - .unwrap() - .best_block_number - } - - /// Updates the best known block of the source. - /// - /// Has no effect if the previously-known best block is lower than the new one. 
- /// - /// # Panic - /// - /// Panics if the [`SourceId`] is invalid. - /// - pub fn raise_source_best_block(&mut self, id: SourceId, best_block_number: u64) { - let current = &mut self.inner.sources.get_mut(&id).unwrap().best_block_number; - if *current < best_block_number { - *current = best_block_number; - } - } - - /// Inform the [`OptimisticSync`] that a source of blocks is no longer available. - /// - /// This automatically cancels all the requests that have been emitted for this source. - /// This list of requests is returned as part of this function. - /// - /// # Panic - /// - /// Panics if the [`SourceId`] is invalid. - /// - pub fn remove_source( - &'_ mut self, - source_id: SourceId, - ) -> (TSrc, impl Iterator + '_) { - let obsolete_requests_to_remove = self - .inner - .obsolete_requests_by_source - .range((source_id, RequestId::min_value())..=(source_id, RequestId::max_value())) - .map(|(_, id)| *id) - .collect::>(); - let mut obsolete_requests = Vec::with_capacity(obsolete_requests_to_remove.len()); - for rq_id in obsolete_requests_to_remove { - let (_, user_data) = self.inner.obsolete_requests.remove(&rq_id).unwrap(); - obsolete_requests.push((rq_id, user_data)); - let _was_in = self - .inner - .obsolete_requests_by_source - .remove(&(source_id, rq_id)); - debug_assert!(_was_in); - } - - debug_assert_eq!( - self.inner.obsolete_requests.len(), - self.inner.obsolete_requests_by_source.len() - ); - - let src_user_data = self.inner.sources.remove(&source_id).unwrap().user_data; - let drain = RequestsDrain { - iter: self.inner.verification_queue.drain_source(source_id), - }; - (src_user_data, drain.chain(obsolete_requests)) - } - - /// Returns the list of sources in this state machine. - pub fn sources(&'_ self) -> impl ExactSizeIterator + '_ { - self.inner.sources.keys().copied() - } - - /// Returns the number of ongoing requests that concern this source. - /// - /// # Panic - /// - /// Panics if the [`SourceId`] is invalid. - /// - pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize { - let num_obsolete = self - .inner - .obsolete_requests_by_source - .range((source_id, RequestId::min_value())..=(source_id, RequestId::max_value())) - .count(); - let num_regular = self - .inner - .verification_queue - .source_num_ongoing_requests(source_id); - num_obsolete + num_regular - } - - /// Returns an iterator that yields all the requests whose outcome is no longer desired. - pub fn obsolete_requests(&'_ self) -> impl Iterator + '_ { - self.inner - .obsolete_requests - .iter() - .map(|(id, (_, ud))| (*id, ud)) - } - - /// Returns an iterator that yields all requests that could be started. - pub fn desired_requests(&'_ self) -> impl Iterator + '_ { - let sources = &self.inner.sources; - self.inner - .verification_queue - .desired_requests(self.inner.download_ahead_blocks) - .flat_map(move |e| sources.iter().map(move |s| (e, s))) - .filter_map(|((block_height, num_blocks), (source_id, source))| { - let source_avail_blocks = NonZeroU32::new( - u32::try_from(source.best_block_number.checked_sub(block_height.get())? + 1) - .unwrap(), - ) - .unwrap(); - Some(RequestDetail { - block_height, - num_blocks: cmp::min(source_avail_blocks, num_blocks), - source_id: *source_id, - }) - }) - } - - /// Updates the [`OptimisticSync`] with the fact that a request has been started. - /// - /// Returns the identifier for the request that must later be passed back to - /// [`OptimisticSync::finish_request_success`] or [`OptimisticSync::finish_request_failed`]. 
- /// - /// # Panic - /// - /// Panics if the [`SourceId`] is invalid. - /// - pub fn insert_request(&mut self, detail: RequestDetail, user_data: TRq) -> RequestId { - self.inner - .sources - .get_mut(&detail.source_id) - .unwrap() - .num_ongoing_requests += 1; - - let request_id = self.inner.next_request_id; - self.inner.next_request_id.0 += 1; - - match self.inner.verification_queue.insert_request( - detail.block_height, - detail.num_blocks, - detail.source_id, - (request_id, user_data), - ) { - Ok(()) => {} - Err((_, user_data)) => { - self.inner - .obsolete_requests - .insert(request_id, (detail.source_id, user_data)); - let _was_inserted = self - .inner - .obsolete_requests_by_source - .insert((detail.source_id, request_id)); - debug_assert!(_was_inserted); - debug_assert_eq!( - self.inner.obsolete_requests.len(), - self.inner.obsolete_requests_by_source.len() - ); - } - } - - request_id - } - - /// Update the [`OptimisticSync`] with the successful outcome of a request. - /// - /// Returns the user data that was associated to that request. - /// - /// If [`Config::download_bodies`] was `false`, then the values of - /// [`RequestSuccessBlock::scale_encoded_extrinsics`] are silently ignored. - /// - /// > **Note**: If [`Config::download_bodies`] is `false`, you are encouraged to not request - /// > the block's body from the source altogether, and to fill the - /// > [`RequestSuccessBlock::scale_encoded_extrinsics`] fields with `Vec::new()`. - /// - /// # Panic - /// - /// Panics if the [`RequestId`] is invalid. - /// - pub fn finish_request_success( - &mut self, - request_id: RequestId, - blocks: impl Iterator>, - ) -> (TRq, FinishRequestOutcome) { - if let Some((source_id, user_data)) = self.inner.obsolete_requests.remove(&request_id) { - self.inner.obsolete_requests.shrink_to_fit(); - let _was_in = self - .inner - .obsolete_requests_by_source - .remove(&(source_id, request_id)); - debug_assert!(_was_in); - debug_assert_eq!( - self.inner.obsolete_requests.len(), - self.inner.obsolete_requests_by_source.len() - ); - self.inner - .sources - .get_mut(&source_id) - .unwrap() - .num_ongoing_requests -= 1; - return (user_data, FinishRequestOutcome::Obsolete); - } - - // TODO: important /!\ should check whether the block bodies match the extrinsics root in the headers, in order to differentiate invalid blocks from malicious peers - - let ((_, user_data), source_id) = self - .inner - .verification_queue - .finish_request(|(rq, _)| *rq == request_id, Ok(blocks)); - - self.inner - .sources - .get_mut(&source_id) - .unwrap() - .num_ongoing_requests -= 1; - - (user_data, FinishRequestOutcome::Queued) - } - - /// Update the [`OptimisticSync`] with the information that the given request has failed. - /// - /// Returns the user data that was associated to that request. - /// - /// # Panic - /// - /// Panics if the [`RequestId`] is invalid. 
- /// - pub fn finish_request_failed(&mut self, request_id: RequestId) -> TRq { - if let Some((source_id, user_data)) = self.inner.obsolete_requests.remove(&request_id) { - self.inner.obsolete_requests.shrink_to_fit(); - let _was_in = self - .inner - .obsolete_requests_by_source - .remove(&(source_id, request_id)); - debug_assert!(_was_in); - debug_assert_eq!( - self.inner.obsolete_requests.len(), - self.inner.obsolete_requests_by_source.len() - ); - self.inner - .sources - .get_mut(&source_id) - .unwrap() - .num_ongoing_requests -= 1; - return user_data; - } - - let ((_, user_data), source_id) = self.inner.verification_queue.finish_request( - |(rq, _)| *rq == request_id, - Result::, _>::Err(()), - ); - - self.inner - .sources - .get_mut(&source_id) - .unwrap() - .num_ongoing_requests -= 1; - - user_data - } - - /// Returns the [`SourceId`] that is expected to fulfill the given request. - /// - /// # Panic - /// - /// Panics if the [`RequestId`] is invalid. - /// - pub fn request_source_id(&self, request_id: RequestId) -> SourceId { - if let Some((src, _)) = self.inner.obsolete_requests.get(&request_id) { - *src - } else { - self.inner - .verification_queue - .requests() - .find(|(rq, _)| rq.0 == request_id) - .unwrap() - .1 - } - } - - /// Process the next block in the queue of verification. - /// - /// This method takes ownership of the [`OptimisticSync`]. The [`OptimisticSync`] is yielded - /// back in the returned value. - pub fn process_one(self) -> ProcessOne { - if !self - .inner - .pending_encoded_justifications - .as_slice() - .is_empty() - { - return ProcessOne::VerifyJustification(JustificationVerify { - chain: self.chain, - inner: self.inner, - }); - } - - // The block isn't immediately extracted. A `Verify` struct is built, whose existence - // confirms that a block is ready. If the `Verify` is dropped without `start` being called, - // the block stays in the list. - if self.inner.verification_queue.blocks_ready() { - ProcessOne::VerifyBlock(BlockVerify { - inner: self.inner, - chain: self.chain, - }) - } else { - ProcessOne::Idle { sync: self } - } - } -} - -impl ops::Index for OptimisticSync { - type Output = TSrc; - - #[track_caller] - fn index(&self, source_id: SourceId) -> &TSrc { - &self.inner.sources.get(&source_id).unwrap().user_data - } -} - -impl ops::IndexMut for OptimisticSync { - #[track_caller] - fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc { - &mut self.inner.sources.get_mut(&source_id).unwrap().user_data - } -} - -impl<'a, TRq, TSrc, TBl> ops::Index<&'a [u8; 32]> for OptimisticSync { - type Output = TBl; - - #[track_caller] - fn index(&self, block_hash: &'a [u8; 32]) -> &TBl { - &self.chain[block_hash].user_data - } -} - -impl<'a, TRq, TSrc, TBl> ops::IndexMut<&'a [u8; 32]> for OptimisticSync { - #[track_caller] - fn index_mut(&mut self, block_hash: &'a [u8; 32]) -> &mut TBl { - &mut self.chain[block_hash].user_data - } -} - -pub struct RequestSuccessBlock { - pub scale_encoded_header: Vec, - pub scale_encoded_justifications: Vec<([u8; 4], Vec)>, - pub scale_encoded_extrinsics: Vec>, - pub user_data: TBl, -} - -/// State of the processing of blocks. -pub enum ProcessOne { - /// No processing is necessary. - /// - /// Calling [`OptimisticSync::process_one`] again is unnecessary. - Idle { - /// The state machine. - /// The [`OptimisticSync::process_one`] method takes ownership of the - /// [`OptimisticSync`]. This field yields it back. 
- sync: OptimisticSync, - }, - - VerifyBlock(BlockVerify), - - VerifyJustification(JustificationVerify), -} - -/// Start the processing of a block verification. -pub struct BlockVerify { - inner: Box>, - chain: blocks_tree::NonFinalizedTree>, -} - -impl BlockVerify { - /// Returns the hash of the block about to be verified. - pub fn hash(&self) -> [u8; 32] { - header::hash_from_scale_encoded_header(self.scale_encoded_header()) - } - - /// Returns the list of SCALE-encoded extrinsics of the block to verify. - /// - /// This is `Some` if and only if [`Config::download_bodies`] is `true` - pub fn scale_encoded_extrinsics( - &'_ self, - ) -> Option + Clone + '_> + Clone + '_> { - if self.inner.download_bodies { - let block = self.inner.verification_queue.first_block().unwrap(); - Some(block.scale_encoded_extrinsics.iter()) - } else { - None - } - } - - /// Returns the SCALE-encoded header of the block about to be verified. - pub fn scale_encoded_header(&self) -> &[u8] { - &self - .inner - .verification_queue - .first_block() - .unwrap() - .scale_encoded_header - } - - /// Verify the header of the block. - /// - /// Must be passed the current UNIX time in order to verify that the block doesn't pretend to - /// come from the future. - pub fn verify_header( - mut self, - now_from_unix_epoch: Duration, - ) -> BlockVerification { - // Extract the block to process. We are guaranteed that a block is available because a - // `Verify` is built only when that is the case. - // Be aware that `source_id` might refer to an obsolete source. - let (block, source_id) = self.inner.verification_queue.pop_first_block().unwrap(); - - debug_assert!(self - .inner - .pending_encoded_justifications - .as_slice() - .is_empty()); - self.inner.pending_encoded_justifications = block - .scale_encoded_justifications - .clone() - .into_iter() - .map(|(e, j)| (e, j, source_id)) - .collect::>() - .into_iter(); - - let outcome = match self - .chain - .verify_header(block.scale_encoded_header, now_from_unix_epoch) - { - Ok(blocks_tree::HeaderVerifySuccess::Verified { - verified_header, - is_new_best: true, - .. - }) => Ok(verified_header), - Ok( - blocks_tree::HeaderVerifySuccess::Duplicate - | blocks_tree::HeaderVerifySuccess::Verified { - is_new_best: false, .. - }, - ) => Err(ResetCause::NonCanonical), - Err(err) => Err(ResetCause::HeaderError(err)), - }; - - match outcome { - Ok(verified_header) => { - let new_best_hash = self.chain.best_block_hash(); - let new_best_number = self.chain.best_block_header().number; - - BlockVerification::NewBest { - success: BlockVerifySuccess { - parent: OptimisticSync { - inner: self.inner, - chain: self.chain, - }, - scale_encoded_extrinsics: block.scale_encoded_extrinsics, - verified_header, - scale_encoded_justifications: block.scale_encoded_justifications, - }, - new_best_hash, - new_best_number, - } - } - Err(reason) => { - self.inner.make_requests_obsolete(&self.chain); - - let previous_best_height = self.chain.best_block_header().number; - BlockVerification::Reset { - sync: OptimisticSync { - inner: self.inner, - chain: self.chain, - }, - previous_best_height, - reason, - } - } - } - } -} - -/// State of the processing of blocks. -pub enum BlockVerification { - /// An issue happened when verifying the block or its justification, resulting in resetting - /// the chain to the latest finalized block. - // TODO: unclear what happens to the block, are they kept? discarded? - Reset { - /// The state machine. 
-        /// The [`OptimisticSync::process_one`] method takes ownership of the
-        /// [`OptimisticSync`]. This field yields it back.
-        sync: OptimisticSync<TRq, TSrc, TBl>,
-
-        /// Height of the best block before the reset.
-        previous_best_height: u64,
-
-        /// Problem that happened and caused the reset.
-        reason: ResetCause,
-    },
-
-    /// Processing of the block is over.
-    ///
-    /// There might be more blocks remaining. Call [`OptimisticSync::process_one`] again.
-    NewBest {
-        /// The state machine.
-        /// The [`OptimisticSync::process_one`] method takes ownership of the
-        /// [`OptimisticSync`]. This field yields it back.
-        success: BlockVerifySuccess<TRq, TSrc, TBl>,
-
-        new_best_number: u64,
-        new_best_hash: [u8; 32],
-    },
-}
-
-/// Block verification successful.
-///
-/// Internally holds the [`OptimisticSync`].
-pub struct BlockVerifySuccess<TRq, TSrc, TBl> {
-    parent: OptimisticSync<TRq, TSrc, TBl>,
-    verified_header: blocks_tree::VerifiedHeader,
-    scale_encoded_extrinsics: Vec<Vec<u8>>,
-    scale_encoded_justifications: Vec<([u8; 4], Vec<u8>)>,
-}
-
-impl<TRq, TSrc, TBl> BlockVerifySuccess<TRq, TSrc, TBl> {
-    /// Returns the height of the block that was verified.
-    pub fn height(&self) -> u64 {
-        header::decode(
-            self.scale_encoded_header(),
-            self.parent.chain.block_number_bytes(),
-        )
-        .unwrap()
-        .number
-    }
-
-    /// Returns the hash of the block that was verified.
-    pub fn hash(&self) -> [u8; 32] {
-        header::hash_from_scale_encoded_header(self.scale_encoded_header())
-    }
-
-    /// Returns the list of SCALE-encoded extrinsics of the block to verify.
-    ///
-    /// This is `Some` if and only if [`Config::download_bodies`] is `true`
-    pub fn scale_encoded_extrinsics(
-        &'_ self,
-    ) -> Option<impl ExactSizeIterator<Item = impl AsRef<[u8]> + Clone + '_> + Clone + '_> {
-        if self.parent.inner.download_bodies {
-            Some(self.scale_encoded_extrinsics.iter())
-        } else {
-            None
-        }
-    }
-
-    /// Returns the hash of the parent of the block that was verified.
-    pub fn parent_hash(&self) -> &[u8; 32] {
-        header::decode(
-            self.scale_encoded_header(),
-            self.parent.chain.block_number_bytes(),
-        )
-        .unwrap()
-        .parent_hash
-    }
-
-    /// Returns the user data of the parent of the block to be verified, or `None` if the parent
-    /// is the finalized block.
-    pub fn parent_user_data(&self) -> Option<&TBl> {
-        let parent_hash = self.parent_hash();
-        // TODO: optimize?
-        if self.parent.chain.contains_non_finalized_block(parent_hash) {
-            Some(&self.parent.chain[parent_hash].user_data)
-        } else {
-            None
-        }
-    }
-
-    /// Returns the SCALE-encoded header of the block that was verified.
-    pub fn scale_encoded_header(&self) -> &[u8] {
-        self.verified_header.scale_encoded_header()
-    }
-
-    /// Returns the SCALE-encoded header of the parent of the block.
-    pub fn parent_scale_encoded_header(&self) -> Vec<u8> {
-        // TODO: return &[u8]
-        self.parent
-            .chain
-            .best_block_header()
-            .scale_encoding_vec(self.parent.chain.block_number_bytes())
-    }
-
-    /// Cancel the block verification.
-    pub fn cancel(self) -> OptimisticSync<TRq, TSrc, TBl> {
-        self.parent
-    }
-
-    /// Reject the block and mark it as bad.
-    pub fn reject_bad_block(mut self) -> OptimisticSync<TRq, TSrc, TBl> {
-        self.parent.inner.make_requests_obsolete(&self.parent.chain);
-        self.parent
-    }
-
-    /// Finish inserting the block header.
-    pub fn finish(mut self, user_data: TBl) -> OptimisticSync<TRq, TSrc, TBl> {
-        // TODO: don't copy the header
-        let header = header::decode(
-            self.verified_header.scale_encoded_header(),
-            self.parent.chain.block_number_bytes(),
-        )
-        .unwrap()
-        .into();
-
-        self.parent.chain.insert_verified_header(
-            self.verified_header,
-            Block {
-                header,
-                justifications: self.scale_encoded_justifications,
-                user_data,
-                full: None,
-            },
-        );
-
-        self.parent
-    }
-}
-
-/// Start the processing of a justification verification.
-pub struct JustificationVerify<TRq, TSrc, TBl> {
-    inner: Box<OptimisticSyncInner<TRq, TSrc, TBl>>,
-    chain: blocks_tree::NonFinalizedTree<Block<TBl>>,
-}
-
-impl<TRq, TSrc, TBl> JustificationVerify<TRq, TSrc, TBl> {
-    /// Returns the source the justification was obtained from.
-    pub fn sender(&self) -> (SourceId, &TSrc) {
-        let (_, _, source_id) = self
-            .inner
-            .pending_encoded_justifications
-            .as_slice()
-            .first()
-            .unwrap();
-        (*source_id, &self.inner.sources[source_id].user_data)
-    }
-
-    /// Verify the justification.
-    ///
-    /// A randomness seed must be provided and will be used during the verification. Note that the
-    /// verification is nonetheless deterministic.
-    pub fn perform(
-        mut self,
-        randomness_seed: [u8; 32],
-    ) -> (
-        OptimisticSync<TRq, TSrc, TBl>,
-        JustificationVerification<TBl>,
-    ) {
-        let (consensus_engine_id, justification, _) =
-            self.inner.pending_encoded_justifications.next().unwrap();
-
-        let mut apply = match self.chain.verify_justification(
-            consensus_engine_id,
-            &justification,
-            randomness_seed,
-        ) {
-            Ok(a) => a,
-            Err(error) => {
-                let chain = blocks_tree::NonFinalizedTree::new(
-                    self.inner.finalized_chain_information.clone(),
-                );
-
-                let inner = self.inner.with_requests_obsoleted(&chain);
-                let previous_best_height = chain.best_block_header().number;
-                return (
-                    OptimisticSync { chain, inner },
-                    JustificationVerification::Reset {
-                        previous_best_height,
-                        error,
-                    },
-                );
-            }
-        };
-
-        assert!(apply.is_current_best_block()); // TODO: can legitimately fail in case of malicious node
-
-        // As part of the finalization, put the justification in the chain that's
-        // going to be reported to the user.
-        apply
-            .block_user_data()
-            .justifications
-            .push((consensus_engine_id, justification));
-
-        // Applying the finalization and iterating over the now-finalized block.
-        let finalized_blocks_newest_to_oldest = apply
-            .apply()
-            .filter(|b| matches!(b.ty, blocks_tree::RemovedBlockType::Finalized))
-            .map(|b| b.user_data)
-            .collect();
-
-        // Since the best block is now the finalized block, reset the storage
-        // diff.
-        debug_assert!(self.chain.is_empty());
-
-        self.inner.finalized_chain_information.chain_information =
-            self.chain.as_chain_information().into();
-
-        (
-            OptimisticSync {
-                chain: self.chain,
-                inner: self.inner,
-            },
-            JustificationVerification::Finalized {
-                finalized_blocks_newest_to_oldest,
-            },
-        )
-    }
-}
-
-/// Outcome of the verification of a justification.
-pub enum JustificationVerification<TBl> {
-    /// An issue happened when verifying the justification, resulting in resetting the chain to
-    /// the latest finalized block.
-    Reset {
-        /// Height of the best block before the reset.
-        previous_best_height: u64,
-
-        /// Problem that happened and caused the reset.
-        error: blocks_tree::JustificationVerifyError,
-    },
-
-    /// Processing of the justification is over. The best block has now been finalized.
-    ///
-    /// There might be more blocks remaining. Call [`OptimisticSync::process_one`] again.
-    Finalized {
-        /// Blocks that have been finalized, in decreasing block number.
-        finalized_blocks_newest_to_oldest: Vec<Block<TBl>>,
-    },
-}
-
-/// Request that should be emitted towards a certain source.
-#[derive(Debug)]
-pub struct RequestDetail {
-    /// Source where to request blocks from.
-    pub source_id: SourceId,
-    /// Height of the block to request.
-    pub block_height: NonZeroU64,
-    /// Number of blocks to request. This might be equal to `u32::max_value()` in case no upper
-    /// bound is required. The API user is responsible for clamping this value to a reasonable
-    /// limit.
-    pub num_blocks: NonZeroU32,
-}
-
-pub enum FinishRequestOutcome {
-    Obsolete,
-    Queued,
-}
-
-/// Iterator that drains requests after a source has been removed.
-pub struct RequestsDrain<'a, TRq, TBl> {
-    iter: verification_queue::SourceDrain<'a, (RequestId, TRq), TBl>,
-}
-
-impl<'a, TRq, TBl> Iterator for RequestsDrain<'a, TRq, TBl> {
-    type Item = (RequestId, TRq);
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter.next()
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.iter.size_hint()
-    }
-}
-
-impl<'a, TRq, TBl> fmt::Debug for RequestsDrain<'a, TRq, TBl> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_tuple("RequestsDrain").finish()
-    }
-}
-
-impl<'a, TRq, TBl> Drop for RequestsDrain<'a, TRq, TBl> {
-    fn drop(&mut self) {
-        // Drain all remaining elements even if the iterator is dropped eagerly.
-        // This is the reason why a custom iterator type is needed.
-        for _ in self {}
-    }
-}
-
-/// Problem that happened and caused the reset.
-#[derive(Debug, derive_more::Display)]
-pub enum ResetCause {
-    /// Error while decoding a header.
-    #[display(fmt = "Failed to decode header: {_0}")]
-    InvalidHeader(header::Error),
-    /// Error while verifying a header.
-    #[display(fmt = "{_0}")]
-    HeaderError(blocks_tree::HeaderVerifyError),
-    /// Received block isn't a child of the current best block.
-    NonCanonical,
-}
-
-/// Output of [`OptimisticSync::disassemble`].
-#[derive(Debug)]
-pub struct Disassemble<TRq, TSrc> {
-    /// Information about the latest finalized block and its ancestors.
-    pub chain_information: chain_information::ValidChainInformation,
-
-    /// List of sources that were within the state machine.
-    pub sources: Vec<DisassembleSource<TSrc>>,
-
-    /// List of the requests that were active.
-    pub requests: Vec<(RequestId, TRq)>,
-    // TODO: add non-finalized blocks?
-}
-
-/// See [`Disassemble::sources`].
-#[derive(Debug)]
-pub struct DisassembleSource<TSrc> {
-    /// Identifier that the source had.
-    pub id: SourceId,
-
-    /// Opaque value passed to [`OptimisticSync::add_source`].
-    pub user_data: TSrc,
-
-    /// Best block that the source has reported having.
-    pub best_block_number: u64,
-}
diff --git a/lib/src/sync/optimistic/verification_queue.rs b/lib/src/sync/optimistic/verification_queue.rs
deleted file mode 100644
index 025bf1e050..0000000000
--- a/lib/src/sync/optimistic/verification_queue.rs
+++ /dev/null
@@ -1,564 +0,0 @@
-// Smoldot
-// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
-// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-//! Implementation detail of the optimistic syncing. Provides a queue of block requests and blocks
-//! that are ready to be verified.
-
-use alloc::collections::VecDeque;
-use core::{
-    cmp, fmt, iter, mem,
-    num::{NonZeroU32, NonZeroU64},
-};
-use itertools::Itertools as _;
-
-use super::SourceId; // TODO: ?
-
-/// Queue of block requests, either waiting to be started, in progress, or completed.
-pub(super) struct VerificationQueue<TRq, TBl> {
-    /// Actual queue. Never empty.
-    ///
-    /// Must always end with an entry of type [`VerificationQueueEntryTy::Missing`].
-    ///
-    /// Must never contain two [`VerificationQueueEntryTy::Missing`] entries in a row.
-    verification_queue: VecDeque<VerificationQueueEntry<TRq, TBl>>,
-}
-
-impl<TRq, TBl> VerificationQueue<TRq, TBl> {
-    /// Creates a new queue.
-    pub fn new(base_block_number: u64) -> Self {
-        let mut verification_queue = VecDeque::new();
-        verification_queue.push_back(VerificationQueueEntry {
-            block_height: NonZeroU64::new(base_block_number).unwrap(),
-            ty: VerificationQueueEntryTy::Missing,
-        });
-
-        VerificationQueue { verification_queue }
-    }
-
-    /// Returns true if the queue starts with ready blocks.
-    ///
-    /// This is equivalent to calling `is_some` on the `Option` returned by
-    /// [`VerificationQueue::first_block`].
-    pub fn blocks_ready(&self) -> bool {
-        matches!(
-            self.verification_queue.front().unwrap().ty,
-            VerificationQueueEntryTy::Queued { .. }
-        )
-    }
-
-    /// If the queue starts with ready blocks, returns the first block that is ready.
-    ///
-    /// Returns `Some` if and only if [`VerificationQueue::blocks_ready`] returns `true`.
-    pub fn first_block(&self) -> Option<&TBl> {
-        match &self.verification_queue.front().unwrap().ty {
-            VerificationQueueEntryTy::Queued { blocks, .. } => Some(blocks.front().unwrap()),
-            _ => None,
-        }
-    }
-
-    /// If the queue starts with ready blocks, returns the first block that is ready and removes
-    /// it.
-    ///
-    /// Returns `Some` if and only if [`VerificationQueue::blocks_ready`] returns `true`.
-    pub fn pop_first_block(&mut self) -> Option<(TBl, SourceId)> {
-        let verif_queue_front = self.verification_queue.get_mut(0).unwrap();
-
-        let block;
-        let blocks_now_empty;
-        let source_id;
-
-        match &mut verif_queue_front.ty {
-            VerificationQueueEntryTy::Queued { blocks, source } => {
-                block = blocks.pop_front().unwrap();
-                blocks_now_empty = blocks.is_empty();
-                source_id = *source;
-            }
-            _ => return None,
-        };
-
-        verif_queue_front.block_height = verif_queue_front.block_height.checked_add(1).unwrap();
-
-        if blocks_now_empty {
-            self.verification_queue.pop_front().unwrap();
-            debug_assert!(!self.verification_queue.is_empty());
-            debug_assert!(matches!(
-                self.verification_queue.back().unwrap().ty,
-                VerificationQueueEntryTy::Missing
-            ));
-        }
-
-        Some((block, source_id))
-    }
-
-    /// Returns the list of ranges of blocks in the queue that need to be requested, as tuples of
-    /// `(block height, number of blocks)`.
-    ///
-    /// Use [`VerificationQueue::insert_request`] to update the queue with a request, so that this
-    /// function no longer returns it.
-    ///
-    /// Must be passed the number of blocks to download ahead of the first block of the queue.
-    /// Ranges of blocks further ahead than this are not returned.
-    pub fn desired_requests(
-        &'_ self,
-        download_ahead_blocks: NonZeroU32,
-    ) -> impl Iterator<Item = (NonZeroU64, NonZeroU32)> + '_ {
-        // Highest block number to request.
-        let max_block_number = self.verification_queue.front().unwrap().block_height.get()
-            + u64::from(download_ahead_blocks.get());
-
-        let iter1 = self
-            .verification_queue
-            .iter()
-            .tuple_windows::<(_, _)>()
-            .filter(|(e, _)| matches!(e.ty, VerificationQueueEntryTy::Missing))
-            .filter(move |(entry, _)| entry.block_height.get() <= max_block_number)
-            .map(move |(entry, next_entry)| {
-                let max = cmp::min(max_block_number, next_entry.block_height.get());
-                (
-                    entry.block_height,
-                    NonZeroU32::new(u32::try_from(max - entry.block_height.get()).unwrap())
-                        .unwrap(),
-                )
-            });
-
-        let verif_queue_last = self.verification_queue.back().unwrap();
-        let iter2 = if verif_queue_last.block_height.get() < max_block_number {
-            either::Left(iter::once((
-                verif_queue_last.block_height,
-                NonZeroU32::new(u32::max_value()).unwrap(),
-            )))
-        } else {
-            either::Right(iter::empty())
-        };
-
-        iter1.chain(iter2)
-    }
-
-    /// Updates the queue with the fact that a request has been started.
-    ///
-    /// Returns `Ok` if the request has updated the queue, and `Err` if the request isn't relevant
-    /// to anything in the queue and has been silently discarded.
-    pub fn insert_request(
-        &mut self,
-        block_height: NonZeroU64,
-        num_blocks: NonZeroU32,
-        source: SourceId,
-        user_data: TRq,
-    ) -> Result<(), TRq> {
-        debug_assert!(!self.verification_queue.is_empty());
-
-        // Find the entry where the request can be inserted.
-        let insert_pos = {
-            let pos_after = self
-                .verification_queue
-                .iter()
-                .position(|entry| entry.block_height > block_height);
-            match pos_after {
-                Some(0) => return Err(user_data),
-                Some(n) => n - 1,
-                None => self.verification_queue.len() - 1,
-            }
-        };
-        debug_assert!(self.verification_queue[insert_pos].block_height <= block_height);
-
-        // If the entry concerned by this request has already been started (or finished), discard
-        // it. It is possible that the end of the newly-started request overlaps with a `Missing`
-        // entry later in the queue, but this situation is explicitly not handled.
-        if !matches!(
-            self.verification_queue[insert_pos].ty,
-            VerificationQueueEntryTy::Missing
-        ) {
-            return Err(user_data);
-        }
-
-        // If `block_height` doesn't exactly match an entry in the queue, insert a new one
-        // in-between. This might update `insert_pos`.
-        // This temporarily causes the queue to have two `Missing` entries in a row.
-        let insert_pos = {
-            let insert_pos_block_height = self.verification_queue[insert_pos].block_height;
-            if insert_pos_block_height < block_height {
-                self.verification_queue[insert_pos].block_height = block_height;
-
-                self.verification_queue.insert(
-                    insert_pos,
-                    VerificationQueueEntry {
-                        block_height: insert_pos_block_height,
-                        ty: VerificationQueueEntryTy::Missing,
-                    },
-                );
-
-                insert_pos + 1
-            } else {
-                insert_pos
-            }
-        };
-
-        // Now update the state of the queue.
-        self.verification_queue[insert_pos].ty =
-            VerificationQueueEntryTy::Requested { source, user_data };
-
-        // Check that there are no two "Missing" in a row.
-        debug_assert!(!self
-            .verification_queue
-            .iter()
-            .tuple_windows::<(_, _)>()
-            .any(|(a, b)| matches!(a.ty, VerificationQueueEntryTy::Missing)
-                && matches!(b.ty, VerificationQueueEntryTy::Missing)));
-
-        // `verification_queue` must always end with an entry of type `Missing`. Add it, if
-        // necessary.
-        if insert_pos == self.verification_queue.len() - 1 {
-            self.verification_queue.push_back(VerificationQueueEntry {
-                block_height: block_height
-                    .checked_add(u64::from(num_blocks.get()))
-                    .unwrap(),
-                ty: VerificationQueueEntryTy::Missing,
-            });
-        }
-        debug_assert!(matches!(
-            self.verification_queue.back().unwrap().ty,
-            VerificationQueueEntryTy::Missing
-        ));
-
-        // If `num_blocks` is < gap between `insert_pos` and `insert_pos + 1`, we have to either
-        // adjust `insert_pos + 1` or insert an entry in between.
-        //
-        // Note that the case where `num_blocks` is strictly superior to the distance to the next
-        // entry isn't handled. The worst that can happen is the same blocks being requested
-        // multiple times.
-        debug_assert!(self.verification_queue.get(insert_pos + 1).is_some());
-        match (self.verification_queue[insert_pos + 1].block_height.get() - block_height.get())
-            .checked_sub(u64::from(num_blocks.get()))
-        {
-            Some(0) => {}
-            Some(n) => {
-                if matches!(
-                    self.verification_queue[insert_pos + 1].ty,
-                    VerificationQueueEntryTy::Missing
-                ) {
-                    self.verification_queue[insert_pos + 1].block_height = NonZeroU64::new(
-                        self.verification_queue[insert_pos + 1].block_height.get() - n,
-                    )
-                    .unwrap();
-                } else {
-                    self.verification_queue.insert(
-                        insert_pos + 1,
-                        VerificationQueueEntry {
-                            block_height: block_height.checked_add(n).unwrap(),
-                            ty: VerificationQueueEntryTy::Missing,
-                        },
-                    );
-                }
-            }
-            None => unreachable!(),
-        }
-
-        // Check again the internal state of the queue.
-        debug_assert!(!self
-            .verification_queue
-            .iter()
-            .tuple_windows::<(_, _)>()
-            .any(|(a, b)| matches!(a.ty, VerificationQueueEntryTy::Missing)
-                && matches!(b.ty, VerificationQueueEntryTy::Missing)));
-        debug_assert!(matches!(
-            self.verification_queue.back().unwrap().ty,
-            VerificationQueueEntryTy::Missing
-        ));
-
-        Ok(())
-    }
-
-    /// Marks a request previously inserted with [`VerificationQueue::insert_request`] as done.
-    ///
-    /// The `request_find` closure is used to find which request is concerned.
-    ///
-    /// The number of blocks in the result doesn't have to match the number of blocks that was
-    /// passed to [`VerificationQueue::insert_request`].
-    ///
-    /// # Panic
-    ///
-    /// Panics if no request could be found.
-    ///
-    pub fn finish_request(
-        &mut self,
-        request_find: impl Fn(&TRq) -> bool,
-        result: Result<impl Iterator<Item = TBl>, ()>,
-    ) -> (TRq, SourceId) {
-        // Find the position of that request in the queue.
-        let (index, source_id) = self
-            .verification_queue
-            .iter()
-            .enumerate()
-            .find_map(|(index, entry)| match &entry.ty {
-                VerificationQueueEntryTy::Requested {
-                    source, user_data, ..
-                } if request_find(user_data) => Some((index, *source)),
-                _ => None,
-            })
-            .unwrap();
-
-        let prev_value;
-        if let Ok(blocks) = result {
-            let gap_with_next = self.verification_queue[index + 1].block_height.get()
-                - self.verification_queue[index].block_height.get();
-
-            let blocks: VecDeque<_> = blocks
-                .take(usize::try_from(gap_with_next).unwrap_or(usize::max_value()))
-                .collect();
-            let num_blocks = blocks.len();
-
-            prev_value = mem::replace(
-                &mut self.verification_queue[index].ty,
-                VerificationQueueEntryTy::Queued {
-                    source: source_id,
-                    blocks,
-                },
-            );
-
-            // If `num_blocks` is < gap between `index` and `index + 1`, we have to either adjust
-            // `index + 1` or insert an entry in between.
-            match gap_with_next.checked_sub(u64::try_from(num_blocks).unwrap()) {
-                Some(0) => {}
-                Some(n) => {
-                    if matches!(
-                        self.verification_queue[index + 1].ty,
-                        VerificationQueueEntryTy::Missing
-                    ) {
-                        self.verification_queue[index + 1].block_height = NonZeroU64::new(
-                            self.verification_queue[index + 1].block_height.get() - n,
-                        )
-                        .unwrap();
-                    } else {
-                        self.verification_queue.insert(
-                            index + 1,
-                            VerificationQueueEntry {
-                                block_height: self.verification_queue[index]
-                                    .block_height
-                                    .checked_add(n)
-                                    .unwrap(),
-                                ty: VerificationQueueEntryTy::Missing,
-                            },
-                        );
-                    }
-                }
-                None => unreachable!(),
-            }
-
-            // We just put a `Queued` at `index`. If `index` is the last element in the list, add a
-            // `Missing` at the end.
-            if index == self.verification_queue.len() - 1 {
-                let back = self.verification_queue.back().unwrap();
-                let next_block_height = back
-                    .block_height
-                    .checked_add(
-                        u64::try_from(match &back.ty {
-                            VerificationQueueEntryTy::Queued { blocks, .. } => blocks.len(),
-                            _ => unreachable!(),
-                        })
-                        .unwrap(),
-                    )
-                    .unwrap();
-                self.verification_queue.push_back(VerificationQueueEntry {
-                    block_height: next_block_height,
-                    ty: VerificationQueueEntryTy::Missing,
-                });
-            }
-        } else {
-            prev_value = mem::replace(
-                &mut self.verification_queue[index].ty,
-                VerificationQueueEntryTy::Missing,
-            );
-
-            // We just put a `Missing` at `index`. If there is a `Missing` immediately following
-            // (i.e. at `index + 1`), then merge the two.
-            if matches!(
-                self.verification_queue[index + 1].ty,
-                VerificationQueueEntryTy::Missing
-            ) {
-                // Check that `index + 2` isn't also `Missing`.
-                debug_assert!(self
-                    .verification_queue
-                    .get(index + 2)
-                    .map_or(true, |e| !matches!(e.ty, VerificationQueueEntryTy::Missing)));
-
-                self.verification_queue.remove(index + 1);
-            }
-
-            // We must also check whether `index - 1` is also `Missing`.
-            // This is done after having checked `index + 1`, otherwise the indices would be
-            // wrong.
-            if index != 0
-                && matches!(
-                    self.verification_queue[index - 1].ty,
-                    VerificationQueueEntryTy::Missing
-                )
-            {
-                self.verification_queue.remove(index);
-            }
-        };
-
-        // Check the internal state of the queue.
-        debug_assert!(!self
-            .verification_queue
-            .iter()
-            .tuple_windows::<(_, _)>()
-            .any(|(a, b)| matches!(a.ty, VerificationQueueEntryTy::Missing)
-                && matches!(b.ty, VerificationQueueEntryTy::Missing)));
-        debug_assert!(self
-            .verification_queue
-            .back()
-            .map_or(true, |e| matches!(e.ty, VerificationQueueEntryTy::Missing)));
-
-        (
-            match prev_value {
-                VerificationQueueEntryTy::Requested { user_data, .. } => user_data,
-                _ => unreachable!(),
-            },
-            source_id,
-        )
-    }
-
-    /// Returns an iterator to all the requests that are inside of the queue.
-    pub fn requests(&'_ self) -> impl Iterator<Item = (&TRq, SourceId)> + '_ {
-        self.verification_queue.iter().filter_map(|queue_elem| {
-            if let VerificationQueueEntryTy::Requested { user_data, source } = &queue_elem.ty {
-                Some((user_data, *source))
-            } else {
-                None
-            }
-        })
-    }
-
-    /// Consumes the queue and returns an iterator to all the requests that were inside of it.
-    pub fn into_requests(self) -> impl Iterator<Item = (TRq, SourceId)> {
-        self.verification_queue
-            .into_iter()
-            .filter_map(|queue_elem| {
-                if let VerificationQueueEntryTy::Requested { user_data, source } = queue_elem.ty {
-                    Some((user_data, source))
-                } else {
-                    None
-                }
-            })
-    }
-
-    /// Returns an iterator that removes from the queue all requests belonging to a certain source.
-    pub fn drain_source(&'_ mut self, source_id: SourceId) -> SourceDrain<'_, TRq, TBl> {
-        SourceDrain {
-            queue: self,
-            source_id,
-        }
-    }
-
-    /// Returns the number of ongoing requests that concern this source.
-    pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
-        self.verification_queue
-            .iter()
-            .filter(|elem| matches!(elem.ty, VerificationQueueEntryTy::Requested { source, .. } if source == source_id))
-            .count()
-    }
-}
-
-/// See [`VerificationQueue::drain_source`].
-pub(super) struct SourceDrain<'a, TRq, TBl> {
-    queue: &'a mut VerificationQueue<TRq, TBl>,
-    source_id: SourceId,
-}
-
-impl<'a, TRq, TBl> Iterator for SourceDrain<'a, TRq, TBl> {
-    type Item = TRq;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        // TODO: unoptimized
-        let source_id = self.source_id;
-        let entry = self.queue.verification_queue.iter_mut().find(|entry| {
-            matches!(entry.ty,
-                VerificationQueueEntryTy::Requested { source, .. } if source == source_id)
-        })?;
-
-        match mem::replace(&mut entry.ty, VerificationQueueEntryTy::Missing) {
-            VerificationQueueEntryTy::Requested { user_data, .. } => Some(user_data),
-            _ => unreachable!(),
-        }
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (0, Some(self.queue.verification_queue.len()))
-    }
-}
-
-impl<'a, TRq, TBl> fmt::Debug for SourceDrain<'a, TRq, TBl> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_tuple("SourceDrain").finish()
-    }
-}
-
-impl<'a, TRq, TBl> Drop for SourceDrain<'a, TRq, TBl> {
-    fn drop(&mut self) {
-        // At the conclusion of the iteration, we merge consecutive `Missing` entries of the queue.
-        // Note: it is possible for this destructor to not run if the user `mem::forget`s the
-        // iterator, which could leave the collection in an inconsistent state. Since no unsafety
-        // is involved anymore, we don't care about this problem.
-        for index in (1..self.queue.verification_queue.len()).rev() {
-            if matches!(
-                self.queue.verification_queue[index].ty,
-                VerificationQueueEntryTy::Missing
-            ) && matches!(
-                self.queue.verification_queue[index - 1].ty,
-                VerificationQueueEntryTy::Missing
-            ) {
-                self.queue.verification_queue.remove(index);
-            }
-        }
-
-        // Check the internal state of the queue.
-        debug_assert!(!self
-            .queue
-            .verification_queue
-            .iter()
-            .tuple_windows::<(_, _)>()
-            .any(|(a, b)| matches!(a.ty, VerificationQueueEntryTy::Missing)
-                && matches!(b.ty, VerificationQueueEntryTy::Missing)));
-        debug_assert!(self
-            .queue
-            .verification_queue
-            .back()
-            .map_or(true, |e| matches!(e.ty, VerificationQueueEntryTy::Missing)));
-    }
-}
-
-struct VerificationQueueEntry<TRq, TBl> {
-    block_height: NonZeroU64,
-    ty: VerificationQueueEntryTy<TRq, TBl>,
-}
-
-enum VerificationQueueEntryTy<TRq, TBl> {
-    Missing,
-    Requested {
-        /// User-chosen data for this request.
-        user_data: TRq,
-        // Index of this source within [`OptimisticSyncInner::sources`].
-        source: SourceId,
-    },
-    Queued {
-        source: SourceId,
-        /// Must never be empty.
-        blocks: VecDeque<TBl>,
-    },
-}
-
-// TODO: tests
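
A note on the API shape that recurs throughout the deleted module: `process_one`, `verify_header`, and `perform` all consume the state machine and hand it back inside the value they return, as their doc comments state. Below is a minimal, self-contained sketch of that pattern; the `Machine`, `ProcessOne`, and `Verify` names are placeholders for illustration and are not smoldot types.

```rust
use std::collections::VecDeque;

/// Placeholder for whatever data accompanies a queued block.
struct Block(u64);

/// Toy state machine holding a queue of blocks to verify.
struct Machine {
    queue: VecDeque<Block>,
}

/// Outcome of asking the machine for work. The machine is moved into whichever
/// variant is returned, so the caller can never observe it mid-verification.
enum ProcessOne {
    /// Nothing to verify; the machine is handed back unchanged.
    Idle { machine: Machine },
    /// A block is ready; the machine is embedded inside the verifier.
    Verify(Verify),
}

struct Verify {
    machine: Machine,
}

impl Machine {
    fn process_one(self) -> ProcessOne {
        if self.queue.is_empty() {
            ProcessOne::Idle { machine: self }
        } else {
            ProcessOne::Verify(Verify { machine: self })
        }
    }
}

impl Verify {
    /// Performs the verification and yields the machine back to the caller.
    fn perform(mut self) -> Machine {
        let block = self.machine.queue.pop_front().unwrap();
        println!("verified block #{}", block.0);
        self.machine
    }
}

fn main() {
    let mut machine = Machine {
        queue: VecDeque::from([Block(1), Block(2)]),
    };
    // Drive the machine until it reports being idle, re-acquiring ownership each time.
    let final_machine = loop {
        match machine.process_one() {
            ProcessOne::Idle { machine } => break machine,
            ProcessOne::Verify(verify) => machine = verify.perform(),
        }
    };
    assert!(final_machine.queue.is_empty());
}
```

Because the machine only travels through return values, the borrow checker prevents a caller from holding a stale handle while a verification is in flight, which is presumably why the deleted module (and the remaining sync state machines) use this shape.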
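The deleted `VerificationQueue` relies on two invariants spelled out in its doc comments: the queue always ends with a `Missing` entry and never contains two `Missing` entries in a row. `desired_requests` then turns every `Missing` gap into a `(height, count)` range, capped at `download_ahead_blocks` past the first block of the queue. The following is a simplified sketch of that computation under the assumption that those invariants hold; the `Entry` and `EntryTy` types here are illustrative stand-ins, not the real ones.

```rust
/// Illustrative stand-ins for the queue entries (not the real types).
enum EntryTy {
    Missing,
    Requested,
    Queued,
}

struct Entry {
    block_height: u64,
    ty: EntryTy,
}

/// Returns `(first block height, number of blocks)` ranges that still need to be requested,
/// ignoring anything more than `download_ahead_blocks` past the first block of the queue.
fn desired_requests(queue: &[Entry], download_ahead_blocks: u64) -> Vec<(u64, u64)> {
    // Invariants documented in the deleted module: the queue is never empty, always ends
    // with `Missing`, and never holds two `Missing` entries in a row.
    let max_block_number = queue.first().unwrap().block_height + download_ahead_blocks;
    let mut ranges = Vec::new();

    // Every `Missing` entry (except the trailing one) spans up to the height of the next entry.
    for window in queue.windows(2) {
        if matches!(window[0].ty, EntryTy::Missing) && window[0].block_height <= max_block_number {
            let end = window[1].block_height.min(max_block_number);
            ranges.push((window[0].block_height, end - window[0].block_height));
        }
    }

    // The trailing `Missing` entry is open-ended; the caller clamps the count.
    let last = queue.last().unwrap();
    if last.block_height < max_block_number {
        ranges.push((last.block_height, u64::MAX));
    }

    ranges
}

fn main() {
    let queue = vec![
        Entry { block_height: 100, ty: EntryTy::Queued },
        Entry { block_height: 120, ty: EntryTy::Missing },
        Entry { block_height: 150, ty: EntryTy::Requested },
        Entry { block_height: 180, ty: EntryTy::Missing },
    ];
    // The gap at 120..150 plus the open-ended tail starting at 180.
    assert_eq!(desired_requests(&queue, 128), vec![(120, 30), (180, u64::MAX)]);
}
```

The open-ended final range mirrors the real implementation, which returns `u32::max_value()` as the count and, as the `RequestDetail::num_blocks` documentation above notes, leaves clamping to the API user.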