diff --git a/full-node/src/consensus_service.rs b/full-node/src/consensus_service.rs index b7a9ca272e..95b7cbb0b2 100644 --- a/full-node/src/consensus_service.rs +++ b/full-node/src/consensus_service.rs @@ -35,6 +35,7 @@ use futures_util::{ SinkExt as _, StreamExt as _, }; use hashbrown::HashSet; +use rand::seq::IteratorRandom; use smol::lock::Mutex; use smoldot::{ author, @@ -59,7 +60,7 @@ use std::{ num::{NonZeroU64, NonZeroUsize}, pin::Pin, sync::Arc, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant, SystemTime}, }; /// Configuration for a [`ConsensusService`]. @@ -311,7 +312,7 @@ impl ConsensusService { // the chain and the machine of the user. NonZeroU32::new(2000).unwrap() }, - full_mode: true, + download_bodies: true, code_trie_node_hint: None, }); @@ -351,6 +352,9 @@ impl ConsensusService { pending_notification: None, from_network_service: config.network_events_receiver, database: config.database, + database_catch_up_download: DatabaseCatchUpDownload::NoDownloadInProgress, + database_catch_up_download_block_verification: + DatabaseCatchUpDownloadBlockVerification::None, peers_source_id_map: Default::default(), sub_tasks: FuturesUnordered::new(), log_callback: config.log_callback, @@ -672,6 +676,14 @@ struct SyncBackground { /// See [`Config::database`]. database: Arc, + /// Whether an old block or storage item from an old block is currently being downloaded or + /// must be downloaded. + database_catch_up_download: DatabaseCatchUpDownload, + + /// Whether an old block or storage item from an old block is currently being downloaded or + /// must be downloaded. + database_catch_up_download_block_verification: DatabaseCatchUpDownloadBlockVerification, + /// How to report events about blocks. jaeger_service: Arc, } @@ -724,6 +736,36 @@ enum SubtaskFinished { }, } +#[derive(Debug, Clone)] +enum DatabaseCatchUpDownload { + /// No download currently in progress. + NoDownloadInProgress, + /// No download currently in progress, and we know that nothing is missing from the database. + NothingToDownloadCache, + /// Currently downloading. + InProgress(all::RequestId), +} + +#[derive(Debug, Clone)] +enum DatabaseCatchUpDownloadBlockVerification { + /// No download is required. + None, + /// A call proof download should be started next. + CallProofDesired { + block_hash: [u8; 32], + block_number: u64, + function_name: String, + parameter: Vec, + }, + /// A storage proof download should be started next. + CodeStorageProofDesired { + block_hash: [u8; 32], + block_number: u64, + }, + /// Currently downloading. + InProgress(all::RequestId), +} + impl SyncBackground { async fn run(mut self) { let mut process_sync = true; @@ -734,7 +776,11 @@ impl SyncBackground { FrontendEvent(ToBackground), FrontendClosed, SendPendingNotification(Notification), - StartNetworkRequest(all::SourceId, all::DesiredRequest), + StartNetworkRequest { + source_id: all::SourceId, + request: all::DesiredRequest, + database_catch_up_type: DbCatchUpType, + }, NetworkEvent(network_service::Event), NetworkLocalChainUpdate, AnnounceBlock(Vec, [u8; 32], u64), @@ -742,12 +788,20 @@ impl SyncBackground { SyncProcess, } + enum DbCatchUpType { + No, + BlockVerification, + Database, + } + let wake_up_reason: WakeUpReason = { // Creating the block authoring state and prepare a future that is ready when something // related to the block authoring is ready. // TODO: refactor as a separate task? + // TODO: restore block authoring after https://github.com/smol-dot/smoldot/issues/1109 let authoring_ready_future = { - // TODO: overhead to call best_block_consensus() multiple times + future::pending::() + /*// TODO: overhead to call best_block_consensus() multiple times let local_authorities = { let namespace_filter = match self.sync.best_block_consensus() { chain_information::ChainInformationConsensusRef::Aura { .. } => { @@ -832,7 +886,7 @@ impl SyncBackground { delay, ))) } - } + }*/ }; async { @@ -879,6 +933,8 @@ impl SyncBackground { .or({ async { // TODO: handle obsolete requests + // Ask the sync state machine whether any new network request should + // be started. // `desired_requests()` returns, in decreasing order of priority, the // requests that should be started in order for the syncing to proceed. We // simply pick the first request, but enforce one ongoing request per @@ -923,18 +979,208 @@ impl SyncBackground { } }, ); + if let Some((source_id, _, request)) = request_to_start { + return WakeUpReason::StartNetworkRequest { + source_id, + request, + database_catch_up_type: DbCatchUpType::No, + }; + } - let Some((source_id, _, request_info)) = request_to_start else { - future::pending().await - }; - WakeUpReason::StartNetworkRequest(source_id, request_info) + match self.database_catch_up_download_block_verification.clone() { + _ if !matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + | DatabaseCatchUpDownload::NothingToDownloadCache + ) => {} + DatabaseCatchUpDownloadBlockVerification::None => {} + DatabaseCatchUpDownloadBlockVerification::InProgress(_) => {} + DatabaseCatchUpDownloadBlockVerification::CallProofDesired { + block_hash, + block_number, + function_name, + parameter, + } => { + // Choose which source to query. We have to use an `if` because + // `knows_non_finalized_block` panics if the parameter is inferior + // or equal to the finalized block number. + let source_id = if block_number + <= self.sync.finalized_block_header().number + { + self.sync + .sources() + .filter(|s| *s != self.block_author_sync_source) + .choose(&mut rand::thread_rng()) + } else { + self.sync + .knows_non_finalized_block(block_number, &block_hash) + .filter(|source_id| { + *source_id != self.block_author_sync_source + && self.sync.source_num_ongoing_requests(*source_id) + == 0 + }) + .choose(&mut rand::thread_rng()) + }; + + if let Some(source_id) = source_id { + return WakeUpReason::StartNetworkRequest { + source_id, + request: all::DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name: function_name.into(), + parameter_vectored: parameter.into(), + }, + database_catch_up_type: DbCatchUpType::BlockVerification, + }; + } + } + DatabaseCatchUpDownloadBlockVerification::CodeStorageProofDesired { + block_hash, + block_number, + } => { + // Choose which source to query. We have to use an `if` because + // `knows_non_finalized_block` panics if the parameter is inferior + // or equal to the finalized block number. + let source_id = if block_number + <= self.sync.finalized_block_header().number + { + self.sync + .sources() + .filter(|s| *s != self.block_author_sync_source) + .choose(&mut rand::thread_rng()) + } else { + self.sync + .knows_non_finalized_block(block_number, &block_hash) + .filter(|source_id| { + *source_id != self.block_author_sync_source + && self.sync.source_num_ongoing_requests(*source_id) + == 0 + }) + .choose(&mut rand::thread_rng()) + }; + + if let Some(source_id) = source_id { + return WakeUpReason::StartNetworkRequest { + source_id, + request: all::DesiredRequest::StorageGetMerkleProof { + block_hash, + state_trie_root: [0; 32], // TODO: wrong, but field value unused so it's fine temporarily + keys: vec![":code".into(), ":heappages".into()], + }, + database_catch_up_type: DbCatchUpType::BlockVerification, + }; + } + } + } + + // If the sync state machine doesn't require any additional request, ask + // the database whether any storage item is missing. + if matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + ) { + // TODO: this has a O(n^2) complexity; in case all sources are busy, we iterate a lot + let missing_items = self + .database + .with_database(|db| { + db.finalized_and_above_missing_trie_nodes_unordered() + }) + .await + .unwrap(); + if missing_items.is_empty() { + self.database_catch_up_download = + DatabaseCatchUpDownload::NothingToDownloadCache; + } + + for missing_item in missing_items + .into_iter() + .flat_map(|item| item.blocks.into_iter()) + { + // Since the database and sync state machine are supposed to have the + // same finalized block, it is guaranteed that the missing item are + // in the finalized block or above. + debug_assert!( + missing_item.number + >= self.sync.finalized_block_header().number + ); + + // Choose which source to query. We have to use an `if` because + // `knows_non_finalized_block` panics if the parameter is inferior + // or equal to the finalized block number. + let source_id = if missing_item.number + <= self.sync.finalized_block_header().number + { + let Some(source_id) = self + .sync + .sources() + .filter(|s| *s != self.block_author_sync_source) + .choose(&mut rand::thread_rng()) + else { + break; + }; + source_id + } else { + let Some(source_id) = self + .sync + .knows_non_finalized_block( + missing_item.number, + &missing_item.hash, + ) + .filter(|source_id| { + *source_id != self.block_author_sync_source + && self.sync.source_num_ongoing_requests(*source_id) + == 0 + }) + .choose(&mut rand::thread_rng()) + else { + continue; + }; + source_id + }; + + return WakeUpReason::StartNetworkRequest { + source_id, + request: all::DesiredRequest::StorageGetMerkleProof { + block_hash: missing_item.hash, + state_trie_root: [0; 32], // TODO: wrong, but field value unused so it's fine temporarily + keys: vec![trie::nibbles_to_bytes_suffix_extend( + missing_item + .trie_node_key_nibbles + .into_iter() + // In order to download more than one item at a time, + // we add some randomly-generated nibbles to the + // requested key. The request will target the missing + // key plus a few other random keys. + .chain((0..32).map(|_| { + rand::Rng::gen_range( + &mut rand::thread_rng(), + 0..16, + ) + })) + .map(|n| trie::Nibble::try_from(n).unwrap()), + ) + .collect::>()], + }, + database_catch_up_type: DbCatchUpType::Database, + }; + } + } + + // No network request to start. + future::pending().await } }) - .or(async { - if !process_sync { - future::pending().await + .or({ + let is_downloading = matches!( + self.database_catch_up_download_block_verification, + DatabaseCatchUpDownloadBlockVerification::None + ); + async move { + if !process_sync || !is_downloading { + future::pending().await + } + WakeUpReason::SyncProcess } - WakeUpReason::SyncProcess }) .await }; @@ -1216,10 +1462,13 @@ impl SyncBackground { // Different chain index. } - WakeUpReason::StartNetworkRequest( + WakeUpReason::StartNetworkRequest { source_id, - request_info @ all::DesiredRequest::BlocksRequest { .. }, - ) if source_id == self.block_author_sync_source => { + request: request_info @ all::DesiredRequest::BlocksRequest { .. }, + database_catch_up_type, + } if source_id == self.block_author_sync_source => { + debug_assert!(matches!(database_catch_up_type, DbCatchUpType::No)); + self.log_callback.log( LogLevel::Debug, "queue-locally-authored-block-for-import".to_string(), @@ -1232,7 +1481,6 @@ impl SyncBackground { // Create a request that is immediately answered right below. let request_id = self.sync.add_request(source_id, request_info.into(), ()); - // TODO: announce the block on the network, but only after it's been imported self.sync.blocks_request_response( request_id, @@ -1245,18 +1493,20 @@ impl SyncBackground { ); } - WakeUpReason::StartNetworkRequest( + WakeUpReason::StartNetworkRequest { source_id, - all::DesiredRequest::BlocksRequest { - first_block_hash, - first_block_height, - ascending, - num_blocks, - request_headers, - request_bodies, - request_justification, - }, - ) => { + request: + all::DesiredRequest::BlocksRequest { + first_block_hash, + first_block_height, + ascending, + num_blocks, + request_headers, + request_bodies, + request_justification, + }, + database_catch_up_type, + } => { // Before notifying the syncing of the request, clamp the number of blocks to // the number of blocks we expect to receive. let num_blocks = NonZeroU64::new(cmp::min(num_blocks.get(), 64)).unwrap(); @@ -1289,7 +1539,7 @@ impl SyncBackground { network::codec::BlocksRequestDirection::Descending }, fields: network::codec::BlocksRequestFields { - header: request_headers, + header: true, // TODO: always set to true due to unwrapping the header when the response comes body: request_bodies, justifications: request_justification, }, @@ -1311,6 +1561,22 @@ impl SyncBackground { (), ); + match database_catch_up_type { + DbCatchUpType::No => {} + DbCatchUpType::Database => { + debug_assert!(matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + )); + self.database_catch_up_download = + DatabaseCatchUpDownload::InProgress(request_id); + } + DbCatchUpType::BlockVerification => { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::InProgress(request_id); + } + } + self.sub_tasks.push(Box::pin(async move { let result = request.await; SubtaskFinished::BlocksRequestFinished { @@ -1321,12 +1587,14 @@ impl SyncBackground { })); } - WakeUpReason::StartNetworkRequest( + WakeUpReason::StartNetworkRequest { source_id, - all::DesiredRequest::WarpSync { - sync_start_block_hash, - }, - ) => { + request: + all::DesiredRequest::WarpSync { + sync_start_block_hash, + }, + database_catch_up_type, + } => { // TODO: don't unwrap? could this target the virtual sync source? let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); // TODO: why does this require cloning? weird borrow chk issue @@ -1344,6 +1612,22 @@ impl SyncBackground { (), ); + match database_catch_up_type { + DbCatchUpType::No => {} + DbCatchUpType::Database => { + debug_assert!(matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + )); + self.database_catch_up_download = + DatabaseCatchUpDownload::InProgress(request_id); + } + DbCatchUpType::BlockVerification => { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::InProgress(request_id); + } + } + self.sub_tasks.push(Box::pin(async move { let result = request.await; SubtaskFinished::WarpSyncRequestFinished { @@ -1354,12 +1638,14 @@ impl SyncBackground { })); } - WakeUpReason::StartNetworkRequest( + WakeUpReason::StartNetworkRequest { source_id, - all::DesiredRequest::StorageGetMerkleProof { - block_hash, keys, .. - }, - ) => { + request: + all::DesiredRequest::StorageGetMerkleProof { + block_hash, keys, .. + }, + database_catch_up_type, + } => { // TODO: don't unwrap? could this target the virtual sync source? let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); // TODO: why does this require cloning? weird borrow chk issue @@ -1378,6 +1664,22 @@ impl SyncBackground { (), ); + match database_catch_up_type { + DbCatchUpType::No => {} + DbCatchUpType::Database => { + debug_assert!(matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + )); + self.database_catch_up_download = + DatabaseCatchUpDownload::InProgress(request_id); + } + DbCatchUpType::BlockVerification => { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::InProgress(request_id); + } + } + self.sub_tasks.push(Box::pin(async move { let result = request.await; SubtaskFinished::StorageRequestFinished { @@ -1388,14 +1690,16 @@ impl SyncBackground { })); } - WakeUpReason::StartNetworkRequest( + WakeUpReason::StartNetworkRequest { source_id, - all::DesiredRequest::RuntimeCallMerkleProof { - block_hash, - function_name, - parameter_vectored, - }, - ) => { + request: + all::DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + database_catch_up_type, + } => { // TODO: don't unwrap? could this target the virtual sync source? let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); // TODO: why does this require cloning? weird borrow chk issue @@ -1419,6 +1723,22 @@ impl SyncBackground { (), ); + match database_catch_up_type { + DbCatchUpType::No => {} + DbCatchUpType::Database => { + debug_assert!(matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NoDownloadInProgress + )); + self.database_catch_up_download = + DatabaseCatchUpDownload::InProgress(request_id); + } + DbCatchUpType::BlockVerification => { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::InProgress(request_id); + } + } + self.sub_tasks.push(Box::pin(async move { let result = request.await; SubtaskFinished::CallProofRequestFinished { @@ -1434,6 +1754,19 @@ impl SyncBackground { source_id, result: Ok(blocks), }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + + // TODO: insert blocks in database if they are referenced through a parent_hash? + let _ = self.sync.blocks_request_response( request_id, Ok(blocks @@ -1477,6 +1810,17 @@ impl SyncBackground { source_id, result: Err(_), }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + // Note that we perform the ban even if the source is now disconnected. let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); self.network_service @@ -1515,6 +1859,17 @@ impl SyncBackground { source_id, result: Ok(result), }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + let decoded = result.decode(); let fragments = decoded .fragments @@ -1553,6 +1908,17 @@ impl SyncBackground { source_id, result: Err(_), }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + // Note that we perform the ban even if the source is now disconnected. let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); self.network_service @@ -1589,6 +1955,69 @@ impl SyncBackground { source_id, result, }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + + if let Ok(result) = &result { + let result = result.clone(); + self.database + .with_database(move |database| { + if let Ok(decoded) = trie::proof_decode::decode_and_verify_proof( + trie::proof_decode::Config { + proof: result.decode(), + }, + ) { + for (_, entry) in decoded.iter_ordered() { + // TODO: check the state root hash; while this can't lead to a vulnerability, it can bloat the database + database.insert_trie_nodes( + iter::once(full_sqlite::InsertTrieNode { + merkle_value: Cow::Borrowed(entry.merkle_value), + partial_key_nibbles: Cow::Owned(entry.partial_key_nibbles.into_iter().map(|n| u8::from(n)).collect()), + children_merkle_values: std::array::from_fn(|n| entry.trie_node_info.children.child(trie::Nibble::try_from(u8::try_from(n).unwrap()).unwrap()).merkle_value().map(Cow::Borrowed)), + storage_value: match entry.trie_node_info.storage_value { + trie::proof_decode::StorageValue::HashKnownValueMissing( + _, + ) => return, + trie::proof_decode::StorageValue::None => { + full_sqlite::InsertTrieNodeStorageValue::NoValue + } + trie::proof_decode::StorageValue::Known { + value, .. + } => full_sqlite::InsertTrieNodeStorageValue::Value { + value: Cow::Borrowed(value), + references_merkle_value: false, // TODO: + }, + }, + }), + match entry.trie_node_info.storage_value { + trie::proof_decode::StorageValue::None => 0, // TODO: ?! + trie::proof_decode::StorageValue::HashKnownValueMissing( + .. + ) => return, + trie::proof_decode::StorageValue::Known { + inline: true, + .. + } => 0, + trie::proof_decode::StorageValue::Known { + inline: false, + .. + } => 1, + }, + ).unwrap(); + } + } + }) + .await; + } + if result.is_err() { // Note that we perform the ban even if the source is now disconnected. let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); @@ -1629,6 +2058,70 @@ impl SyncBackground { source_id, result, }) => { + if matches!(self.database_catch_up_download, DatabaseCatchUpDownload::InProgress(r) if r == request_id) + { + self.database_catch_up_download = + DatabaseCatchUpDownload::NoDownloadInProgress; + } + if matches!(self.database_catch_up_download_block_verification, DatabaseCatchUpDownloadBlockVerification::InProgress(r) if r == request_id) + { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::None; + } + + // TODO: DRY with above + if let Ok(result) = &result { + let result = result.clone(); + self.database + .with_database(move |database| { + if let Ok(decoded) = trie::proof_decode::decode_and_verify_proof( + trie::proof_decode::Config { + proof: result.decode(), + }, + ) { + for (_, entry) in decoded.iter_ordered() { + // TODO: check the state root hash; while this can't lead to a vulnerability, it can bloat the database + database.insert_trie_nodes( + iter::once(full_sqlite::InsertTrieNode { + merkle_value: Cow::Borrowed(entry.merkle_value), + partial_key_nibbles: Cow::Owned(entry.partial_key_nibbles.into_iter().map(|n| u8::from(n)).collect()), + children_merkle_values: std::array::from_fn(|n| entry.trie_node_info.children.child(trie::Nibble::try_from(u8::try_from(n).unwrap()).unwrap()).merkle_value().map(Cow::Borrowed)), + storage_value: match entry.trie_node_info.storage_value { + trie::proof_decode::StorageValue::HashKnownValueMissing( + _, + ) => return, + trie::proof_decode::StorageValue::None => { + full_sqlite::InsertTrieNodeStorageValue::NoValue + } + trie::proof_decode::StorageValue::Known { + value, .. + } => full_sqlite::InsertTrieNodeStorageValue::Value { + value: Cow::Borrowed(value), + references_merkle_value: false, // TODO: + }, + }, + }), + match entry.trie_node_info.storage_value { + trie::proof_decode::StorageValue::None => 0, // TODO: ?! + trie::proof_decode::StorageValue::HashKnownValueMissing( + .. + ) => return, + trie::proof_decode::StorageValue::Known { + inline: true, + .. + } => 0, + trie::proof_decode::StorageValue::Known { + inline: false, + .. + } => 1, + }, + ).unwrap(); + } + } + }) + .await; + } + if result.is_err() { // Note that we perform the ban even if the source is now disconnected. let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); @@ -1672,6 +2165,14 @@ impl SyncBackground { // Similarly, verifying a block might generate a block announce. debug_assert!(self.pending_block_announce.is_none()); + // Given that a block verification might require downloading some storage + // items due to missing storage items, and that we only want one download at + // a time, we don't verify blocks if a download is in progress. + debug_assert!(matches!( + self.database_catch_up_download_block_verification, + DatabaseCatchUpDownloadBlockVerification::None + )); + let (new_self, maybe_more_to_process) = self.process_blocks().await; process_sync = maybe_more_to_process; self = new_self; @@ -1861,6 +2362,7 @@ impl SyncBackground { ) }) .await + // TODO: don't panic in case of incomplete storage .expect("database access error"); block_authoring = req.inject_value(value.as_ref().map(|(val, vers)| { @@ -1890,6 +2392,7 @@ impl SyncBackground { ) }) .await + // TODO: don't panic in case of incomplete storage .expect("database access error"); block_authoring = @@ -1922,6 +2425,7 @@ impl SyncBackground { ) }) .await + // TODO: don't panic in case of incomplete storage .expect("database access error"); block_authoring = req @@ -2084,9 +2588,48 @@ impl SyncBackground { } (self, true) } - all::ProcessOne::WarpSyncFinished { .. } => { - // Warp syncing is currently disabled for the full node. - unimplemented!() + all::ProcessOne::WarpSyncFinished { + sync, + finalized_body: Some(finalized_body), + finalized_block_runtime, + .. + } => { + self.sync = sync; + + // Destory all existing subscriptions due to the gap in the chain. + self.pending_notification = None; + self.blocks_notifications.clear(); + + self.finalized_runtime = Arc::new(finalized_block_runtime); + let chain_info: chain_information::ValidChainInformation = + self.sync.as_chain_information().into(); + self.database + .with_database(move |database| { + database + .reset( + chain_info.as_ref(), + finalized_body.iter().map(|e| &e[..]), + None, + ) + .unwrap(); + }) + .await; + // TODO: what is known about the finalized storage into the database is currently done when a proof is downloaded; however if the proof download finished code no longer inserts entries related to unknown blocks, then we should do it here instead + + if matches!( + self.database_catch_up_download, + DatabaseCatchUpDownload::NothingToDownloadCache + ) { + self.database_catch_up_download = DatabaseCatchUpDownload::NoDownloadInProgress; + } + + (self, true) + } + all::ProcessOne::WarpSyncFinished { + finalized_body: None, + .. + } => { + unreachable!() } all::ProcessOne::VerifyBlock(verify) => { // TODO: ban peer in case of verification failure @@ -2151,7 +2694,56 @@ impl SyncBackground { { Ok(success) => success, Err(ExecuteBlockError::VerificationFailure( - ExecuteBlockVerificationFailureError::DatabaseParentAccess(error), + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error: full_sqlite::StorageAccessError::IncompleteStorage, + context, + }, + )) => { + // The block verification failed because the storage of the parent + // is still being downloaded from the network. + self.log_callback.log( + LogLevel::Debug, + format!( + "block-verification-incomplete-storage; hash={}; height={}; \ + time_before_interrupt={:?}", + HashDisplay(&hash_to_verify), + header_verification_success.height(), + when_verification_started.elapsed() + ), + ); + + debug_assert!(matches!( + self.database_catch_up_download_block_verification, + DatabaseCatchUpDownloadBlockVerification::None + )); + match context { + ExecuteBlockDatabaseAccessFailureContext::ParentRuntimeAccess => { + self.database_catch_up_download_block_verification = DatabaseCatchUpDownloadBlockVerification::CodeStorageProofDesired { + block_hash: *header_verification_success.parent_hash(), + block_number: header_verification_success.height() - 1, + }; + } + ExecuteBlockDatabaseAccessFailureContext::FunctionCall { + function_name, + parameter, + } => { + self.database_catch_up_download_block_verification = + DatabaseCatchUpDownloadBlockVerification::CallProofDesired { + block_hash: *header_verification_success.parent_hash(), + block_number: header_verification_success.height() - 1, + function_name: function_name.to_owned(), + parameter, + }; + } + } + + self.sync = header_verification_success.cancel(); + return (self, true); + } + Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, .. + }, )) => { panic!("corrupted database: {error}") } @@ -2424,7 +3016,7 @@ pub async fn execute_block_and_insert( let mut call = runtime_host::run(runtime_host::Config { virtual_machine: parent_runtime, function_to_call: call_function, - parameter: iter::once(call_parameter), + parameter: iter::once(&call_parameter), storage_main_trie_changes: storage_changes.into_main_trie_diff(), max_log_level: 0, calculate_trie_changes: true, @@ -2487,15 +3079,26 @@ pub async fn execute_block_and_insert( key.iter().copied(), ) }) - .await - .map_err(ExecuteBlockVerificationFailureError::DatabaseParentAccess)?; - let value = match value.as_ref() { - Some((val, vers)) => Some(( + .await; + let value = match value { + Ok(Some((ref val, vers))) => Some(( iter::once(&val[..]), - runtime_host::TrieEntryVersion::try_from(*vers) + runtime_host::TrieEntryVersion::try_from(vers) .map_err(|_| ExecuteBlockVerificationFailureError::DatabaseInvalidStateTrieVersion)? )), - None => None, + Ok(None) => None, + Err(error) => { + return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, + context: + ExecuteBlockDatabaseAccessFailureContext::FunctionCall { + function_name: call_function, + parameter: call_parameter, + }, + }, + )) + } }; database_accesses_duration += when_database_access_started.elapsed(); @@ -2521,8 +3124,22 @@ pub async fn execute_block_and_insert( key_nibbles.iter().copied(), ) }) - .await - .map_err(ExecuteBlockVerificationFailureError::DatabaseParentAccess)?; + .await; + let merkle_value = match merkle_value { + Ok(mv) => mv, + Err(error) => { + return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, + context: + ExecuteBlockDatabaseAccessFailureContext::FunctionCall { + function_name: call_function, + parameter: call_parameter, + }, + }, + )) + } + }; database_accesses_duration += when_database_access_started.elapsed(); call = req.inject_merkle_value(merkle_value.as_ref().map(|v| &v[..])); @@ -2555,8 +3172,22 @@ pub async fn execute_block_and_insert( branch_nodes, ) }) - .await - .map_err(ExecuteBlockVerificationFailureError::DatabaseParentAccess)?; + .await; + let next_key = match next_key { + Ok(k) => k, + Err(error) => { + return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, + context: + ExecuteBlockDatabaseAccessFailureContext::FunctionCall { + function_name: call_function, + parameter: call_parameter, + }, + }, + )) + } + }; database_accesses_duration += when_database_access_started.elapsed(); call = req.inject_key( @@ -2601,7 +3232,7 @@ pub async fn execute_block_and_insert( } None => { let parent_block_hash = *parent_block_hash; - database + let access = database .with_database(move |db| { db.block_storage_get( &parent_block_hash, @@ -2609,32 +3240,55 @@ pub async fn execute_block_and_insert( trie::bytes_to_nibbles(b":code".into_iter().copied()).map(u8::from), ) }) - .await - .map_err(ExecuteBlockVerificationFailureError::DatabaseParentAccess)? - .map(|(v, _)| Cow::Owned(v)) - .ok_or(ExecuteBlockVerificationFailureError::ParentCodeEmptyInDatabase)? + .await; + match access { + Ok(Some((v, _))) => Cow::Owned(v), + Ok(None) => { + return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::ParentCodeEmptyInDatabase, + )) + } + Err(error) => return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, + context: + ExecuteBlockDatabaseAccessFailureContext::ParentRuntimeAccess, + }, + )), + } } }; let new_heap_pages = match new_heap_pages { Some(c) => executor::storage_heap_pages_to_value(c) .map_err(ExecuteBlockInvalidBlockError::InvalidNewHeapPages)?, - None => executor::storage_heap_pages_to_value({ - let parent_block_hash = *parent_block_hash; - database - .with_database(move |db| { - db.block_storage_get( - &parent_block_hash, - iter::empty::>(), - trie::bytes_to_nibbles(b":heappages".into_iter().copied()) - .map(u8::from), - ) - }) - .await - .map_err(ExecuteBlockVerificationFailureError::DatabaseParentAccess)? - .as_ref() - .map(|(v, _)| &v[..]) - }) + None => executor::storage_heap_pages_to_value( + { + let parent_block_hash = *parent_block_hash; + let access = database + .with_database(move |db| { + db.block_storage_get( + &parent_block_hash, + iter::empty::>(), + trie::bytes_to_nibbles(b":heappages".into_iter().copied()) + .map(u8::from), + ) + }) + .await; + match access { + Ok(Some((v, _))) => Some(v), + Ok(None) => None, + Err(error) => return Err(ExecuteBlockError::VerificationFailure( + ExecuteBlockVerificationFailureError::DatabaseParentAccess { + error, + context: + ExecuteBlockDatabaseAccessFailureContext::ParentRuntimeAccess, + }, + )), + } + } + .as_deref(), + ) .map_err(ExecuteBlockVerificationFailureError::InvaliParentHeapPagesInDatabase)?, }; @@ -2783,7 +3437,13 @@ pub enum ExecuteBlockVerificationFailureError { /// Error starting the runtime execution. RuntimeStartError(executor::host::StartErr), /// Error while accessing the parent block in the database. - DatabaseParentAccess(full_sqlite::StorageAccessError), + #[display(fmt = "Error while accessing the parent block in the database: {error}")] + DatabaseParentAccess { + /// Error that happened. + error: full_sqlite::StorageAccessError, + /// In which context the error hapened. + context: ExecuteBlockDatabaseAccessFailureContext, + }, /// State trie version stored in database is invalid. DatabaseInvalidStateTrieVersion, /// Runtime has tried to call a forbidden host function. @@ -2794,6 +3454,20 @@ pub enum ExecuteBlockVerificationFailureError { InvaliParentHeapPagesInDatabase(executor::InvalidHeapPagesError), } +/// See [`ExecuteBlockVerificationFailureError::DatabaseParentAccess`]. +#[derive(Debug)] +pub enum ExecuteBlockDatabaseAccessFailureContext { + /// Error while accessing the `:code` or `:heappages` key of the parent. + ParentRuntimeAccess, + /// Error while performing a runtime call. + FunctionCall { + /// Name of the function that was being called during the access error. + function_name: &'static str, + /// Parameter to the function that was being called during the access error. + parameter: Vec, + }, +} + /// See [`ExecuteBlockError::InvalidBlock`]. #[derive(Debug, derive_more::Display)] pub enum ExecuteBlockInvalidBlockError { diff --git a/full-node/tests/author.rs b/full-node/tests/author.rs index ad89e34ca5..24b5f71578 100644 --- a/full-node/tests/author.rs +++ b/full-node/tests/author.rs @@ -19,6 +19,7 @@ use smoldot::json_rpc; use std::sync::Arc; #[test] +#[ignore] // TODO: restore after https://github.com/smol-dot/smoldot/issues/1109 fn basic_block_generated() { smol::block_on(async move { let client = smoldot_full_node::start(smoldot_full_node::Config { diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index 99c4a275e8..a0be364405 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -111,10 +111,9 @@ pub struct Config { /// block requests. pub download_ahead_blocks: NonZeroU32, - /// If `true`, the block bodies and storage are also synchronized and the block bodies are - /// verified. - // TODO: change this now that we don't verify block bodies here - pub full_mode: bool, + /// If true, the body of a block is downloaded (if necessary) before a + /// [`ProcessOne::VerifyBlock`] is generated. + pub download_bodies: bool, /// Known valid Merkle value and storage value combination for the `:code` key. /// @@ -181,58 +180,45 @@ impl AllSync { /// Initializes a new state machine. pub fn new(config: Config) -> Self { AllSync { - inner: if config.full_mode { - AllSyncInner::Optimistic { - inner: optimistic::OptimisticSync::new(optimistic::Config { - chain_information: config.chain_information, - block_number_bytes: config.block_number_bytes, - sources_capacity: config.sources_capacity, - blocks_capacity: config.blocks_capacity, - download_ahead_blocks: config.download_ahead_blocks, - download_bodies: config.full_mode, - }), - } - } else { - match warp_sync::start_warp_sync(warp_sync::Config { - start_chain_information: config.chain_information, - block_number_bytes: config.block_number_bytes, - sources_capacity: config.sources_capacity, - requests_capacity: config.sources_capacity, // TODO: ?! add as config? - code_trie_node_hint: config.code_trie_node_hint, - num_download_ahead_fragments: 128, // TODO: make configurable? - // TODO: make configurable? - // TODO: temporarily 0 before https://github.com/smol-dot/smoldot/issues/1109, as otherwise the warp syncing would take a long time if the starting point is too recent - warp_sync_minimum_gap: 0, - download_block_body: config.full_mode, - }) { - Ok(inner) => AllSyncInner::WarpSync { - inner, - ready_to_transition: None, - }, - Err(( - chain_information, - warp_sync::WarpSyncInitError::NotGrandpa - | warp_sync::WarpSyncInitError::UnknownConsensus, - )) => { - // On error, `warp_sync` returns back the chain information that was - // provided in its configuration. - AllSyncInner::Optimistic { - inner: optimistic::OptimisticSync::new(optimistic::Config { - chain_information, - block_number_bytes: config.block_number_bytes, - sources_capacity: config.sources_capacity, - blocks_capacity: config.blocks_capacity, - download_ahead_blocks: config.download_ahead_blocks, - download_bodies: false, - }), - } + inner: match warp_sync::start_warp_sync(warp_sync::Config { + start_chain_information: config.chain_information, + block_number_bytes: config.block_number_bytes, + sources_capacity: config.sources_capacity, + requests_capacity: config.sources_capacity, // TODO: ?! add as config? + code_trie_node_hint: config.code_trie_node_hint, + num_download_ahead_fragments: 128, // TODO: make configurable? + // TODO: make configurable? + // TODO: temporarily 0 before https://github.com/smol-dot/smoldot/issues/1109, as otherwise the warp syncing would take a long time if the starting point is too recent + warp_sync_minimum_gap: 0, + download_block_body: config.download_bodies, + }) { + Ok(inner) => AllSyncInner::WarpSync { + inner, + ready_to_transition: None, + }, + Err(( + chain_information, + warp_sync::WarpSyncInitError::NotGrandpa + | warp_sync::WarpSyncInitError::UnknownConsensus, + )) => { + // On error, `warp_sync` returns back the chain information that was + // provided in its configuration. + AllSyncInner::Optimistic { + inner: optimistic::OptimisticSync::new(optimistic::Config { + chain_information, + block_number_bytes: config.block_number_bytes, + sources_capacity: config.sources_capacity, + blocks_capacity: config.blocks_capacity, + download_ahead_blocks: config.download_ahead_blocks, + download_bodies: false, + }), } } }, shared: Shared { sources: slab::Slab::with_capacity(config.sources_capacity), requests: slab::Slab::with_capacity(config.sources_capacity), - full_mode: config.full_mode, + download_bodies: config.download_bodies, sources_capacity: config.sources_capacity, blocks_capacity: config.blocks_capacity, max_disjoint_headers: config.max_disjoint_headers, @@ -859,7 +845,7 @@ impl AllSync { ( sync[inner_source_id].outer_source_id, &src_user_data.user_data, - all_forks_request_convert(rq_params, self.shared.full_mode), + all_forks_request_convert(rq_params, self.shared.download_bodies), ) }, ); @@ -871,7 +857,7 @@ impl AllSync { ( inner[rq_detail.source_id].outer_source_id, &inner[rq_detail.source_id].user_data, - optimistic_request_convert(rq_detail, self.shared.full_mode), + optimistic_request_convert(rq_detail, self.shared.download_bodies), ) }); @@ -2144,7 +2130,7 @@ pub enum ProcessOne { /// SCALE-encoded extrinsics of the finalized block. The ordering is important. /// - /// `Some` if and only if [`Config::full_mode`] was `true`. + /// `Some` if and only if [`Config::download_bodies`] was `true`. finalized_body: Option>>, /// Storage value at the `:code` key of the finalized block. @@ -2254,7 +2240,7 @@ impl BlockVerify { /// Returns the list of SCALE-encoded extrinsics of the block to verify. /// - /// This is `Some` if and only if [`Config::full_mode`] is `true` + /// This is `Some` if and only if [`Config::download_bodies`] is `true` pub fn scale_encoded_extrinsics( &'_ self, ) -> Option + Clone + '_> + Clone + '_> { @@ -2419,7 +2405,7 @@ impl HeaderVerifySuccess { /// Returns the list of SCALE-encoded extrinsics of the block to verify. /// - /// This is `Some` if and only if [`Config::full_mode`] is `true` + /// This is `Some` if and only if [`Config::download_bodies`] is `true` pub fn scale_encoded_extrinsics( &'_ self, ) -> Option + Clone + '_> + Clone + '_> { @@ -2867,8 +2853,8 @@ struct Shared { sources: slab::Slab, requests: slab::Slab>, - /// See [`Config::full_mode`]. - full_mode: bool, + /// See [`Config::download_bodies`]. + download_bodies: bool, /// Value passed through [`Config::sources_capacity`]. sources_capacity: usize, @@ -2910,7 +2896,7 @@ impl Shared { max_disjoint_headers: self.max_disjoint_headers, max_requests_per_block: self.max_requests_per_block, allow_unknown_consensus_engines: self.allow_unknown_consensus_engines, - download_bodies: false, + download_bodies: self.download_bodies, }); debug_assert!(self @@ -3038,14 +3024,14 @@ enum SourceMapping { fn all_forks_request_convert( rq_params: all_forks::RequestParams, - full_node: bool, + download_body: bool, ) -> DesiredRequest { DesiredRequest::BlocksRequest { ascending: false, // Hardcoded based on the logic of the all-forks syncing. first_block_hash: Some(rq_params.first_block_hash), first_block_height: rq_params.first_block_height, num_blocks: rq_params.num_blocks, - request_bodies: full_node, + request_bodies: download_body, request_headers: true, request_justification: true, } @@ -3053,14 +3039,14 @@ fn all_forks_request_convert( fn optimistic_request_convert( rq_params: optimistic::RequestDetail, - full_node: bool, + download_body: bool, ) -> DesiredRequest { DesiredRequest::BlocksRequest { ascending: true, // Hardcoded based on the logic of the optimistic syncing. first_block_hash: None, first_block_height: rq_params.block_height.get(), num_blocks: rq_params.num_blocks.into(), - request_bodies: full_node, + request_bodies: download_body, request_headers: true, request_justification: true, } diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index 48656f9b5b..f5bfb788dd 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -90,7 +90,7 @@ pub(super) async fn start_standalone_chain( // is 5k. NonZeroU32::new(5000).unwrap() }, - full_mode: false, + download_bodies: false, code_trie_node_hint: runtime_code_hint.map(|hint| all::ConfigCodeTrieNodeHint { merkle_value: hint.merkle_value, storage_value: hint.storage_value,