From e07917422b10138135629efaeeaa86a24cc88018 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 10 Jun 2024 15:18:49 +0200 Subject: [PATCH 1/8] refactor(indexer): check for reorgs by verifying parent and current block root hashes instead of listening to `chain_reorg` sse event --- src/indexer/error.rs | 73 +++---- .../event_handlers/finalized_checkpoint.rs | 85 ++++++++ src/indexer/event_handlers/head.rs | 139 ++++++++++++ src/indexer/event_handlers/mod.rs | 2 + src/indexer/mod.rs | 202 ++++-------------- src/indexer/types.rs | 4 +- src/synchronizer/mod.rs | 20 +- 7 files changed, 315 insertions(+), 210 deletions(-) create mode 100644 src/indexer/event_handlers/finalized_checkpoint.rs create mode 100644 src/indexer/event_handlers/head.rs create mode 100644 src/indexer/event_handlers/mod.rs diff --git a/src/indexer/error.rs b/src/indexer/error.rs index 143e3cd..b707e59 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -2,82 +2,69 @@ use tokio::sync::mpsc::error::SendError; use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError}; -use super::types::IndexerTaskMessage; +use super::{ + event_handlers::{ + finalized_checkpoint::FinalizedCheckpointEventHandlerError, head::HeadEventHandlerError, + }, + types::IndexerTaskMessage, +}; #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("failed to create indexer")] CreationFailure(#[source] anyhow::Error), #[error(transparent)] - SyncingTaskError(#[from] SyncingTaskError), + SyncingTaskError(#[from] IndexingError), #[error("failed to retrieve blobscan's sync state")] BlobscanSyncStateRetrievalError(#[source] ClientError), - #[error("sync task message send failure")] + #[error("failed to send syncing task message")] SyncingTaskMessageSendFailure(#[from] SendError), } #[derive(Debug, thiserror::Error)] -pub enum SyncingTaskError { - #[error("an error ocurred while syncing historical data")] - HistoricalSyncingTaskError(#[from] HistoricalSyncingError), - #[error("an error occurred while syncing realtime data")] - RealtimeSyncingTaskError(#[from] RealtimeSyncingError), +pub enum IndexingError { + #[error(transparent)] + HistoricalIndexingFailure(#[from] HistoricalIndexingError), + #[error(transparent)] + LiveIndexingError(#[from] LiveIndexingError), } #[derive(Debug, thiserror::Error)] -pub enum HistoricalSyncingError { +pub enum HistoricalIndexingError { #[error(transparent)] SynchronizerError(#[from] SynchronizerError), } #[derive(Debug, thiserror::Error)] -pub enum RealtimeSyncingError { +pub enum LiveIndexingError { #[error("an error ocurred while receiving beacon events")] BeaconEventsConnectionFailure(#[from] reqwest_eventsource::Error), #[error("failed to subscribe to beacon events")] BeaconEventsSubscriptionError(#[source] ClientError), #[error("unexpected event \"{0}\" received")] UnexpectedBeaconEvent(String), - #[error(transparent)] - BeaconEventProcessingError(#[from] BeaconEventError), -} - -#[derive(Debug, thiserror::Error)] -pub enum BeaconEventError { - #[error("failed to handle \"chain_reorged\" event")] - ChainReorged(#[from] ChainReorgedEventHandlingError), - #[error("failed to handle \"head\" event")] - HeadBlock(#[from] HeadBlockEventHandlingError), - #[error("failed to handle \"finalized_checkpoint\" event")] - FinalizedCheckpoint(#[from] FinalizedBlockEventHandlingError), + #[error("failed to handle beacon event")] + BeaconEventHandlingError(#[from] EventHandlerError), } #[derive(Debug, thiserror::Error)] -pub enum FinalizedBlockEventHandlingError { +pub enum EventHandlerError { #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve finalized block {0}")] - BlockRetrievalError(String, #[source] ClientError), + HeadEventHandlerError(#[from] HeadEventHandlerError), #[error(transparent)] - Other(#[from] anyhow::Error), - #[error("failed to update blobscan's last finalized block")] - BlobscanSyncStateUpdateError(#[source] ClientError), + FinalizedCheckpointHandlerError(#[from] FinalizedCheckpointEventHandlerError), } -#[derive(Debug, thiserror::Error)] -pub enum ChainReorgedEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve reorged block {0}")] - BlockRetrievalError(String, #[source] ClientError), - #[error("failed to handle reorged of depth {0} starting at block {1}")] - ReorgedHandlingFailure(u32, String, #[source] ClientError), +impl From for LiveIndexingError { + fn from(err: HeadEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError(EventHandlerError::HeadEventHandlerError(err)) + } } -#[derive(Debug, thiserror::Error)] -pub enum HeadBlockEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error(transparent)] - SynchronizerError(#[from] SynchronizerError), +impl From for LiveIndexingError { + fn from(err: FinalizedCheckpointEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError( + EventHandlerError::FinalizedCheckpointHandlerError(err), + ) + } } diff --git a/src/indexer/event_handlers/finalized_checkpoint.rs b/src/indexer/event_handlers/finalized_checkpoint.rs new file mode 100644 index 0000000..2e37290 --- /dev/null +++ b/src/indexer/event_handlers/finalized_checkpoint.rs @@ -0,0 +1,85 @@ +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockId, FinalizedCheckpointEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::Context, + utils::web3::get_full_hash, +}; + +#[derive(Debug, thiserror::Error)] +pub enum FinalizedCheckpointEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve block {0}")] + BlockRetrievalError(String, #[source] ClientError), + #[error("block \"{0}\" not found")] + BlockNotFound(String), + #[error("failed to update last finalized block")] + BlobscanFinalizedBlockUpdateFailure(#[source] ClientError), +} + +pub struct FinalizedCheckpointHandler { + context: Context, +} + +impl FinalizedCheckpointHandler { + pub fn new(context: Context) -> Self { + FinalizedCheckpointHandler { context } + } + + pub async fn handle( + &self, + event_data: String, + ) -> Result<(), FinalizedCheckpointEventHandlerError> { + let finalized_checkpoint_data = + serde_json::from_str::(&event_data)?; + let block_hash = finalized_checkpoint_data.block; + let full_block_hash = get_full_hash(&block_hash); + let last_finalized_block_number = match self + .context + .beacon_client() + .get_block(&BlockId::Hash(block_hash)) + .await + .map_err(|err| { + FinalizedCheckpointEventHandlerError::BlockRetrievalError( + full_block_hash.clone(), + err, + ) + })? { + Some(block) => match block.message.body.execution_payload { + Some(execution_payload) => execution_payload.block_number, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }; + + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: None, + last_finalized_block: Some(last_finalized_block_number), + }) + .await + .map_err(FinalizedCheckpointEventHandlerError::BlobscanFinalizedBlockUpdateFailure)?; + + info!( + finalized_execution_block = last_finalized_block_number, + "Finalized checkpoint event received. Updated last finalized block number" + ); + + Ok(()) + } +} diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs new file mode 100644 index 0000000..faad56d --- /dev/null +++ b/src/indexer/event_handlers/head.rs @@ -0,0 +1,139 @@ +use std::cmp; + +use ethers::types::H256; +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockHeader, BlockId, HeadEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::Context, + synchronizer::{error::SynchronizerError, Synchronizer}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum HeadEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve header for block \"{0}\"")] + BlockHeaderRetrievalError(BlockId, #[source] ClientError), + #[error("header for block \"{0}\" not found")] + BlockHeaderNotFound(BlockId), + #[error("failed to index head block")] + BlockSyncedError(#[from] SynchronizerError), + #[error("failed to handle reorged slots")] + BlobscanReorgedSlotsFailure(#[source] ClientError), + #[error("failed to update blobscan's sync state")] + BlobscanSyncStateUpdateError(#[source] ClientError), +} + +pub struct HeadEventHandler { + context: Context, + synchronizer: Synchronizer, + start_block_id: BlockId, + last_block_hash: Option, +} + +impl HeadEventHandler { + pub fn new(context: Context, synchronizer: Synchronizer, start_block_id: BlockId) -> Self { + HeadEventHandler { + context, + synchronizer, + start_block_id, + last_block_hash: None, + } + } + + pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> { + let head_block_data = serde_json::from_str::(&event_data)?; + + let head_block_slot = head_block_data.slot; + let head_block_hash = head_block_data.block; + + let head_block_id = BlockId::Slot(head_block_data.slot); + let initial_block_id = if self.last_block_hash.is_none() { + self.start_block_id.clone() + } else { + head_block_id.clone() + }; + + let head_block_header = self.get_block_header(&head_block_id).await?.header; + + if let Some(last_block_hash) = self.last_block_hash { + if last_block_hash != head_block_header.message.parent_root { + let parent_block_header = self + .get_block_header(&BlockId::Hash(head_block_header.message.parent_root)) + .await? + .header; + let parent_block_slot = parent_block_header.message.slot; + let reorg_start_slot = parent_block_slot + 1; + let reorg_final_slot = head_block_slot; + let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::>(); + + let result: Result<(), HeadEventHandlerError> = async { + let total_updated_slots = self.context + .blobscan_client() + .handle_reorged_slots(reorged_slots.as_slice()) + .await + .map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?; + + + info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); + + // Re-index parent block as it may be mark as reorged and not indexed + self.synchronizer + .run( + &BlockId::Slot(parent_block_slot), + &BlockId::Slot(parent_block_slot + 1), + ) + .await?; + + Ok(()) + } + .await; + + if let Err(err) = result { + // If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)), + }) + .await + .map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?; + + return Err(err); + } + } + } + + self.synchronizer + .run(&initial_block_id, &BlockId::Slot(head_block_slot + 1)) + .await?; + + self.last_block_hash = Some(head_block_hash); + + Ok(()) + } + + async fn get_block_header( + &self, + block_id: &BlockId, + ) -> Result { + match self + .context + .beacon_client() + .get_block_header(block_id) + .await + .map_err(|err| { + HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err) + })? { + Some(block) => Ok(block), + None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())), + } + } +} diff --git a/src/indexer/event_handlers/mod.rs b/src/indexer/event_handlers/mod.rs new file mode 100644 index 0000000..dea7c11 --- /dev/null +++ b/src/indexer/event_handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod finalized_checkpoint; +pub mod head; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index ceca713..02ef0e7 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,36 +1,29 @@ use std::thread; -use anyhow::{anyhow, Context as AnyhowContext}; +use anyhow::anyhow; +use event_handlers::{finalized_checkpoint::FinalizedCheckpointHandler, head::HeadEventHandler}; use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{debug, error, info, Instrument}; use crate::{ args::Args, - clients::{ - beacon::types::{ - BlockId, ChainReorgEventData, FinalizedCheckpointEventData, HeadEventData, Topic, - }, - blobscan::types::BlockchainSyncState, - }, + clients::beacon::types::{BlockId, Topic}, context::{Config as ContextConfig, Context}, env::Environment, - indexer::error::{ - ChainReorgedEventHandlingError, FinalizedBlockEventHandlingError, - HeadBlockEventHandlingError, HistoricalSyncingError, - }, + indexer::error::HistoricalIndexingError, synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, - utils::web3::get_full_hash, }; use self::{ - error::{IndexerError, RealtimeSyncingError}, + error::{IndexerError, LiveIndexingError}, types::{IndexerResult, IndexerTaskMessage}, }; pub mod error; +pub mod event_handlers; pub mod types; pub struct Indexer { @@ -142,7 +135,7 @@ impl Indexer { let mut total_tasks = 0; if end_block_id.is_none() { - self._start_realtime_syncing_task(tx, current_upper_block_id); + self.start_live_indexing_task(tx, current_upper_block_id); total_tasks += 1; } @@ -152,7 +145,7 @@ impl Indexer { matches!(current_lower_block_id, BlockId::Slot(slot) if slot < self.dencun_fork_slot); if !self.disable_sync_historical && !historical_sync_completed { - self._start_historical_syncing_task(tx1, current_lower_block_id, end_block_id); + self.start_historical_indexing_task(tx1, current_lower_block_id, end_block_id); total_tasks += 1; } @@ -179,23 +172,23 @@ impl Indexer { Ok(()) } - fn _start_historical_syncing_task( + fn start_historical_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, end_block_id: BlockId, ) -> JoinHandle> { - let mut synchronizer = self._create_synchronizer(CheckpointType::Lower); + let synchronizer = self.create_synchronizer(CheckpointType::Lower); tokio::spawn(async move { - let historical_syc_thread_span = tracing::info_span!("sync:historical"); + let historical_syc_thread_span = tracing::info_span!("indexer:historical"); let result: Result<(), IndexerError> = async move { let result = synchronizer.run(&start_block_id, &end_block_id).await; if let Err(error) = result { tx.send(IndexerTaskMessage::Error( - HistoricalSyncingError::SynchronizerError(error).into(), + HistoricalIndexingError::SynchronizerError(error).into(), )) .await?; } else { @@ -215,34 +208,33 @@ impl Indexer { }) } - fn _start_realtime_syncing_task( + fn start_live_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, ) -> JoinHandle> { let task_context = self.context.clone(); - let mut synchronizer = self._create_synchronizer(CheckpointType::Upper); + let synchronizer = self.create_synchronizer(CheckpointType::Upper); tokio::spawn(async move { - let realtime_sync_task_span = tracing::info_span!("sync:realtime"); - - let result: Result<(), RealtimeSyncingError> = async { - let beacon_client = task_context.beacon_client(); - let blobscan_client = task_context.blobscan_client(); - let topics = vec![ - Topic::ChainReorg, - Topic::Head, - Topic::FinalizedCheckpoint, - ]; + let realtime_sync_task_span = tracing::info_span!("indexer:live"); + + let result: Result<(), LiveIndexingError> = async { + let topics = vec![Topic::Head, Topic::FinalizedCheckpoint]; let mut event_source = task_context .beacon_client() - .subscribe_to_events(&topics).map_err(RealtimeSyncingError::BeaconEventsSubscriptionError)?; - let mut is_initial_sync_to_head = true; + .subscribe_to_events(&topics) + .map_err(LiveIndexingError::BeaconEventsSubscriptionError)?; let events = topics - .iter() - .map(|topic| topic.into()) - .collect::>() - .join(", "); + .iter() + .map(|topic| topic.into()) + .collect::>() + .join(", "); + + let mut head_event_handler = + HeadEventHandler::new(task_context.clone(), synchronizer, start_block_id); + let finalized_checkpoint_event_handler = + FinalizedCheckpointHandler::new(task_context); info!("Subscribed to beacon events: {events}"); @@ -255,125 +247,25 @@ impl Indexer { let event_name = event.event.as_str(); match event_name { - "chain_reorg" => { - let chain_reorg_span = tracing::info_span!("chain_reorg"); - - let result: Result<(), ChainReorgedEventHandlingError> = async { - let reorg_block_data = - serde_json::from_str::(&event.data)?; - let slot = reorg_block_data.slot; - let old_head_block = reorg_block_data.old_head_block; - let target_depth = reorg_block_data.depth; - - let mut current_reorged_block = old_head_block; - let mut reorged_slots: Vec = vec![]; - - for current_depth in 1..=target_depth { - let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await.map_err(|err| ChainReorgedEventHandlingError::BlockRetrievalError(get_full_hash(¤t_reorged_block), err))? { - Some(block) => block, - None => { - warn!(event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); - break - } - }; - - reorged_slots.push(reorged_block_head.header.message.slot); - current_reorged_block = reorged_block_head.header.message.parent_root; - } - - let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await.map_err(|err| ChainReorgedEventHandlingError::ReorgedHandlingFailure(target_depth, get_full_hash(&old_head_block), err))?; - - info!(event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); - - Ok(()) - }.instrument(chain_reorg_span).await; - - if let Err(error) = result { - // If an error occurred while processing the event, try to update the latest synced slot to the last known slot before the reorg - if let Ok(reorg_block_data) = serde_json::from_str::(&event.data) { - let slot = reorg_block_data.slot; - - let _ = blobscan_client.update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot: None, - last_upper_synced_slot: Some(slot -1) - }).await; - } - - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, - "head" => { - let head_span = tracing::info_span!("head_block"); - - let result: Result<(), HeadBlockEventHandlingError> = async { - let head_block_data = - serde_json::from_str::(&event.data)?; - - - let head_block_id = &BlockId::Slot(head_block_data.slot); - let initial_block_id = if is_initial_sync_to_head { - is_initial_sync_to_head = false; - - &start_block_id - } else { - head_block_id - }; - - synchronizer.run(initial_block_id, &BlockId::Slot(head_block_data.slot + 1)).await?; - - Ok(()) - }.instrument(head_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, + "head" => { + head_event_handler + .handle(event.data) + .instrument(tracing::info_span!("head_block")) + .await?; + } "finalized_checkpoint" => { - let finalized_checkpoint_span = tracing::info_span!("finalized_checkpoint"); - - let result: Result<(), FinalizedBlockEventHandlingError> = async move { - let finalized_checkpoint_data = - serde_json::from_str::( - &event.data, - )?; - let block_hash = finalized_checkpoint_data.block; - let last_finalized_block_number = beacon_client - .get_block(&BlockId::Hash(block_hash)) - .await.map_err(|err| FinalizedBlockEventHandlingError::BlockRetrievalError(get_full_hash(&block_hash), err))? - .with_context(|| { - anyhow!("Finalized block not found") - })? - .message.body.execution_payload - .with_context(|| { - anyhow!("Finalized block has no execution payload") - })?.block_number; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_lower_synced_slot: None, - last_upper_synced_slot: None, - last_finalized_block: Some( - last_finalized_block_number - ), - }) - .await.map_err(FinalizedBlockEventHandlingError::BlobscanSyncStateUpdateError)?; - - info!(finalized_execution_block=last_finalized_block_number, "Finalized checkpoint event received. Updated last finalized block number"); - - Ok(()) - }.instrument(finalized_checkpoint_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - - }, + finalized_checkpoint_event_handler + .handle(event.data) + .instrument(tracing::info_span!("finalized_checkpoint")) + .await?; + } unexpected_event_id => { - return Err(RealtimeSyncingError::UnexpectedBeaconEvent(unexpected_event_id.to_string())); - }, + return Err(LiveIndexingError::UnexpectedBeaconEvent( + unexpected_event_id.to_string(), + )); + } } - }, + } Err(error) => { event_source.close(); @@ -397,7 +289,7 @@ impl Indexer { }) } - fn _create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { diff --git a/src/indexer/types.rs b/src/indexer/types.rs index bde0077..36189b1 100644 --- a/src/indexer/types.rs +++ b/src/indexer/types.rs @@ -1,8 +1,8 @@ -use super::error::{IndexerError, SyncingTaskError}; +use super::error::{IndexerError, IndexingError}; pub type IndexerResult = Result; pub enum IndexerTaskMessage { Done, - Error(SyncingTaskError), + Error(IndexingError), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 8487d5f..856320e 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -83,22 +83,22 @@ impl SynchronizerBuilder { impl Synchronizer { pub async fn run( - &mut self, + &self, initial_block_id: &BlockId, final_block_id: &BlockId, ) -> Result<(), SynchronizerError> { - let initial_slot = self._resolve_to_slot(initial_block_id).await?; - let mut final_slot = self._resolve_to_slot(final_block_id).await?; + let initial_slot = self.resolve_to_slot(initial_block_id).await?; + let mut final_slot = self.resolve_to_slot(final_block_id).await?; if initial_slot == final_slot { return Ok(()); } loop { - self._sync_slots_by_checkpoints(initial_slot, final_slot) + self.sync_slots_by_checkpoints(initial_slot, final_slot) .await?; - let latest_final_slot = self._resolve_to_slot(final_block_id).await?; + let latest_final_slot = self.resolve_to_slot(final_block_id).await?; if final_slot == latest_final_slot { return Ok(()); @@ -108,7 +108,7 @@ impl Synchronizer { } } - async fn _sync_slots(&mut self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { + async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot); let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread); @@ -190,8 +190,8 @@ impl Synchronizer { } } - async fn _sync_slots_by_checkpoints( - &mut self, + async fn sync_slots_by_checkpoints( + &self, initial_slot: u32, final_slot: u32, ) -> Result<(), SynchronizerError> { @@ -222,7 +222,7 @@ impl Synchronizer { checkpoint_final_slot = final_chunk_slot ); - self._sync_slots(initial_chunk_slot, final_chunk_slot) + self.sync_slots(initial_chunk_slot, final_chunk_slot) .instrument(sync_slots_chunk_span) .await?; @@ -293,7 +293,7 @@ impl Synchronizer { Ok(()) } - async fn _resolve_to_slot(&self, block_id: &BlockId) -> Result { + async fn resolve_to_slot(&self, block_id: &BlockId) -> Result { let beacon_client = self.context.beacon_client(); let resolved_block_id: Result = match block_id { From 0d2b0596fc7dfde16fa22781637a9fdb79d6bee5 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 16 Jun 2024 03:06:23 +0200 Subject: [PATCH 2/8] chore: remove empty file --- src/clients/beacon/errors.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/clients/beacon/errors.rs diff --git a/src/clients/beacon/errors.rs b/src/clients/beacon/errors.rs deleted file mode 100644 index e69de29..0000000 From 9dd0350f5b8f15d3e19b6d22d4e50cc171c4b8d8 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 16 Jun 2024 12:17:48 +0200 Subject: [PATCH 3/8] refactor: abstract all clients and context functions into traits --- Cargo.lock | 11 ++++- Cargo.toml | 3 +- src/clients/beacon/mod.rs | 23 +++++++-- src/clients/blobscan/mod.rs | 33 ++++++++++--- src/context.rs | 47 ++++++++++++------- .../event_handlers/finalized_checkpoint.rs | 11 +++-- src/indexer/event_handlers/head.rs | 18 ++++--- src/indexer/mod.rs | 14 +++--- src/slots_processor/mod.rs | 32 ++++++++++--- src/synchronizer/mod.rs | 14 ++++-- 10 files changed, 145 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b1cd3c..dbcfb46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,9 +127,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", @@ -319,6 +319,7 @@ dependencies = [ "chrono", "clap", "dotenv", + "dyn-clone", "envy", "ethers", "futures", @@ -727,6 +728,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "ecdsa" version = "0.14.8" diff --git a/Cargo.toml b/Cargo.toml index fd582e2..99e8629 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.66" +async-trait = "0.1.80" +dyn-clone = "1.0.17" dotenv = "0.15.0" envy = "0.4.2" ethers = "1.0.2" diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index 28c0a3e..4d1fe15 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -1,5 +1,9 @@ +use std::fmt::Debug; + use anyhow::Context as AnyhowContext; +use async_trait::async_trait; use backoff::ExponentialBackoff; + use reqwest::{Client, Url}; use reqwest_eventsource::EventSource; @@ -24,6 +28,14 @@ pub struct Config { pub exp_backoff: Option, } +#[async_trait] +pub trait CommonBeaconClient: Send + Sync + Debug { + async fn get_block(&self, block_id: &BlockId) -> ClientResult>; + async fn get_block_header(&self, block_id: &BlockId) -> ClientResult>; + async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>>; + fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult; +} + impl BeaconClient { pub fn try_with_client(client: Client, config: Config) -> ClientResult { let base_url = Url::parse(&format!("{}/eth/", config.base_url)) @@ -36,8 +48,11 @@ impl BeaconClient { exp_backoff, }) } +} - pub async fn get_block(&self, block_id: &BlockId) -> ClientResult> { +#[async_trait] +impl CommonBeaconClient for BeaconClient { + async fn get_block(&self, block_id: &BlockId) -> ClientResult> { let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; @@ -47,7 +62,7 @@ impl BeaconClient { }) } - pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult> { + async fn get_block_header(&self, block_id: &BlockId) -> ClientResult> { let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; @@ -63,7 +78,7 @@ impl BeaconClient { }) } - pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>> { + async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>> { let path = format!("v1/beacon/blob_sidecars/{}", { block_id.to_detailed_string() }); @@ -75,7 +90,7 @@ impl BeaconClient { }) } - pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult { + fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult { let topics = topics .iter() .map(|topic| topic.into()) diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 8af32bb..693284d 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -1,3 +1,6 @@ +use std::fmt::Debug; + +use async_trait::async_trait; use backoff::ExponentialBackoff; use chrono::TimeDelta; use reqwest::{Client, Url}; @@ -18,6 +21,23 @@ use self::{ mod jwt_manager; pub mod types; + +#[async_trait] +pub trait CommonBlobscanClient: Send + Sync + Debug { + fn try_with_client(client: Client, config: Config) -> ClientResult + where + Self: Sized; + async fn index( + &self, + block: Block, + transactions: Vec, + blobs: Vec, + ) -> ClientResult<()>; + async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult; + async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>; + async fn get_sync_state(&self) -> ClientResult>; +} + #[derive(Debug, Clone)] pub struct BlobscanClient { base_url: Url, @@ -32,8 +52,9 @@ pub struct Config { pub exp_backoff: Option, } -impl BlobscanClient { - pub fn try_with_client(client: Client, config: Config) -> ClientResult { +#[async_trait] +impl CommonBlobscanClient for BlobscanClient { + fn try_with_client(client: Client, config: Config) -> ClientResult { let base_url = Url::parse(&format!("{}/", config.base_url))?; let jwt_manager = JWTManager::new(JWTManagerConfig { secret_key: config.secret_key, @@ -50,7 +71,7 @@ impl BlobscanClient { }) } - pub async fn index( + async fn index( &self, block: Block, transactions: Vec, @@ -67,7 +88,7 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult { + async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult { let url = self.base_url.join("indexer/reorged-slots")?; let token = self.jwt_manager.get_token()?; let req = ReorgedSlotsRequest { @@ -78,7 +99,7 @@ impl BlobscanClient { .map(|res: Option| res.unwrap().total_updated_slots) } - pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { + async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { let url = self.base_url.join("blockchain-sync-state")?; let token = self.jwt_manager.get_token()?; let req: BlockchainSyncStateRequest = sync_state.into(); @@ -86,7 +107,7 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn get_sync_state(&self) -> ClientResult> { + async fn get_sync_state(&self) -> ClientResult> { let url = self.base_url.join("blockchain-sync-state")?; json_get!( &self.client, diff --git a/src/context.rs b/src/context.rs index 8bcfd7e..ec24b4c 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,20 +1,24 @@ -use std::{sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Result as AnyhowResult; use backoff::ExponentialBackoffBuilder; -use ethers::prelude::*; +use dyn_clone::DynClone; +use ethers::providers::{Http as HttpProvider, Provider}; use crate::{ - clients::beacon::{BeaconClient, Config as BeaconClientConfig}, - clients::blobscan::{BlobscanClient, Config as BlobscanClientConfig}, + clients::{ + beacon::{BeaconClient, CommonBeaconClient, Config as BeaconClientConfig}, + blobscan::{BlobscanClient, CommonBlobscanClient, Config as BlobscanClientConfig}, + }, env::Environment, }; -#[derive(Debug, Clone)] -struct ContextRef { - pub beacon_client: BeaconClient, - pub blobscan_client: BlobscanClient, - pub provider: Provider, +dyn_clone::clone_trait_object!(CommonContext); + +pub trait CommonContext: Send + Sync + Debug + DynClone { + fn beacon_client(&self) -> &Box; + fn blobscan_client(&self) -> &Box; + fn provider(&self) -> &Provider; } pub struct Config { @@ -24,6 +28,13 @@ pub struct Config { pub secret_key: String, } +#[derive(Debug)] +struct ContextRef { + pub beacon_client: Box, + pub blobscan_client: Box, + pub provider: Provider, +} + #[derive(Debug, Clone)] pub struct Context { inner: Arc, @@ -45,35 +56,37 @@ impl Context { Ok(Self { inner: Arc::new(ContextRef { - blobscan_client: BlobscanClient::try_with_client( + blobscan_client: Box::new(BlobscanClient::try_with_client( client.clone(), BlobscanClientConfig { base_url: blobscan_api_endpoint, secret_key, exp_backoff: exp_backoff.clone(), }, - )?, - beacon_client: BeaconClient::try_with_client( + )?), + beacon_client: Box::new(BeaconClient::try_with_client( client, BeaconClientConfig { base_url: beacon_node_url, exp_backoff, }, - )?, - provider: Provider::::try_from(execution_node_endpoint)?, + )?), + provider: Provider::::try_from(execution_node_endpoint)?, }), }) } +} - pub fn beacon_client(&self) -> &BeaconClient { +impl CommonContext for Context { + fn beacon_client(&self) -> &Box { &self.inner.beacon_client } - pub fn blobscan_client(&self) -> &BlobscanClient { + fn blobscan_client(&self) -> &Box { &self.inner.blobscan_client } - pub fn provider(&self) -> &Provider { + fn provider(&self) -> &Provider { &self.inner.provider } } diff --git a/src/indexer/event_handlers/finalized_checkpoint.rs b/src/indexer/event_handlers/finalized_checkpoint.rs index 2e37290..0d70f50 100644 --- a/src/indexer/event_handlers/finalized_checkpoint.rs +++ b/src/indexer/event_handlers/finalized_checkpoint.rs @@ -1,3 +1,4 @@ +use ethers::providers::Http as HttpProvider; use tracing::info; use crate::{ @@ -6,7 +7,7 @@ use crate::{ blobscan::types::BlockchainSyncState, common::ClientError, }, - context::Context, + context::CommonContext, utils::web3::get_full_hash, }; @@ -22,12 +23,12 @@ pub enum FinalizedCheckpointEventHandlerError { BlobscanFinalizedBlockUpdateFailure(#[source] ClientError), } -pub struct FinalizedCheckpointHandler { - context: Context, +pub struct FinalizedCheckpointHandler { + context: Box>, } -impl FinalizedCheckpointHandler { - pub fn new(context: Context) -> Self { +impl FinalizedCheckpointHandler { + pub fn new(context: Box>) -> Self { FinalizedCheckpointHandler { context } } diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs index faad56d..55062c9 100644 --- a/src/indexer/event_handlers/head.rs +++ b/src/indexer/event_handlers/head.rs @@ -1,6 +1,6 @@ use std::cmp; -use ethers::types::H256; +use ethers::{providers::Http as HttpProvider, types::H256}; use tracing::info; use crate::{ @@ -9,7 +9,7 @@ use crate::{ blobscan::types::BlockchainSyncState, common::ClientError, }, - context::Context, + context::CommonContext, synchronizer::{error::SynchronizerError, Synchronizer}, }; @@ -29,15 +29,19 @@ pub enum HeadEventHandlerError { BlobscanSyncStateUpdateError(#[source] ClientError), } -pub struct HeadEventHandler { - context: Context, - synchronizer: Synchronizer, +pub struct HeadEventHandler { + context: Box>, + synchronizer: Synchronizer, start_block_id: BlockId, last_block_hash: Option, } -impl HeadEventHandler { - pub fn new(context: Context, synchronizer: Synchronizer, start_block_id: BlockId) -> Self { +impl HeadEventHandler { + pub fn new( + context: Box>, + synchronizer: Synchronizer, + start_block_id: BlockId, + ) -> Self { HeadEventHandler { context, synchronizer, diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 02ef0e7..eebec8a 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,7 +1,7 @@ use std::thread; use anyhow::anyhow; - +use ethers::providers::Http as HttpProvider; use event_handlers::{finalized_checkpoint::FinalizedCheckpointHandler, head::HeadEventHandler}; use futures::StreamExt; use reqwest_eventsource::Event; @@ -11,7 +11,7 @@ use tracing::{debug, error, info, Instrument}; use crate::{ args::Args, clients::beacon::types::{BlockId, Topic}, - context::{Config as ContextConfig, Context}, + context::{CommonContext, Config as ContextConfig, Context}, env::Environment, indexer::error::HistoricalIndexingError, synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, @@ -26,8 +26,8 @@ pub mod error; pub mod event_handlers; pub mod types; -pub struct Indexer { - context: Context, +pub struct Indexer { + context: Box>, dencun_fork_slot: u32, disable_sync_historical: bool, @@ -36,7 +36,7 @@ pub struct Indexer { num_threads: u32, } -impl Indexer { +impl Indexer { pub fn try_new(env: &Environment, args: &Args) -> IndexerResult { let context = match Context::try_new(ContextConfig::from(env)) { Ok(c) => c, @@ -74,7 +74,7 @@ impl Indexer { .unwrap_or(env.network_name.dencun_fork_slot()); Ok(Self { - context, + context: Box::new(context), dencun_fork_slot, disable_sync_historical, checkpoint_slots, @@ -289,7 +289,7 @@ impl Indexer { }) } - fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 9e1e0af..5766213 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -1,14 +1,17 @@ use anyhow::{anyhow, Context as AnyhowContext, Result}; -use ethers::prelude::*; +use ethers::{ + providers::{Http as HttpProvider, Middleware}, + types::H256, +}; use tracing::{debug, info}; use crate::{ clients::{ - beacon::types::BlockId, + beacon::types::{BlockHeader, BlockId}, blobscan::types::{Blob, Block, Transaction}, }, - context::Context, + context::CommonContext, }; use self::error::{SlotProcessingError, SlotsProcessorError}; @@ -17,12 +20,27 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha pub mod error; mod helpers; -pub struct SlotsProcessor { - context: Context, +pub struct SlotsProcessor { + context: Box>, +} + +#[derive(Debug, Clone)] +pub struct BlockData { + pub root: H256, + pub slot: u32, +} + +impl From for BlockData { + fn from(block_header: BlockHeader) -> Self { + Self { + root: block_header.root, + slot: block_header.header.message.slot, + } + } } -impl SlotsProcessor { - pub fn new(context: Context) -> SlotsProcessor { +impl SlotsProcessor { + pub fn new(context: Box>) -> SlotsProcessor { Self { context } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 856320e..36b7df3 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,11 +1,12 @@ use anyhow::anyhow; +use ethers::providers::Http as HttpProvider; use futures::future::join_all; use tokio::task::JoinHandle; use tracing::{debug, info, Instrument}; use crate::{ clients::{beacon::types::BlockId, blobscan::types::BlockchainSyncState, common::ClientError}, - context::Context, + context::CommonContext, slots_processor::{error::SlotsProcessorError, SlotsProcessor}, }; @@ -22,8 +23,8 @@ pub struct SynchronizerBuilder { } #[derive(Debug)] -pub struct Synchronizer { - context: Context, +pub struct Synchronizer { + context: Box>, num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, @@ -70,7 +71,10 @@ impl SynchronizerBuilder { self } - pub fn build(&self, context: Context) -> Synchronizer { + pub fn build( + &self, + context: Box>, + ) -> Synchronizer { Synchronizer { context, num_threads: self.num_threads, @@ -81,7 +85,7 @@ impl SynchronizerBuilder { } } -impl Synchronizer { +impl Synchronizer { pub async fn run( &self, initial_block_id: &BlockId, From 7dfd24d583f1a4ca8b73547581d15431defdd64b Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 16 Jun 2024 14:25:46 +0200 Subject: [PATCH 4/8] refactor: abstract synchronizer's functions into a trait --- src/indexer/event_handlers/head.rs | 6 +-- src/indexer/mod.rs | 6 +-- src/synchronizer/mod.rs | 67 ++++++++++++++++++------------ 3 files changed, 47 insertions(+), 32 deletions(-) diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs index 55062c9..e1ec695 100644 --- a/src/indexer/event_handlers/head.rs +++ b/src/indexer/event_handlers/head.rs @@ -10,7 +10,7 @@ use crate::{ common::ClientError, }, context::CommonContext, - synchronizer::{error::SynchronizerError, Synchronizer}, + synchronizer::{error::SynchronizerError, CommonSynchronizer}, }; #[derive(Debug, thiserror::Error)] @@ -31,7 +31,7 @@ pub enum HeadEventHandlerError { pub struct HeadEventHandler { context: Box>, - synchronizer: Synchronizer, + synchronizer: Box, start_block_id: BlockId, last_block_hash: Option, } @@ -39,7 +39,7 @@ pub struct HeadEventHandler { impl HeadEventHandler { pub fn new( context: Box>, - synchronizer: Synchronizer, + synchronizer: Box, start_block_id: BlockId, ) -> Self { HeadEventHandler { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index eebec8a..0236076 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -14,7 +14,7 @@ use crate::{ context::{CommonContext, Config as ContextConfig, Context}, env::Environment, indexer::error::HistoricalIndexingError, - synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, + synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder}, }; use self::{ @@ -289,7 +289,7 @@ impl Indexer { }) } - fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Box { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { @@ -302,6 +302,6 @@ impl Indexer { synchronizer_builder.with_num_threads(self.num_threads); - synchronizer_builder.build(self.context.clone()) + Box::new(synchronizer_builder.build(self.context.clone())) } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 36b7df3..167dfe9 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,4 +1,7 @@ +use std::fmt::Debug; + use anyhow::anyhow; +use async_trait::async_trait; use ethers::providers::Http as HttpProvider; use futures::future::join_all; use tokio::task::JoinHandle; @@ -14,6 +17,15 @@ use self::error::{SlotsChunksErrors, SynchronizerError}; pub mod error; +#[async_trait] +pub trait CommonSynchronizer: Send + Sync + Debug { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError>; +} + #[derive(Debug)] pub struct SynchronizerBuilder { num_threads: u32, @@ -86,32 +98,6 @@ impl SynchronizerBuilder { } impl Synchronizer { - pub async fn run( - &self, - initial_block_id: &BlockId, - final_block_id: &BlockId, - ) -> Result<(), SynchronizerError> { - let initial_slot = self.resolve_to_slot(initial_block_id).await?; - let mut final_slot = self.resolve_to_slot(final_block_id).await?; - - if initial_slot == final_slot { - return Ok(()); - } - - loop { - self.sync_slots_by_checkpoints(initial_slot, final_slot) - .await?; - - let latest_final_slot = self.resolve_to_slot(final_block_id).await?; - - if final_slot == latest_final_slot { - return Ok(()); - } - - final_slot = latest_final_slot; - } - } - async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot); @@ -322,3 +308,32 @@ impl Synchronizer { } } } + +#[async_trait] +impl CommonSynchronizer for Synchronizer { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError> { + let initial_slot = self.resolve_to_slot(initial_block_id).await?; + let mut final_slot = self.resolve_to_slot(final_block_id).await?; + + if initial_slot == final_slot { + return Ok(()); + } + + loop { + self.sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; + + let latest_final_slot = self.resolve_to_slot(final_block_id).await?; + + if final_slot == latest_final_slot { + return Ok(()); + } + + final_slot = latest_final_slot; + } + } +} From 315e38d28c52e26150e4fa6095ae58f116ac380f Mon Sep 17 00:00:00 2001 From: PJColombo Date: Tue, 18 Jun 2024 18:23:05 +0200 Subject: [PATCH 5/8] test: add `mockall` dep --- Cargo.lock | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 3 +++ 2 files changed, 75 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index dbcfb46..d3aff70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,6 +325,7 @@ dependencies = [ "futures", "hex", "jsonwebtoken", + "mockall", "reqwest", "reqwest-eventsource", "sentry", @@ -722,6 +723,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.4" @@ -1186,6 +1193,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "funty" version = "2.0.0" @@ -1764,6 +1777,33 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2136,6 +2176,32 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -3045,6 +3111,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.57" diff --git a/Cargo.toml b/Cargo.toml index 99e8629..5fc9d7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,6 @@ anyhow = { version = "1.0.70", features = ["backtrace"] } thiserror = "1.0.40" sentry = { version = "0.31.2", features = ["debug-images"] } sentry-tracing = "0.31.2" + +[dev-dependencies] +mockall = "0.12.1" From 3b3f90680863fca228d30875256f18bc265cc551 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Tue, 18 Jun 2024 18:24:54 +0200 Subject: [PATCH 6/8] test: create mocks for indexer components --- src/clients/beacon/mod.rs | 4 +++ src/clients/blobscan/mod.rs | 5 ++++ src/context.rs | 54 +++++++++++++++++++++++++++++++------ src/synchronizer/mod.rs | 4 +++ 4 files changed, 59 insertions(+), 8 deletions(-) diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index 4d1fe15..f06ff21 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -7,6 +7,9 @@ use backoff::ExponentialBackoff; use reqwest::{Client, Url}; use reqwest_eventsource::EventSource; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{beacon::types::BlockHeaderResponse, common::ClientResult}, json_get, @@ -29,6 +32,7 @@ pub struct Config { } #[async_trait] +#[cfg_attr(test, automock)] pub trait CommonBeaconClient: Send + Sync + Debug { async fn get_block(&self, block_id: &BlockId) -> ClientResult>; async fn get_block_header(&self, block_id: &BlockId) -> ClientResult>; diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 693284d..edbc15a 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -5,6 +5,9 @@ use backoff::ExponentialBackoff; use chrono::TimeDelta; use reqwest::{Client, Url}; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult}, json_get, json_put, @@ -23,6 +26,7 @@ mod jwt_manager; pub mod types; #[async_trait] +#[cfg_attr(test, automock)] pub trait CommonBlobscanClient: Send + Sync + Debug { fn try_with_client(client: Client, config: Config) -> ClientResult where @@ -53,6 +57,7 @@ pub struct Config { } #[async_trait] + impl CommonBlobscanClient for BlobscanClient { fn try_with_client(client: Client, config: Config) -> ClientResult { let base_url = Url::parse(&format!("{}/", config.base_url))?; diff --git a/src/context.rs b/src/context.rs index ec24b4c..ace4648 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Result as AnyhowResult; use backoff::ExponentialBackoffBuilder; use dyn_clone::DynClone; -use ethers::providers::{Http as HttpProvider, Provider}; +use ethers::providers::{Http as HttpProvider, MockProvider, Provider}; use crate::{ clients::{ @@ -13,7 +13,8 @@ use crate::{ env::Environment, }; -dyn_clone::clone_trait_object!(CommonContext); +#[cfg(test)] +use crate::clients::{beacon::MockCommonBeaconClient, blobscan::MockCommonBlobscanClient}; pub trait CommonContext: Send + Sync + Debug + DynClone { fn beacon_client(&self) -> &Box; @@ -21,6 +22,9 @@ pub trait CommonContext: Send + Sync + Debug + DynClone { fn provider(&self) -> &Provider; } +dyn_clone::clone_trait_object!(CommonContext); +dyn_clone::clone_trait_object!(CommonContext); + pub struct Config { pub blobscan_api_endpoint: String, pub beacon_node_url: String, @@ -29,18 +33,18 @@ pub struct Config { } #[derive(Debug)] -struct ContextRef { +struct ContextRef { pub beacon_client: Box, pub blobscan_client: Box, - pub provider: Provider, + pub provider: Provider, } #[derive(Debug, Clone)] -pub struct Context { - inner: Arc, +pub struct Context { + inner: Arc>, } -impl Context { +impl Context { pub fn try_new(config: Config) -> AnyhowResult { let Config { blobscan_api_endpoint, @@ -77,7 +81,7 @@ impl Context { } } -impl CommonContext for Context { +impl CommonContext for Context { fn beacon_client(&self) -> &Box { &self.inner.beacon_client } @@ -101,3 +105,37 @@ impl From<&Environment> for Config { } } } + +#[cfg(test)] +impl Context { + pub fn new( + beacon_client: Option, + blobscan_client: Option, + provider: Option>, + ) -> Box { + Box::new(Self { + inner: Arc::new(ContextRef { + beacon_client: Box::new(beacon_client.unwrap_or(MockCommonBeaconClient::new())), + blobscan_client: Box::new( + blobscan_client.unwrap_or(MockCommonBlobscanClient::new()), + ), + provider: provider.unwrap_or(Provider::mocked().0), + }), + }) + } +} + +#[cfg(test)] +impl CommonContext for Context { + fn beacon_client(&self) -> &Box { + &self.inner.beacon_client + } + + fn blobscan_client(&self) -> &Box { + &self.inner.blobscan_client + } + + fn provider(&self) -> &Provider { + &self.inner.provider + } +} diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 167dfe9..9f4f660 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -7,6 +7,9 @@ use futures::future::join_all; use tokio::task::JoinHandle; use tracing::{debug, info, Instrument}; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{beacon::types::BlockId, blobscan::types::BlockchainSyncState, common::ClientError}, context::CommonContext, @@ -18,6 +21,7 @@ use self::error::{SlotsChunksErrors, SynchronizerError}; pub mod error; #[async_trait] +#[cfg_attr(test, automock)] pub trait CommonSynchronizer: Send + Sync + Debug { async fn run( &self, From fa7d7356ac48808dbcf172ba1bd5c9938706f88c Mon Sep 17 00:00:00 2001 From: PJColombo Date: Tue, 18 Jun 2024 19:01:04 +0200 Subject: [PATCH 7/8] test(HeadEventHandler): add tests --- src/clients/beacon/types.rs | 2 +- src/clients/blobscan/types.rs | 2 +- src/indexer/event_handlers/head.rs | 569 ++++++++++++++++++++++++++++- 3 files changed, 568 insertions(+), 5 deletions(-) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index a5fcea2..4621602 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -3,7 +3,7 @@ use std::{fmt, str::FromStr}; use ethers::types::{Bytes, H256}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Debug, Clone)] +#[derive(Serialize, Debug, Clone, PartialEq)] pub enum BlockId { Head, Finalized, diff --git a/src/clients/blobscan/types.rs b/src/clients/blobscan/types.rs index 95ce669..4ae7157 100644 --- a/src/clients/blobscan/types.rs +++ b/src/clients/blobscan/types.rs @@ -71,7 +71,7 @@ pub struct BlockchainSyncStateResponse { pub last_upper_synced_slot: Option, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct BlockchainSyncState { pub last_finalized_block: Option, pub last_lower_synced_slot: Option, diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs index e1ec695..50de1be 100644 --- a/src/indexer/event_handlers/head.rs +++ b/src/indexer/event_handlers/head.rs @@ -1,6 +1,7 @@ use std::cmp; -use ethers::{providers::Http as HttpProvider, types::H256}; +use ethers::providers::JsonRpcClient; +use ethers::types::H256; use tracing::info; use crate::{ @@ -36,9 +37,12 @@ pub struct HeadEventHandler { last_block_hash: Option, } -impl HeadEventHandler { +impl HeadEventHandler +where + T: JsonRpcClient + Send + Sync + 'static, +{ pub fn new( - context: Box>, + context: Box>, synchronizer: Box, start_block_id: BlockId, ) -> Self { @@ -141,3 +145,562 @@ impl HeadEventHandler { } } } + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use ethers::types::H256; + use mockall::predicate::eq; + + use super::HeadEventHandler; + use crate::{ + clients::{ + beacon::{ + types::{BlockHeader, BlockHeaderMessage, BlockId, InnerBlockHeader}, + MockCommonBeaconClient, + }, + blobscan::{types::BlockchainSyncState, MockCommonBlobscanClient}, + }, + context::Context, + synchronizer::MockCommonSynchronizer, + }; + + #[derive(Clone, Debug)] + struct BlockData { + slot: u32, + hash: H256, + parent_hash: Option, + } + + impl BlockData { + pub fn to_head_event(self) -> String { + format!( + r#"{{"slot": "{}", "block": "{}"}}"#, + self.slot, + format!("0x{:x}", self.hash) + ) + } + } + + #[tokio::test] + async fn test_handler_on_initial_event() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let block_data = Box::new(BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &block_data, + Some(initial_start_block_id.clone()), + ); + + let mock_context = Context::new(Some(mock_beacon_client), None, None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler.handle(block_data.to_head_event()).await; + + assert!(result.is_ok()) + } + + #[tokio::test] + async fn test_handler_after_first_event() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let first_head_block = BlockData { + hash: _create_hash("5"), + slot: 5, + parent_hash: None, + }; + let second_head_block = BlockData { + hash: _create_hash("6"), + slot: 6, + parent_hash: Some(first_head_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &first_head_block, + Some(initial_start_block_id.clone()), + ); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &second_head_block, + None, + ); + + let mock_context = Context::new(Some(mock_beacon_client), None, None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(first_head_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handler to succeed" + ); + + let result = head_event_handler + .handle(second_head_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected second head event handler to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_reorg() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_block = BlockData { + slot: 2, + hash: _create_hash("2"), + parent_hash: Some(_create_hash("1")), + }; + let reorged_block = BlockData { + slot: 5, + hash: _create_hash("5"), + parent_hash: Some(_create_hash("4")), + }; + let after_reorg_block = BlockData { + slot: 6, + hash: _create_hash("3b"), + parent_hash: Some(before_reorg_block.hash), + }; + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_block); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + Some(initial_start_block_id.clone()), + ); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &after_reorg_block, + None, + ); + + _stub_handle_reorged_slots( + &mut mock_blobscan_client, + (before_reorg_block.slot + 1..after_reorg_block.slot).collect::>(), + ); + + // We're expecting the synchronizer to re-sync the parent block of the reorged block + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_block.slot), + BlockId::Slot(before_reorg_block.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(after_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_one_depth_reorg() { + // 4 -> 5a + // 5b -> 6 -> ... + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let block_before_reorg = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }; + let reorged_block = BlockData { + slot: 5, + hash: _create_hash("50"), + parent_hash: Some(block_before_reorg.hash), + }; + let block_after_reorg = BlockData { + slot: 6, + hash: _create_hash("5"), + parent_hash: Some(block_before_reorg.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + Some(initial_start_block_id.clone()), + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &block_after_reorg, + None, + ); + + _stub_get_block_header(&mut mock_beacon_client, &block_before_reorg); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![reorged_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(block_before_reorg.slot), + BlockId::Slot(block_before_reorg.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(block_after_reorg.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_one_depth_later_reorg() { + // 4 -> 5a -> 6 -> ... + // 5b + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_parent_block = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }; + let before_reorg_block = BlockData { + slot: 5, + hash: _create_hash("50"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let reorged_block = BlockData { + slot: 6, + hash: _create_hash("5"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let after_reorg_block = BlockData { + slot: 7, + hash: _create_hash("7"), + parent_hash: Some(before_reorg_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &before_reorg_block, + Some(initial_start_block_id.clone()), + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + None, + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &after_reorg_block, + None, + ); + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_parent_block); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![before_reorg_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_parent_block.slot), + BlockId::Slot(before_reorg_parent_block.slot + 1), + ); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![reorged_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_block.slot), + BlockId::Slot(before_reorg_block.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(before_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + + let result = head_event_handler + .handle(after_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected after reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_reorg_with_error() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_parent_block = BlockData { + slot: 3, + hash: _create_hash("3"), + parent_hash: None, + }; + let before_reorg_block = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let first_block = BlockData { + slot: 5, + hash: _create_hash("5"), + parent_hash: Some(before_reorg_block.hash), + }; + let reorged_block = BlockData { + slot: 6, + hash: _create_hash("999"), + parent_hash: Some(before_reorg_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &first_block, + Some(initial_start_block_id.clone()), + ); + + _stub_get_block_header(&mut mock_beacon_client, &reorged_block); + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_block); + + mock_blobscan_client + .expect_handle_reorged_slots() + .returning(|_x| { + Box::pin(async move { + Err(crate::clients::common::ClientError::Other(anyhow!( + "Internal blobscan client error" + ))) + }) + }); + + mock_blobscan_client + .expect_update_sync_state() + .times(1) + .with(eq(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(before_reorg_parent_block.slot), + })) + .returning(|_x| Box::pin(async move { Ok(()) })); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler.handle(first_block.to_head_event()).await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_err(), + "Expected reorged head event handling to fail" + ); + } + + fn _prepare_handler_calls( + mock_beacon_client: &mut MockCommonBeaconClient, + mock_synchronizer: &mut MockCommonSynchronizer, + head_block_data: &BlockData, + initial_block_id: Option, + ) { + let slot = head_block_data.slot; + + _stub_get_block_header(mock_beacon_client, head_block_data); + + _stub_synchronizer_run( + mock_synchronizer, + initial_block_id.unwrap_or(BlockId::Slot(slot)), + BlockId::Slot(slot + 1), + ) + } + + fn _stub_get_block_header( + mock_beacon_client: &mut MockCommonBeaconClient, + block_data: &BlockData, + ) { + let root = block_data.hash; + let slot = block_data.slot; + let parent_root = block_data + .parent_hash + .unwrap_or(_create_hash((slot - 1).to_string().as_str())); + + mock_beacon_client + .expect_get_block_header() + .with(eq(BlockId::Slot(block_data.slot))) + .returning(move |_x| { + Box::pin(async move { + Ok(Some(BlockHeader { + root, + header: InnerBlockHeader { + message: BlockHeaderMessage { parent_root, slot }, + }, + })) + }) + }); + mock_beacon_client + .expect_get_block_header() + .with(eq(BlockId::Hash(block_data.hash))) + .returning(move |_x| { + Box::pin(async move { + Ok(Some(BlockHeader { + root, + header: InnerBlockHeader { + message: BlockHeaderMessage { parent_root, slot }, + }, + })) + }) + }); + } + + fn _stub_handle_reorged_slots( + mock_blobscan_client: &mut MockCommonBlobscanClient, + reorged_slots: Vec, + ) { + let reorged_slots_len = reorged_slots.len() as u32; + + mock_blobscan_client + .expect_handle_reorged_slots() + .with(eq(reorged_slots)) + .returning(move |_x| Box::pin(async move { Ok(reorged_slots_len) })); + } + + fn _stub_synchronizer_run( + mock_synchronizer: &mut MockCommonSynchronizer, + initial_block_id: BlockId, + final_block_id: BlockId, + ) { + mock_synchronizer + .expect_run() + .times(1) + .with(eq(initial_block_id.clone()), eq(final_block_id)) + .returning(|_x, _y| Box::pin(async { Ok(()) })); + } + + fn _create_hash(input: &str) -> H256 { + // Ensure the input string is at most 64 characters + let truncated_input = if input.len() > 64 { + &input[0..64] + } else { + input + }; + + // Format the string to have a length of 64 characters by padding with zeros + let hash = format!("0x{:0>64}", truncated_input); + + hash.parse().unwrap() + } + + fn _create_head_event(slot: u32, block_hash: H256) -> String { + let head_event = format!( + r#"{{"slot": "{}", "block": "{}"}}"#, + slot, + format!("0x{:x}", block_hash) + ); + + head_event + } + + // Additional tests for error handling, etc. +} From 36577be98bdab8cc58ee4f7004147928db0b9eb4 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Wed, 19 Jun 2024 13:34:36 +0200 Subject: [PATCH 8/8] style: resolve clippy issues --- src/clients/beacon/types.rs | 11 ----------- src/context.rs | 20 ++++++++++---------- src/indexer/event_handlers/head.rs | 15 ++++++++------- src/slots_processor/mod.rs | 22 ++-------------------- 4 files changed, 20 insertions(+), 48 deletions(-) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index 4621602..3b6710d 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -16,7 +16,6 @@ pub enum BlockId { pub enum Topic { Head, FinalizedCheckpoint, - ChainReorg, } #[derive(Deserialize, Debug)] @@ -80,15 +79,6 @@ pub struct BlockHeaderMessage { pub slot: u32, } -#[derive(Deserialize, Debug)] -pub struct ChainReorgEventData { - pub old_head_block: H256, - #[serde(deserialize_with = "deserialize_number")] - pub slot: u32, - #[serde(deserialize_with = "deserialize_number")] - pub depth: u32, -} - #[derive(Deserialize, Debug)] pub struct HeadEventData { #[serde(deserialize_with = "deserialize_number")] @@ -161,7 +151,6 @@ impl FromStr for BlockId { impl From<&Topic> for String { fn from(value: &Topic) -> Self { match value { - Topic::ChainReorg => String::from("chain_reorg"), Topic::Head => String::from("head"), Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"), } diff --git a/src/context.rs b/src/context.rs index ace4648..9cdedbd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -17,8 +17,8 @@ use crate::{ use crate::clients::{beacon::MockCommonBeaconClient, blobscan::MockCommonBlobscanClient}; pub trait CommonContext: Send + Sync + Debug + DynClone { - fn beacon_client(&self) -> &Box; - fn blobscan_client(&self) -> &Box; + fn beacon_client(&self) -> &dyn CommonBeaconClient; + fn blobscan_client(&self) -> &dyn CommonBlobscanClient; fn provider(&self) -> &Provider; } @@ -82,12 +82,12 @@ impl Context { } impl CommonContext for Context { - fn beacon_client(&self) -> &Box { - &self.inner.beacon_client + fn beacon_client(&self) -> &dyn CommonBeaconClient { + self.inner.beacon_client.as_ref() } - fn blobscan_client(&self) -> &Box { - &self.inner.blobscan_client + fn blobscan_client(&self) -> &dyn CommonBlobscanClient { + self.inner.blobscan_client.as_ref() } fn provider(&self) -> &Provider { @@ -127,12 +127,12 @@ impl Context { #[cfg(test)] impl CommonContext for Context { - fn beacon_client(&self) -> &Box { - &self.inner.beacon_client + fn beacon_client(&self) -> &dyn CommonBeaconClient { + self.inner.beacon_client.as_ref() } - fn blobscan_client(&self) -> &Box { - &self.inner.blobscan_client + fn blobscan_client(&self) -> &dyn CommonBlobscanClient { + self.inner.blobscan_client.as_ref() } fn provider(&self) -> &Provider { diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs index 50de1be..d2ff23e 100644 --- a/src/indexer/event_handlers/head.rs +++ b/src/indexer/event_handlers/head.rs @@ -345,8 +345,9 @@ mod tests { #[tokio::test] async fn test_handler_on_one_depth_reorg() { - // 4 -> 5a - // 5b -> 6 -> ... + // Slots: + // 4 -> 5 + // 6 -> 7 -> ... let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); let mut mock_beacon_client = MockCommonBeaconClient::new(); let mut mock_blobscan_client = MockCommonBlobscanClient::new(); @@ -417,9 +418,11 @@ mod tests { } #[tokio::test] - async fn test_handler_on_one_depth_later_reorg() { - // 4 -> 5a -> 6 -> ... - // 5b + async fn test_handler_on_one_depth_former_reorg() { + // Reorged block is reorged back to its former parent + // Slots: + // 4 -> 5 -> 7 -> ... + // 6 let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); let mut mock_beacon_client = MockCommonBeaconClient::new(); let mut mock_blobscan_client = MockCommonBlobscanClient::new(); @@ -701,6 +704,4 @@ mod tests { head_event } - - // Additional tests for error handling, etc. } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 5766213..259ab99 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -1,14 +1,11 @@ use anyhow::{anyhow, Context as AnyhowContext, Result}; -use ethers::{ - providers::{Http as HttpProvider, Middleware}, - types::H256, -}; +use ethers::providers::{Http as HttpProvider, Middleware}; use tracing::{debug, info}; use crate::{ clients::{ - beacon::types::{BlockHeader, BlockId}, + beacon::types::BlockId, blobscan::types::{Blob, Block, Transaction}, }, context::CommonContext, @@ -24,21 +21,6 @@ pub struct SlotsProcessor { context: Box>, } -#[derive(Debug, Clone)] -pub struct BlockData { - pub root: H256, - pub slot: u32, -} - -impl From for BlockData { - fn from(block_header: BlockHeader) -> Self { - Self { - root: block_header.root, - slot: block_header.header.message.slot, - } - } -} - impl SlotsProcessor { pub fn new(context: Box>) -> SlotsProcessor { Self { context }