From 533b4ba8c67381a74a6d4293af8051148eda804a Mon Sep 17 00:00:00 2001 From: PJColombo Date: Fri, 17 May 2024 18:53:46 +0200 Subject: [PATCH] fix(indexer): improve errors --- src/clients/beacon/errors.rs | 0 src/indexer/error.rs | 78 +++++++++++- src/indexer/mod.rs | 222 +++++++++++++++++++++-------------- src/indexer/types.rs | 4 +- src/synchronizer/mod.rs | 2 +- src/utils/web3.rs | 4 + 6 files changed, 215 insertions(+), 95 deletions(-) create mode 100644 src/clients/beacon/errors.rs diff --git a/src/clients/beacon/errors.rs b/src/clients/beacon/errors.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/indexer/error.rs b/src/indexer/error.rs index c963d66..143e3cd 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -1,17 +1,83 @@ +use tokio::sync::mpsc::error::SendError; + use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError}; +use super::types::IndexerTaskMessage; + #[derive(Debug, thiserror::Error)] pub enum IndexerError { + #[error("failed to create indexer")] + CreationFailure(#[source] anyhow::Error), + #[error(transparent)] + SyncingTaskError(#[from] SyncingTaskError), + #[error("failed to retrieve blobscan's sync state")] + BlobscanSyncStateRetrievalError(#[source] ClientError), + #[error("sync task message send failure")] + SyncingTaskMessageSendFailure(#[from] SendError), +} + +#[derive(Debug, thiserror::Error)] +pub enum SyncingTaskError { + #[error("an error ocurred while syncing historical data")] + HistoricalSyncingTaskError(#[from] HistoricalSyncingError), + #[error("an error occurred while syncing realtime data")] + RealtimeSyncingTaskError(#[from] RealtimeSyncingError), +} + +#[derive(Debug, thiserror::Error)] +pub enum HistoricalSyncingError { + #[error(transparent)] + SynchronizerError(#[from] SynchronizerError), +} + +#[derive(Debug, thiserror::Error)] +pub enum RealtimeSyncingError { + #[error("an error ocurred while receiving beacon events")] + BeaconEventsConnectionFailure(#[from] reqwest_eventsource::Error), + #[error("failed to subscribe to beacon events")] + BeaconEventsSubscriptionError(#[source] ClientError), + #[error("unexpected event \"{0}\" received")] + UnexpectedBeaconEvent(String), #[error(transparent)] - ReqwestEventSourceError(#[from] reqwest_eventsource::Error), + BeaconEventProcessingError(#[from] BeaconEventError), +} + +#[derive(Debug, thiserror::Error)] +pub enum BeaconEventError { + #[error("failed to handle \"chain_reorged\" event")] + ChainReorged(#[from] ChainReorgedEventHandlingError), + #[error("failed to handle \"head\" event")] + HeadBlock(#[from] HeadBlockEventHandlingError), + #[error("failed to handle \"finalized_checkpoint\" event")] + FinalizedCheckpoint(#[from] FinalizedBlockEventHandlingError), +} + +#[derive(Debug, thiserror::Error)] +pub enum FinalizedBlockEventHandlingError { #[error(transparent)] - ClientError(#[from] ClientError), + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve finalized block {0}")] + BlockRetrievalError(String, #[source] ClientError), #[error(transparent)] Other(#[from] anyhow::Error), + #[error("failed to update blobscan's last finalized block")] + BlobscanSyncStateUpdateError(#[source] ClientError), +} + +#[derive(Debug, thiserror::Error)] +pub enum ChainReorgedEventHandlingError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve reorged block {0}")] + BlockRetrievalError(String, #[source] ClientError), + #[error("failed to handle reorged of depth {0} starting at block {1}")] + ReorgedHandlingFailure(u32, String, #[source] ClientError), +} + +#[derive(Debug, thiserror::Error)] +pub enum HeadBlockEventHandlingError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), #[error(transparent)] SynchronizerError(#[from] SynchronizerError), - #[error("{0}")] - SerdeError(#[from] serde_json::Error), - #[error("Unexpected event \"{event}\" received")] - UnexpectedEvent { event: String }, } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 3a07bdf..a552f1a 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -17,11 +17,16 @@ use crate::{ }, context::{Config as ContextConfig, Context}, env::Environment, + indexer::error::{ + ChainReorgedEventHandlingError, FinalizedBlockEventHandlingError, + HeadBlockEventHandlingError, HistoricalSyncingError, + }, synchronizer::SynchronizerBuilder, + utils::web3::get_full_hash, }; use self::{ - error::IndexerError, + error::{IndexerError, RealtimeSyncingError}, types::{IndexerResult, IndexerTaskMessage}, }; @@ -42,7 +47,10 @@ impl Indexer { Err(error) => { error!(?error, "Failed to create context"); - return Err(error.into()); + return Err(IndexerError::CreationFailure(anyhow!( + "Failed to create context: {:?}", + error + ))); } }; @@ -50,7 +58,12 @@ impl Indexer { let num_threads = match args.num_threads { Some(num_threads) => num_threads, None => thread::available_parallelism() - .map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))? + .map_err(|err| { + IndexerError::CreationFailure(anyhow!( + "Failed to get number of available threads: {:?}", + err + )) + })? .get() as u32, }; let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save; @@ -85,9 +98,9 @@ impl Indexer { let sync_state = match self.context.blobscan_client().get_sync_state().await { Ok(state) => state, Err(error) => { - error!(?error, "Failed to fetch sync state"); + error!(?error, "Failed to fetch blobscan's sync state"); - return Err(error.into()); + return Err(IndexerError::BlobscanSyncStateRetrievalError(error)); } }; @@ -129,7 +142,7 @@ impl Indexer { let mut total_tasks = 0; if end_block_id.is_none() { - self._start_realtime_sync_task(tx, current_upper_block_id); + self._start_realtime_syncing_task(tx, current_upper_block_id); total_tasks += 1; } @@ -138,7 +151,7 @@ impl Indexer { matches!(end_block_id, BlockId::Slot(slot) if slot < self.dencun_fork_slot); if !self.disable_sync_historical && !historical_sync_completed { - self._start_historical_sync_task(tx1, current_lower_block_id, end_block_id); + self._start_historical_syncing_task(tx1, current_lower_block_id, end_block_id); total_tasks += 1; } @@ -155,6 +168,8 @@ impl Indexer { } } IndexerTaskMessage::Error(error) => { + error!(?error, "An error occurred while running a syncing task"); + return Err(error.into()); } } @@ -163,7 +178,7 @@ impl Indexer { Ok(()) } - fn _start_historical_sync_task( + fn _start_historical_syncing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, @@ -174,29 +189,32 @@ impl Indexer { tokio::spawn(async move { let historical_syc_thread_span = tracing::info_span!("sync:historical"); - async move { + let result: Result<(), IndexerError> = async move { let result = synchronizer.run(&start_block_id, &end_block_id).await; if let Err(error) = result { - error!(?error, "An error occurred while syncing historical data"); - // TODO: Find a better way to handle this error - tx.send(IndexerTaskMessage::Error(error.into())) - .await - .unwrap(); + tx.send(IndexerTaskMessage::Error( + HistoricalSyncingError::SynchronizerError(error).into(), + )) + .await?; } else { - info!("Historical sync completed successfully"); + info!("Historical syncing completed successfully"); - tx.send(IndexerTaskMessage::Done).await.unwrap(); + tx.send(IndexerTaskMessage::Done).await?; } + + Ok(()) } .instrument(historical_syc_thread_span) .await; + result?; + Ok(()) }) } - fn _start_realtime_sync_task( + fn _start_realtime_syncing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, @@ -207,8 +225,7 @@ impl Indexer { tokio::spawn(async move { let realtime_sync_task_span = tracing::info_span!("sync:realtime"); - let result: Result<(), IndexerError> = async { - info!("Starting realtime sync…"); + let result: Result<(), RealtimeSyncingError> = async { let beacon_client = task_context.beacon_client(); let blobscan_client = task_context.blobscan_client(); let topics = vec![ @@ -218,97 +235,132 @@ impl Indexer { ]; let mut event_source = task_context .beacon_client() - .subscribe_to_events(&topics)?; + .subscribe_to_events(&topics).map_err(RealtimeSyncingError::BeaconEventsSubscriptionError)?; let mut is_initial_sync_to_head = true; + let events = topics + .iter() + .map(|topic| topic.into()) + .collect::>() + .join(", "); + + info!("Subscribed to beacon events: {events}"); while let Some(event) = event_source.next().await { match event { Ok(Event::Open) => { - let events = topics - .iter() - .map(|topic| topic.into()) - .collect::>() - .join(", "); - debug!(events, "Listening to beacon events…") + debug!("Subscription connection opened") } Ok(Event::Message(event)) => { let event_name = event.event.as_str(); match event_name { "chain_reorg" => { - let reorg_block_data = - serde_json::from_str::(&event.data)?; - let slot = reorg_block_data.slot; - let old_head_block = reorg_block_data.old_head_block; - let target_depth = reorg_block_data.depth; - - let mut current_reorged_block = old_head_block; - let mut reorged_slots: Vec = vec![]; - - for current_depth in 1..=target_depth { - let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await? { - Some(block) => block, - None => { - warn!(event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); - break - } - }; - - reorged_slots.push(reorged_block_head.header.message.slot); - current_reorged_block = reorged_block_head.header.message.parent_root; - } + let chain_reorg_span = tracing::info_span!("chain_reorg"); + + let result: Result<(), ChainReorgedEventHandlingError> = async { + + let reorg_block_data = + serde_json::from_str::(&event.data)?; + let slot = reorg_block_data.slot; + let old_head_block = reorg_block_data.old_head_block; + let target_depth = reorg_block_data.depth; + + let mut current_reorged_block = old_head_block; + let mut reorged_slots: Vec = vec![]; + + for current_depth in 1..=target_depth { + let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await.map_err(|err| ChainReorgedEventHandlingError::BlockRetrievalError(get_full_hash(¤t_reorged_block), err))? { + Some(block) => block, + None => { + warn!(event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); + break + } + }; + + reorged_slots.push(reorged_block_head.header.message.slot); + current_reorged_block = reorged_block_head.header.message.parent_root; + } + + let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await.map_err(|err| ChainReorgedEventHandlingError::ReorgedHandlingFailure(target_depth, get_full_hash(&old_head_block), err))?; - let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await?; + info!(event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); - info!(event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); + Ok(()) + }.instrument(chain_reorg_span).await; + + if let Err(error) = result { + return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); + } }, - "head" => { - let head_block_data = + "head" => { + let head_span = tracing::info_span!("head_block"); + + let result: Result<(), HeadBlockEventHandlingError> = async { + let head_block_data = serde_json::from_str::(&event.data)?; + let head_block_id = &BlockId::Slot(head_block_data.slot); let initial_block_id = if is_initial_sync_to_head { is_initial_sync_to_head = false; + &start_block_id } else { head_block_id }; synchronizer.run(initial_block_id, &BlockId::Slot(head_block_data.slot + 1)).await?; - } + + Ok(()) + }.instrument(head_span).await; + + if let Err(error) = result { + return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); + } + }, "finalized_checkpoint" => { - let finalized_checkpoint_data = - serde_json::from_str::( - &event.data, - )?; - let block_hash = finalized_checkpoint_data.block; - let full_block_hash = format!("0x{:x}", block_hash); - let last_finalized_block_number = beacon_client - .get_block(&BlockId::Hash(block_hash)) - .await? - .with_context(|| { - anyhow!("Finalized block with hash {full_block_hash} not found") - })? - .message.body.execution_payload - .with_context(|| { - anyhow!("Finalized block with hash {full_block_hash} has no execution payload") - })?.block_number; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_lower_synced_slot: None, - last_upper_synced_slot: None, - last_finalized_block: Some( - last_finalized_block_number - ), - }) - .await?; - - info!(event=event_name, execution_block=last_finalized_block_number, "New finalized block detected"); + let finalized_checkpoint_span = tracing::info_span!("finalized_checkpoint"); + + let result: Result<(), FinalizedBlockEventHandlingError> = async move { + let finalized_checkpoint_data = + serde_json::from_str::( + &event.data, + )?; + let block_hash = finalized_checkpoint_data.block; + let last_finalized_block_number = beacon_client + .get_block(&BlockId::Hash(block_hash)) + .await.map_err(|err| FinalizedBlockEventHandlingError::BlockRetrievalError(get_full_hash(&block_hash), err))? + .with_context(|| { + anyhow!("Finalized block not found") + })? + .message.body.execution_payload + .with_context(|| { + anyhow!("Finalized block has no execution payload") + })?.block_number; + + blobscan_client + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: None, + last_finalized_block: Some( + last_finalized_block_number + ), + }) + .await.map_err(FinalizedBlockEventHandlingError::BlobscanSyncStateUpdateError)?; + + info!(finalized_execution_block=last_finalized_block_number, "Finalized checkpoint event received. Updated last finalized block number"); + + Ok(()) + }.instrument(finalized_checkpoint_span).await; + + if let Err(error) = result { + return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); + } + }, unexpected_event_id => { - return Err(IndexerError::UnexpectedEvent { event: unexpected_event_id.to_string() }) - } + return Err(RealtimeSyncingError::UnexpectedBeaconEvent(unexpected_event_id.to_string())); + }, } }, Err(error) => { @@ -325,11 +377,9 @@ impl Indexer { .await; if let Err(error) = result { - error!(?error, "An error occurred while syncing realtime data"); - // TODO: Find a better way to handle this error - tx.send(IndexerTaskMessage::Error(error)).await.unwrap(); + tx.send(IndexerTaskMessage::Error(error.into())).await?; } else { - tx.send(IndexerTaskMessage::Done).await.unwrap(); + tx.send(IndexerTaskMessage::Done).await?; } Ok(()) diff --git a/src/indexer/types.rs b/src/indexer/types.rs index 80ba119..bde0077 100644 --- a/src/indexer/types.rs +++ b/src/indexer/types.rs @@ -1,8 +1,8 @@ -use super::error::IndexerError; +use super::error::{IndexerError, SyncingTaskError}; pub type IndexerResult = Result; pub enum IndexerTaskMessage { Done, - Error(IndexerError), + Error(SyncingTaskError), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 948e868..deb3481 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -195,7 +195,7 @@ impl Synchronizer { initial_slot, final_slot, reverse_sync = is_reverse_sync, - "Processing {unprocessed_slots} slots…" + "Syncing {unprocessed_slots} slots…" ); while unprocessed_slots > 0 { diff --git a/src/utils/web3.rs b/src/utils/web3.rs index bf4b170..25a3faf 100644 --- a/src/utils/web3.rs +++ b/src/utils/web3.rs @@ -64,3 +64,7 @@ pub fn get_tx_versioned_hashes(tx: &Transaction) -> Result>> { None => Ok(None), } } + +pub fn get_full_hash(hash: &H256) -> String { + format!("0x{:x}", hash) +}