diff --git a/Cargo.lock b/Cargo.lock index 6b1cd3c..d3aff70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,9 +127,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", @@ -319,11 +319,13 @@ dependencies = [ "chrono", "clap", "dotenv", + "dyn-clone", "envy", "ethers", "futures", "hex", "jsonwebtoken", + "mockall", "reqwest", "reqwest-eventsource", "sentry", @@ -721,12 +723,24 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "ecdsa" version = "0.14.8" @@ -1179,6 +1193,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "funty" version = "2.0.0" @@ -1757,6 +1777,33 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2129,6 +2176,32 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -3038,6 +3111,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.57" diff --git a/Cargo.toml b/Cargo.toml index fd582e2..5fc9d7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.66" +async-trait = "0.1.80" +dyn-clone = "1.0.17" dotenv = "0.15.0" envy = "0.4.2" ethers = "1.0.2" @@ -34,3 +35,6 @@ anyhow = { version = "1.0.70", features = ["backtrace"] } thiserror = "1.0.40" sentry = { version = "0.31.2", features = ["debug-images"] } sentry-tracing = "0.31.2" + +[dev-dependencies] +mockall = "0.12.1" diff --git a/src/clients/beacon/errors.rs b/src/clients/beacon/errors.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index 28c0a3e..f06ff21 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -1,8 +1,15 @@ +use std::fmt::Debug; + use anyhow::Context as AnyhowContext; +use async_trait::async_trait; use backoff::ExponentialBackoff; + use reqwest::{Client, Url}; use reqwest_eventsource::EventSource; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{beacon::types::BlockHeaderResponse, common::ClientResult}, json_get, @@ -24,6 +31,15 @@ pub struct Config { pub exp_backoff: Option, } +#[async_trait] +#[cfg_attr(test, automock)] +pub trait CommonBeaconClient: Send + Sync + Debug { + async fn get_block(&self, block_id: &BlockId) -> ClientResult>; + async fn get_block_header(&self, block_id: &BlockId) -> ClientResult>; + async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>>; + fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult; +} + impl BeaconClient { pub fn try_with_client(client: Client, config: Config) -> ClientResult { let base_url = Url::parse(&format!("{}/eth/", config.base_url)) @@ -36,8 +52,11 @@ impl BeaconClient { exp_backoff, }) } +} - pub async fn get_block(&self, block_id: &BlockId) -> ClientResult> { +#[async_trait] +impl CommonBeaconClient for BeaconClient { + async fn get_block(&self, block_id: &BlockId) -> ClientResult> { let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; @@ -47,7 +66,7 @@ impl BeaconClient { }) } - pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult> { + async fn get_block_header(&self, block_id: &BlockId) -> ClientResult> { let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; @@ -63,7 +82,7 @@ impl BeaconClient { }) } - pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>> { + async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>> { let path = format!("v1/beacon/blob_sidecars/{}", { block_id.to_detailed_string() }); @@ -75,7 +94,7 @@ impl BeaconClient { }) } - pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult { + fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult { let topics = topics .iter() .map(|topic| topic.into()) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index a5fcea2..3b6710d 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -3,7 +3,7 @@ use std::{fmt, str::FromStr}; use ethers::types::{Bytes, H256}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Debug, Clone)] +#[derive(Serialize, Debug, Clone, PartialEq)] pub enum BlockId { Head, Finalized, @@ -16,7 +16,6 @@ pub enum BlockId { pub enum Topic { Head, FinalizedCheckpoint, - ChainReorg, } #[derive(Deserialize, Debug)] @@ -80,15 +79,6 @@ pub struct BlockHeaderMessage { pub slot: u32, } -#[derive(Deserialize, Debug)] -pub struct ChainReorgEventData { - pub old_head_block: H256, - #[serde(deserialize_with = "deserialize_number")] - pub slot: u32, - #[serde(deserialize_with = "deserialize_number")] - pub depth: u32, -} - #[derive(Deserialize, Debug)] pub struct HeadEventData { #[serde(deserialize_with = "deserialize_number")] @@ -161,7 +151,6 @@ impl FromStr for BlockId { impl From<&Topic> for String { fn from(value: &Topic) -> Self { match value { - Topic::ChainReorg => String::from("chain_reorg"), Topic::Head => String::from("head"), Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"), } diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 8af32bb..edbc15a 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -1,7 +1,13 @@ +use std::fmt::Debug; + +use async_trait::async_trait; use backoff::ExponentialBackoff; use chrono::TimeDelta; use reqwest::{Client, Url}; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult}, json_get, json_put, @@ -18,6 +24,24 @@ use self::{ mod jwt_manager; pub mod types; + +#[async_trait] +#[cfg_attr(test, automock)] +pub trait CommonBlobscanClient: Send + Sync + Debug { + fn try_with_client(client: Client, config: Config) -> ClientResult + where + Self: Sized; + async fn index( + &self, + block: Block, + transactions: Vec, + blobs: Vec, + ) -> ClientResult<()>; + async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult; + async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>; + async fn get_sync_state(&self) -> ClientResult>; +} + #[derive(Debug, Clone)] pub struct BlobscanClient { base_url: Url, @@ -32,8 +56,10 @@ pub struct Config { pub exp_backoff: Option, } -impl BlobscanClient { - pub fn try_with_client(client: Client, config: Config) -> ClientResult { +#[async_trait] + +impl CommonBlobscanClient for BlobscanClient { + fn try_with_client(client: Client, config: Config) -> ClientResult { let base_url = Url::parse(&format!("{}/", config.base_url))?; let jwt_manager = JWTManager::new(JWTManagerConfig { secret_key: config.secret_key, @@ -50,7 +76,7 @@ impl BlobscanClient { }) } - pub async fn index( + async fn index( &self, block: Block, transactions: Vec, @@ -67,7 +93,7 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult { + async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult { let url = self.base_url.join("indexer/reorged-slots")?; let token = self.jwt_manager.get_token()?; let req = ReorgedSlotsRequest { @@ -78,7 +104,7 @@ impl BlobscanClient { .map(|res: Option| res.unwrap().total_updated_slots) } - pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { + async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { let url = self.base_url.join("blockchain-sync-state")?; let token = self.jwt_manager.get_token()?; let req: BlockchainSyncStateRequest = sync_state.into(); @@ -86,7 +112,7 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn get_sync_state(&self) -> ClientResult> { + async fn get_sync_state(&self) -> ClientResult> { let url = self.base_url.join("blockchain-sync-state")?; json_get!( &self.client, diff --git a/src/clients/blobscan/types.rs b/src/clients/blobscan/types.rs index 95ce669..4ae7157 100644 --- a/src/clients/blobscan/types.rs +++ b/src/clients/blobscan/types.rs @@ -71,7 +71,7 @@ pub struct BlockchainSyncStateResponse { pub last_upper_synced_slot: Option, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct BlockchainSyncState { pub last_finalized_block: Option, pub last_lower_synced_slot: Option, diff --git a/src/context.rs b/src/context.rs index 8bcfd7e..9cdedbd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,22 +1,30 @@ -use std::{sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Result as AnyhowResult; use backoff::ExponentialBackoffBuilder; -use ethers::prelude::*; +use dyn_clone::DynClone; +use ethers::providers::{Http as HttpProvider, MockProvider, Provider}; use crate::{ - clients::beacon::{BeaconClient, Config as BeaconClientConfig}, - clients::blobscan::{BlobscanClient, Config as BlobscanClientConfig}, + clients::{ + beacon::{BeaconClient, CommonBeaconClient, Config as BeaconClientConfig}, + blobscan::{BlobscanClient, CommonBlobscanClient, Config as BlobscanClientConfig}, + }, env::Environment, }; -#[derive(Debug, Clone)] -struct ContextRef { - pub beacon_client: BeaconClient, - pub blobscan_client: BlobscanClient, - pub provider: Provider, +#[cfg(test)] +use crate::clients::{beacon::MockCommonBeaconClient, blobscan::MockCommonBlobscanClient}; + +pub trait CommonContext: Send + Sync + Debug + DynClone { + fn beacon_client(&self) -> &dyn CommonBeaconClient; + fn blobscan_client(&self) -> &dyn CommonBlobscanClient; + fn provider(&self) -> &Provider; } +dyn_clone::clone_trait_object!(CommonContext); +dyn_clone::clone_trait_object!(CommonContext); + pub struct Config { pub blobscan_api_endpoint: String, pub beacon_node_url: String, @@ -24,12 +32,19 @@ pub struct Config { pub secret_key: String, } +#[derive(Debug)] +struct ContextRef { + pub beacon_client: Box, + pub blobscan_client: Box, + pub provider: Provider, +} + #[derive(Debug, Clone)] -pub struct Context { - inner: Arc, +pub struct Context { + inner: Arc>, } -impl Context { +impl Context { pub fn try_new(config: Config) -> AnyhowResult { let Config { blobscan_api_endpoint, @@ -45,35 +60,37 @@ impl Context { Ok(Self { inner: Arc::new(ContextRef { - blobscan_client: BlobscanClient::try_with_client( + blobscan_client: Box::new(BlobscanClient::try_with_client( client.clone(), BlobscanClientConfig { base_url: blobscan_api_endpoint, secret_key, exp_backoff: exp_backoff.clone(), }, - )?, - beacon_client: BeaconClient::try_with_client( + )?), + beacon_client: Box::new(BeaconClient::try_with_client( client, BeaconClientConfig { base_url: beacon_node_url, exp_backoff, }, - )?, - provider: Provider::::try_from(execution_node_endpoint)?, + )?), + provider: Provider::::try_from(execution_node_endpoint)?, }), }) } +} - pub fn beacon_client(&self) -> &BeaconClient { - &self.inner.beacon_client +impl CommonContext for Context { + fn beacon_client(&self) -> &dyn CommonBeaconClient { + self.inner.beacon_client.as_ref() } - pub fn blobscan_client(&self) -> &BlobscanClient { - &self.inner.blobscan_client + fn blobscan_client(&self) -> &dyn CommonBlobscanClient { + self.inner.blobscan_client.as_ref() } - pub fn provider(&self) -> &Provider { + fn provider(&self) -> &Provider { &self.inner.provider } } @@ -88,3 +105,37 @@ impl From<&Environment> for Config { } } } + +#[cfg(test)] +impl Context { + pub fn new( + beacon_client: Option, + blobscan_client: Option, + provider: Option>, + ) -> Box { + Box::new(Self { + inner: Arc::new(ContextRef { + beacon_client: Box::new(beacon_client.unwrap_or(MockCommonBeaconClient::new())), + blobscan_client: Box::new( + blobscan_client.unwrap_or(MockCommonBlobscanClient::new()), + ), + provider: provider.unwrap_or(Provider::mocked().0), + }), + }) + } +} + +#[cfg(test)] +impl CommonContext for Context { + fn beacon_client(&self) -> &dyn CommonBeaconClient { + self.inner.beacon_client.as_ref() + } + + fn blobscan_client(&self) -> &dyn CommonBlobscanClient { + self.inner.blobscan_client.as_ref() + } + + fn provider(&self) -> &Provider { + &self.inner.provider + } +} diff --git a/src/indexer/error.rs b/src/indexer/error.rs index 143e3cd..b707e59 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -2,82 +2,69 @@ use tokio::sync::mpsc::error::SendError; use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError}; -use super::types::IndexerTaskMessage; +use super::{ + event_handlers::{ + finalized_checkpoint::FinalizedCheckpointEventHandlerError, head::HeadEventHandlerError, + }, + types::IndexerTaskMessage, +}; #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("failed to create indexer")] CreationFailure(#[source] anyhow::Error), #[error(transparent)] - SyncingTaskError(#[from] SyncingTaskError), + SyncingTaskError(#[from] IndexingError), #[error("failed to retrieve blobscan's sync state")] BlobscanSyncStateRetrievalError(#[source] ClientError), - #[error("sync task message send failure")] + #[error("failed to send syncing task message")] SyncingTaskMessageSendFailure(#[from] SendError), } #[derive(Debug, thiserror::Error)] -pub enum SyncingTaskError { - #[error("an error ocurred while syncing historical data")] - HistoricalSyncingTaskError(#[from] HistoricalSyncingError), - #[error("an error occurred while syncing realtime data")] - RealtimeSyncingTaskError(#[from] RealtimeSyncingError), +pub enum IndexingError { + #[error(transparent)] + HistoricalIndexingFailure(#[from] HistoricalIndexingError), + #[error(transparent)] + LiveIndexingError(#[from] LiveIndexingError), } #[derive(Debug, thiserror::Error)] -pub enum HistoricalSyncingError { +pub enum HistoricalIndexingError { #[error(transparent)] SynchronizerError(#[from] SynchronizerError), } #[derive(Debug, thiserror::Error)] -pub enum RealtimeSyncingError { +pub enum LiveIndexingError { #[error("an error ocurred while receiving beacon events")] BeaconEventsConnectionFailure(#[from] reqwest_eventsource::Error), #[error("failed to subscribe to beacon events")] BeaconEventsSubscriptionError(#[source] ClientError), #[error("unexpected event \"{0}\" received")] UnexpectedBeaconEvent(String), - #[error(transparent)] - BeaconEventProcessingError(#[from] BeaconEventError), -} - -#[derive(Debug, thiserror::Error)] -pub enum BeaconEventError { - #[error("failed to handle \"chain_reorged\" event")] - ChainReorged(#[from] ChainReorgedEventHandlingError), - #[error("failed to handle \"head\" event")] - HeadBlock(#[from] HeadBlockEventHandlingError), - #[error("failed to handle \"finalized_checkpoint\" event")] - FinalizedCheckpoint(#[from] FinalizedBlockEventHandlingError), + #[error("failed to handle beacon event")] + BeaconEventHandlingError(#[from] EventHandlerError), } #[derive(Debug, thiserror::Error)] -pub enum FinalizedBlockEventHandlingError { +pub enum EventHandlerError { #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve finalized block {0}")] - BlockRetrievalError(String, #[source] ClientError), + HeadEventHandlerError(#[from] HeadEventHandlerError), #[error(transparent)] - Other(#[from] anyhow::Error), - #[error("failed to update blobscan's last finalized block")] - BlobscanSyncStateUpdateError(#[source] ClientError), + FinalizedCheckpointHandlerError(#[from] FinalizedCheckpointEventHandlerError), } -#[derive(Debug, thiserror::Error)] -pub enum ChainReorgedEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error("failed to retrieve reorged block {0}")] - BlockRetrievalError(String, #[source] ClientError), - #[error("failed to handle reorged of depth {0} starting at block {1}")] - ReorgedHandlingFailure(u32, String, #[source] ClientError), +impl From for LiveIndexingError { + fn from(err: HeadEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError(EventHandlerError::HeadEventHandlerError(err)) + } } -#[derive(Debug, thiserror::Error)] -pub enum HeadBlockEventHandlingError { - #[error(transparent)] - EventDeserializationFailure(#[from] serde_json::Error), - #[error(transparent)] - SynchronizerError(#[from] SynchronizerError), +impl From for LiveIndexingError { + fn from(err: FinalizedCheckpointEventHandlerError) -> Self { + LiveIndexingError::BeaconEventHandlingError( + EventHandlerError::FinalizedCheckpointHandlerError(err), + ) + } } diff --git a/src/indexer/event_handlers/finalized_checkpoint.rs b/src/indexer/event_handlers/finalized_checkpoint.rs new file mode 100644 index 0000000..0d70f50 --- /dev/null +++ b/src/indexer/event_handlers/finalized_checkpoint.rs @@ -0,0 +1,86 @@ +use ethers::providers::Http as HttpProvider; +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockId, FinalizedCheckpointEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::CommonContext, + utils::web3::get_full_hash, +}; + +#[derive(Debug, thiserror::Error)] +pub enum FinalizedCheckpointEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve block {0}")] + BlockRetrievalError(String, #[source] ClientError), + #[error("block \"{0}\" not found")] + BlockNotFound(String), + #[error("failed to update last finalized block")] + BlobscanFinalizedBlockUpdateFailure(#[source] ClientError), +} + +pub struct FinalizedCheckpointHandler { + context: Box>, +} + +impl FinalizedCheckpointHandler { + pub fn new(context: Box>) -> Self { + FinalizedCheckpointHandler { context } + } + + pub async fn handle( + &self, + event_data: String, + ) -> Result<(), FinalizedCheckpointEventHandlerError> { + let finalized_checkpoint_data = + serde_json::from_str::(&event_data)?; + let block_hash = finalized_checkpoint_data.block; + let full_block_hash = get_full_hash(&block_hash); + let last_finalized_block_number = match self + .context + .beacon_client() + .get_block(&BlockId::Hash(block_hash)) + .await + .map_err(|err| { + FinalizedCheckpointEventHandlerError::BlockRetrievalError( + full_block_hash.clone(), + err, + ) + })? { + Some(block) => match block.message.body.execution_payload { + Some(execution_payload) => execution_payload.block_number, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }, + None => { + return Err(FinalizedCheckpointEventHandlerError::BlockNotFound( + full_block_hash, + )) + } + }; + + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: None, + last_finalized_block: Some(last_finalized_block_number), + }) + .await + .map_err(FinalizedCheckpointEventHandlerError::BlobscanFinalizedBlockUpdateFailure)?; + + info!( + finalized_execution_block = last_finalized_block_number, + "Finalized checkpoint event received. Updated last finalized block number" + ); + + Ok(()) + } +} diff --git a/src/indexer/event_handlers/head.rs b/src/indexer/event_handlers/head.rs new file mode 100644 index 0000000..d2ff23e --- /dev/null +++ b/src/indexer/event_handlers/head.rs @@ -0,0 +1,707 @@ +use std::cmp; + +use ethers::providers::JsonRpcClient; +use ethers::types::H256; +use tracing::info; + +use crate::{ + clients::{ + beacon::types::{BlockHeader, BlockId, HeadEventData}, + blobscan::types::BlockchainSyncState, + common::ClientError, + }, + context::CommonContext, + synchronizer::{error::SynchronizerError, CommonSynchronizer}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum HeadEventHandlerError { + #[error(transparent)] + EventDeserializationFailure(#[from] serde_json::Error), + #[error("failed to retrieve header for block \"{0}\"")] + BlockHeaderRetrievalError(BlockId, #[source] ClientError), + #[error("header for block \"{0}\" not found")] + BlockHeaderNotFound(BlockId), + #[error("failed to index head block")] + BlockSyncedError(#[from] SynchronizerError), + #[error("failed to handle reorged slots")] + BlobscanReorgedSlotsFailure(#[source] ClientError), + #[error("failed to update blobscan's sync state")] + BlobscanSyncStateUpdateError(#[source] ClientError), +} + +pub struct HeadEventHandler { + context: Box>, + synchronizer: Box, + start_block_id: BlockId, + last_block_hash: Option, +} + +impl HeadEventHandler +where + T: JsonRpcClient + Send + Sync + 'static, +{ + pub fn new( + context: Box>, + synchronizer: Box, + start_block_id: BlockId, + ) -> Self { + HeadEventHandler { + context, + synchronizer, + start_block_id, + last_block_hash: None, + } + } + + pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> { + let head_block_data = serde_json::from_str::(&event_data)?; + + let head_block_slot = head_block_data.slot; + let head_block_hash = head_block_data.block; + + let head_block_id = BlockId::Slot(head_block_data.slot); + let initial_block_id = if self.last_block_hash.is_none() { + self.start_block_id.clone() + } else { + head_block_id.clone() + }; + + let head_block_header = self.get_block_header(&head_block_id).await?.header; + + if let Some(last_block_hash) = self.last_block_hash { + if last_block_hash != head_block_header.message.parent_root { + let parent_block_header = self + .get_block_header(&BlockId::Hash(head_block_header.message.parent_root)) + .await? + .header; + let parent_block_slot = parent_block_header.message.slot; + let reorg_start_slot = parent_block_slot + 1; + let reorg_final_slot = head_block_slot; + let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::>(); + + let result: Result<(), HeadEventHandlerError> = async { + let total_updated_slots = self.context + .blobscan_client() + .handle_reorged_slots(reorged_slots.as_slice()) + .await + .map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?; + + + info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); + + // Re-index parent block as it may be mark as reorged and not indexed + self.synchronizer + .run( + &BlockId::Slot(parent_block_slot), + &BlockId::Slot(parent_block_slot + 1), + ) + .await?; + + Ok(()) + } + .await; + + if let Err(err) = result { + // If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)), + }) + .await + .map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?; + + return Err(err); + } + } + } + + self.synchronizer + .run(&initial_block_id, &BlockId::Slot(head_block_slot + 1)) + .await?; + + self.last_block_hash = Some(head_block_hash); + + Ok(()) + } + + async fn get_block_header( + &self, + block_id: &BlockId, + ) -> Result { + match self + .context + .beacon_client() + .get_block_header(block_id) + .await + .map_err(|err| { + HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err) + })? { + Some(block) => Ok(block), + None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())), + } + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use ethers::types::H256; + use mockall::predicate::eq; + + use super::HeadEventHandler; + use crate::{ + clients::{ + beacon::{ + types::{BlockHeader, BlockHeaderMessage, BlockId, InnerBlockHeader}, + MockCommonBeaconClient, + }, + blobscan::{types::BlockchainSyncState, MockCommonBlobscanClient}, + }, + context::Context, + synchronizer::MockCommonSynchronizer, + }; + + #[derive(Clone, Debug)] + struct BlockData { + slot: u32, + hash: H256, + parent_hash: Option, + } + + impl BlockData { + pub fn to_head_event(self) -> String { + format!( + r#"{{"slot": "{}", "block": "{}"}}"#, + self.slot, + format!("0x{:x}", self.hash) + ) + } + } + + #[tokio::test] + async fn test_handler_on_initial_event() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let block_data = Box::new(BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &block_data, + Some(initial_start_block_id.clone()), + ); + + let mock_context = Context::new(Some(mock_beacon_client), None, None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler.handle(block_data.to_head_event()).await; + + assert!(result.is_ok()) + } + + #[tokio::test] + async fn test_handler_after_first_event() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let first_head_block = BlockData { + hash: _create_hash("5"), + slot: 5, + parent_hash: None, + }; + let second_head_block = BlockData { + hash: _create_hash("6"), + slot: 6, + parent_hash: Some(first_head_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &first_head_block, + Some(initial_start_block_id.clone()), + ); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &second_head_block, + None, + ); + + let mock_context = Context::new(Some(mock_beacon_client), None, None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(first_head_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handler to succeed" + ); + + let result = head_event_handler + .handle(second_head_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected second head event handler to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_reorg() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_block = BlockData { + slot: 2, + hash: _create_hash("2"), + parent_hash: Some(_create_hash("1")), + }; + let reorged_block = BlockData { + slot: 5, + hash: _create_hash("5"), + parent_hash: Some(_create_hash("4")), + }; + let after_reorg_block = BlockData { + slot: 6, + hash: _create_hash("3b"), + parent_hash: Some(before_reorg_block.hash), + }; + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_block); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + Some(initial_start_block_id.clone()), + ); + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &after_reorg_block, + None, + ); + + _stub_handle_reorged_slots( + &mut mock_blobscan_client, + (before_reorg_block.slot + 1..after_reorg_block.slot).collect::>(), + ); + + // We're expecting the synchronizer to re-sync the parent block of the reorged block + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_block.slot), + BlockId::Slot(before_reorg_block.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(after_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_one_depth_reorg() { + // Slots: + // 4 -> 5 + // 6 -> 7 -> ... + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let block_before_reorg = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }; + let reorged_block = BlockData { + slot: 5, + hash: _create_hash("50"), + parent_hash: Some(block_before_reorg.hash), + }; + let block_after_reorg = BlockData { + slot: 6, + hash: _create_hash("5"), + parent_hash: Some(block_before_reorg.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + Some(initial_start_block_id.clone()), + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &block_after_reorg, + None, + ); + + _stub_get_block_header(&mut mock_beacon_client, &block_before_reorg); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![reorged_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(block_before_reorg.slot), + BlockId::Slot(block_before_reorg.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(block_after_reorg.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_one_depth_former_reorg() { + // Reorged block is reorged back to its former parent + // Slots: + // 4 -> 5 -> 7 -> ... + // 6 + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_parent_block = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: None, + }; + let before_reorg_block = BlockData { + slot: 5, + hash: _create_hash("50"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let reorged_block = BlockData { + slot: 6, + hash: _create_hash("5"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let after_reorg_block = BlockData { + slot: 7, + hash: _create_hash("7"), + parent_hash: Some(before_reorg_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &before_reorg_block, + Some(initial_start_block_id.clone()), + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &reorged_block, + None, + ); + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &after_reorg_block, + None, + ); + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_parent_block); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![before_reorg_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_parent_block.slot), + BlockId::Slot(before_reorg_parent_block.slot + 1), + ); + + _stub_handle_reorged_slots(&mut mock_blobscan_client, vec![reorged_block.slot]); + + _stub_synchronizer_run( + &mut mock_synchronizer, + BlockId::Slot(before_reorg_block.slot), + BlockId::Slot(before_reorg_block.slot + 1), + ); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler + .handle(before_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected reorged head event handling to succeed" + ); + + let result = head_event_handler + .handle(after_reorg_block.to_head_event()) + .await; + + assert!( + result.is_ok(), + "Expected after reorged head event handling to succeed" + ); + } + + #[tokio::test] + async fn test_handler_on_reorg_with_error() { + let mut mock_synchronizer = Box::new(MockCommonSynchronizer::new()); + let mut mock_beacon_client = MockCommonBeaconClient::new(); + let mut mock_blobscan_client = MockCommonBlobscanClient::new(); + + let initial_start_block_id = BlockId::Slot(1); + + let before_reorg_parent_block = BlockData { + slot: 3, + hash: _create_hash("3"), + parent_hash: None, + }; + let before_reorg_block = BlockData { + slot: 4, + hash: _create_hash("4"), + parent_hash: Some(before_reorg_parent_block.hash), + }; + let first_block = BlockData { + slot: 5, + hash: _create_hash("5"), + parent_hash: Some(before_reorg_block.hash), + }; + let reorged_block = BlockData { + slot: 6, + hash: _create_hash("999"), + parent_hash: Some(before_reorg_block.hash), + }; + + _prepare_handler_calls( + &mut mock_beacon_client, + &mut mock_synchronizer, + &first_block, + Some(initial_start_block_id.clone()), + ); + + _stub_get_block_header(&mut mock_beacon_client, &reorged_block); + + _stub_get_block_header(&mut mock_beacon_client, &before_reorg_block); + + mock_blobscan_client + .expect_handle_reorged_slots() + .returning(|_x| { + Box::pin(async move { + Err(crate::clients::common::ClientError::Other(anyhow!( + "Internal blobscan client error" + ))) + }) + }); + + mock_blobscan_client + .expect_update_sync_state() + .times(1) + .with(eq(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(before_reorg_parent_block.slot), + })) + .returning(|_x| Box::pin(async move { Ok(()) })); + + let mock_context = Context::new(Some(mock_beacon_client), Some(mock_blobscan_client), None); + + let mut head_event_handler = + HeadEventHandler::new(mock_context, mock_synchronizer, initial_start_block_id); + + let result = head_event_handler.handle(first_block.to_head_event()).await; + + assert!( + result.is_ok(), + "Expected first head event handling to succeed" + ); + + let result = head_event_handler + .handle(reorged_block.to_head_event()) + .await; + + assert!( + result.is_err(), + "Expected reorged head event handling to fail" + ); + } + + fn _prepare_handler_calls( + mock_beacon_client: &mut MockCommonBeaconClient, + mock_synchronizer: &mut MockCommonSynchronizer, + head_block_data: &BlockData, + initial_block_id: Option, + ) { + let slot = head_block_data.slot; + + _stub_get_block_header(mock_beacon_client, head_block_data); + + _stub_synchronizer_run( + mock_synchronizer, + initial_block_id.unwrap_or(BlockId::Slot(slot)), + BlockId::Slot(slot + 1), + ) + } + + fn _stub_get_block_header( + mock_beacon_client: &mut MockCommonBeaconClient, + block_data: &BlockData, + ) { + let root = block_data.hash; + let slot = block_data.slot; + let parent_root = block_data + .parent_hash + .unwrap_or(_create_hash((slot - 1).to_string().as_str())); + + mock_beacon_client + .expect_get_block_header() + .with(eq(BlockId::Slot(block_data.slot))) + .returning(move |_x| { + Box::pin(async move { + Ok(Some(BlockHeader { + root, + header: InnerBlockHeader { + message: BlockHeaderMessage { parent_root, slot }, + }, + })) + }) + }); + mock_beacon_client + .expect_get_block_header() + .with(eq(BlockId::Hash(block_data.hash))) + .returning(move |_x| { + Box::pin(async move { + Ok(Some(BlockHeader { + root, + header: InnerBlockHeader { + message: BlockHeaderMessage { parent_root, slot }, + }, + })) + }) + }); + } + + fn _stub_handle_reorged_slots( + mock_blobscan_client: &mut MockCommonBlobscanClient, + reorged_slots: Vec, + ) { + let reorged_slots_len = reorged_slots.len() as u32; + + mock_blobscan_client + .expect_handle_reorged_slots() + .with(eq(reorged_slots)) + .returning(move |_x| Box::pin(async move { Ok(reorged_slots_len) })); + } + + fn _stub_synchronizer_run( + mock_synchronizer: &mut MockCommonSynchronizer, + initial_block_id: BlockId, + final_block_id: BlockId, + ) { + mock_synchronizer + .expect_run() + .times(1) + .with(eq(initial_block_id.clone()), eq(final_block_id)) + .returning(|_x, _y| Box::pin(async { Ok(()) })); + } + + fn _create_hash(input: &str) -> H256 { + // Ensure the input string is at most 64 characters + let truncated_input = if input.len() > 64 { + &input[0..64] + } else { + input + }; + + // Format the string to have a length of 64 characters by padding with zeros + let hash = format!("0x{:0>64}", truncated_input); + + hash.parse().unwrap() + } + + fn _create_head_event(slot: u32, block_hash: H256) -> String { + let head_event = format!( + r#"{{"slot": "{}", "block": "{}"}}"#, + slot, + format!("0x{:x}", block_hash) + ); + + head_event + } +} diff --git a/src/indexer/event_handlers/mod.rs b/src/indexer/event_handlers/mod.rs new file mode 100644 index 0000000..dea7c11 --- /dev/null +++ b/src/indexer/event_handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod finalized_checkpoint; +pub mod head; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index ceca713..0236076 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,40 +1,33 @@ use std::thread; -use anyhow::{anyhow, Context as AnyhowContext}; - +use anyhow::anyhow; +use ethers::providers::Http as HttpProvider; +use event_handlers::{finalized_checkpoint::FinalizedCheckpointHandler, head::HeadEventHandler}; use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{debug, error, info, Instrument}; use crate::{ args::Args, - clients::{ - beacon::types::{ - BlockId, ChainReorgEventData, FinalizedCheckpointEventData, HeadEventData, Topic, - }, - blobscan::types::BlockchainSyncState, - }, - context::{Config as ContextConfig, Context}, + clients::beacon::types::{BlockId, Topic}, + context::{CommonContext, Config as ContextConfig, Context}, env::Environment, - indexer::error::{ - ChainReorgedEventHandlingError, FinalizedBlockEventHandlingError, - HeadBlockEventHandlingError, HistoricalSyncingError, - }, - synchronizer::{CheckpointType, Synchronizer, SynchronizerBuilder}, - utils::web3::get_full_hash, + indexer::error::HistoricalIndexingError, + synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder}, }; use self::{ - error::{IndexerError, RealtimeSyncingError}, + error::{IndexerError, LiveIndexingError}, types::{IndexerResult, IndexerTaskMessage}, }; pub mod error; +pub mod event_handlers; pub mod types; -pub struct Indexer { - context: Context, +pub struct Indexer { + context: Box>, dencun_fork_slot: u32, disable_sync_historical: bool, @@ -43,7 +36,7 @@ pub struct Indexer { num_threads: u32, } -impl Indexer { +impl Indexer { pub fn try_new(env: &Environment, args: &Args) -> IndexerResult { let context = match Context::try_new(ContextConfig::from(env)) { Ok(c) => c, @@ -81,7 +74,7 @@ impl Indexer { .unwrap_or(env.network_name.dencun_fork_slot()); Ok(Self { - context, + context: Box::new(context), dencun_fork_slot, disable_sync_historical, checkpoint_slots, @@ -142,7 +135,7 @@ impl Indexer { let mut total_tasks = 0; if end_block_id.is_none() { - self._start_realtime_syncing_task(tx, current_upper_block_id); + self.start_live_indexing_task(tx, current_upper_block_id); total_tasks += 1; } @@ -152,7 +145,7 @@ impl Indexer { matches!(current_lower_block_id, BlockId::Slot(slot) if slot < self.dencun_fork_slot); if !self.disable_sync_historical && !historical_sync_completed { - self._start_historical_syncing_task(tx1, current_lower_block_id, end_block_id); + self.start_historical_indexing_task(tx1, current_lower_block_id, end_block_id); total_tasks += 1; } @@ -179,23 +172,23 @@ impl Indexer { Ok(()) } - fn _start_historical_syncing_task( + fn start_historical_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, end_block_id: BlockId, ) -> JoinHandle> { - let mut synchronizer = self._create_synchronizer(CheckpointType::Lower); + let synchronizer = self.create_synchronizer(CheckpointType::Lower); tokio::spawn(async move { - let historical_syc_thread_span = tracing::info_span!("sync:historical"); + let historical_syc_thread_span = tracing::info_span!("indexer:historical"); let result: Result<(), IndexerError> = async move { let result = synchronizer.run(&start_block_id, &end_block_id).await; if let Err(error) = result { tx.send(IndexerTaskMessage::Error( - HistoricalSyncingError::SynchronizerError(error).into(), + HistoricalIndexingError::SynchronizerError(error).into(), )) .await?; } else { @@ -215,34 +208,33 @@ impl Indexer { }) } - fn _start_realtime_syncing_task( + fn start_live_indexing_task( &self, tx: mpsc::Sender, start_block_id: BlockId, ) -> JoinHandle> { let task_context = self.context.clone(); - let mut synchronizer = self._create_synchronizer(CheckpointType::Upper); + let synchronizer = self.create_synchronizer(CheckpointType::Upper); tokio::spawn(async move { - let realtime_sync_task_span = tracing::info_span!("sync:realtime"); - - let result: Result<(), RealtimeSyncingError> = async { - let beacon_client = task_context.beacon_client(); - let blobscan_client = task_context.blobscan_client(); - let topics = vec![ - Topic::ChainReorg, - Topic::Head, - Topic::FinalizedCheckpoint, - ]; + let realtime_sync_task_span = tracing::info_span!("indexer:live"); + + let result: Result<(), LiveIndexingError> = async { + let topics = vec![Topic::Head, Topic::FinalizedCheckpoint]; let mut event_source = task_context .beacon_client() - .subscribe_to_events(&topics).map_err(RealtimeSyncingError::BeaconEventsSubscriptionError)?; - let mut is_initial_sync_to_head = true; + .subscribe_to_events(&topics) + .map_err(LiveIndexingError::BeaconEventsSubscriptionError)?; let events = topics - .iter() - .map(|topic| topic.into()) - .collect::>() - .join(", "); + .iter() + .map(|topic| topic.into()) + .collect::>() + .join(", "); + + let mut head_event_handler = + HeadEventHandler::new(task_context.clone(), synchronizer, start_block_id); + let finalized_checkpoint_event_handler = + FinalizedCheckpointHandler::new(task_context); info!("Subscribed to beacon events: {events}"); @@ -255,125 +247,25 @@ impl Indexer { let event_name = event.event.as_str(); match event_name { - "chain_reorg" => { - let chain_reorg_span = tracing::info_span!("chain_reorg"); - - let result: Result<(), ChainReorgedEventHandlingError> = async { - let reorg_block_data = - serde_json::from_str::(&event.data)?; - let slot = reorg_block_data.slot; - let old_head_block = reorg_block_data.old_head_block; - let target_depth = reorg_block_data.depth; - - let mut current_reorged_block = old_head_block; - let mut reorged_slots: Vec = vec![]; - - for current_depth in 1..=target_depth { - let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await.map_err(|err| ChainReorgedEventHandlingError::BlockRetrievalError(get_full_hash(¤t_reorged_block), err))? { - Some(block) => block, - None => { - warn!(event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); - break - } - }; - - reorged_slots.push(reorged_block_head.header.message.slot); - current_reorged_block = reorged_block_head.header.message.parent_root; - } - - let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await.map_err(|err| ChainReorgedEventHandlingError::ReorgedHandlingFailure(target_depth, get_full_hash(&old_head_block), err))?; - - info!(event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); - - Ok(()) - }.instrument(chain_reorg_span).await; - - if let Err(error) = result { - // If an error occurred while processing the event, try to update the latest synced slot to the last known slot before the reorg - if let Ok(reorg_block_data) = serde_json::from_str::(&event.data) { - let slot = reorg_block_data.slot; - - let _ = blobscan_client.update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot: None, - last_upper_synced_slot: Some(slot -1) - }).await; - } - - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, - "head" => { - let head_span = tracing::info_span!("head_block"); - - let result: Result<(), HeadBlockEventHandlingError> = async { - let head_block_data = - serde_json::from_str::(&event.data)?; - - - let head_block_id = &BlockId::Slot(head_block_data.slot); - let initial_block_id = if is_initial_sync_to_head { - is_initial_sync_to_head = false; - - &start_block_id - } else { - head_block_id - }; - - synchronizer.run(initial_block_id, &BlockId::Slot(head_block_data.slot + 1)).await?; - - Ok(()) - }.instrument(head_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - }, + "head" => { + head_event_handler + .handle(event.data) + .instrument(tracing::info_span!("head_block")) + .await?; + } "finalized_checkpoint" => { - let finalized_checkpoint_span = tracing::info_span!("finalized_checkpoint"); - - let result: Result<(), FinalizedBlockEventHandlingError> = async move { - let finalized_checkpoint_data = - serde_json::from_str::( - &event.data, - )?; - let block_hash = finalized_checkpoint_data.block; - let last_finalized_block_number = beacon_client - .get_block(&BlockId::Hash(block_hash)) - .await.map_err(|err| FinalizedBlockEventHandlingError::BlockRetrievalError(get_full_hash(&block_hash), err))? - .with_context(|| { - anyhow!("Finalized block not found") - })? - .message.body.execution_payload - .with_context(|| { - anyhow!("Finalized block has no execution payload") - })?.block_number; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_lower_synced_slot: None, - last_upper_synced_slot: None, - last_finalized_block: Some( - last_finalized_block_number - ), - }) - .await.map_err(FinalizedBlockEventHandlingError::BlobscanSyncStateUpdateError)?; - - info!(finalized_execution_block=last_finalized_block_number, "Finalized checkpoint event received. Updated last finalized block number"); - - Ok(()) - }.instrument(finalized_checkpoint_span).await; - - if let Err(error) = result { - return Err(RealtimeSyncingError::BeaconEventProcessingError(error.into())); - } - - }, + finalized_checkpoint_event_handler + .handle(event.data) + .instrument(tracing::info_span!("finalized_checkpoint")) + .await?; + } unexpected_event_id => { - return Err(RealtimeSyncingError::UnexpectedBeaconEvent(unexpected_event_id.to_string())); - }, + return Err(LiveIndexingError::UnexpectedBeaconEvent( + unexpected_event_id.to_string(), + )); + } } - }, + } Err(error) => { event_source.close(); @@ -397,7 +289,7 @@ impl Indexer { }) } - fn _create_synchronizer(&self, checkpoint_type: CheckpointType) -> Synchronizer { + fn create_synchronizer(&self, checkpoint_type: CheckpointType) -> Box { let mut synchronizer_builder = SynchronizerBuilder::new(); if let Some(checkpoint_slots) = self.checkpoint_slots { @@ -410,6 +302,6 @@ impl Indexer { synchronizer_builder.with_num_threads(self.num_threads); - synchronizer_builder.build(self.context.clone()) + Box::new(synchronizer_builder.build(self.context.clone())) } } diff --git a/src/indexer/types.rs b/src/indexer/types.rs index bde0077..36189b1 100644 --- a/src/indexer/types.rs +++ b/src/indexer/types.rs @@ -1,8 +1,8 @@ -use super::error::{IndexerError, SyncingTaskError}; +use super::error::{IndexerError, IndexingError}; pub type IndexerResult = Result; pub enum IndexerTaskMessage { Done, - Error(SyncingTaskError), + Error(IndexingError), } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 9e1e0af..259ab99 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context as AnyhowContext, Result}; -use ethers::prelude::*; +use ethers::providers::{Http as HttpProvider, Middleware}; use tracing::{debug, info}; use crate::{ @@ -8,7 +8,7 @@ use crate::{ beacon::types::BlockId, blobscan::types::{Blob, Block, Transaction}, }, - context::Context, + context::CommonContext, }; use self::error::{SlotProcessingError, SlotsProcessorError}; @@ -17,12 +17,12 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha pub mod error; mod helpers; -pub struct SlotsProcessor { - context: Context, +pub struct SlotsProcessor { + context: Box>, } -impl SlotsProcessor { - pub fn new(context: Context) -> SlotsProcessor { +impl SlotsProcessor { + pub fn new(context: Box>) -> SlotsProcessor { Self { context } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 8487d5f..9f4f660 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,11 +1,18 @@ +use std::fmt::Debug; + use anyhow::anyhow; +use async_trait::async_trait; +use ethers::providers::Http as HttpProvider; use futures::future::join_all; use tokio::task::JoinHandle; use tracing::{debug, info, Instrument}; +#[cfg(test)] +use mockall::automock; + use crate::{ clients::{beacon::types::BlockId, blobscan::types::BlockchainSyncState, common::ClientError}, - context::Context, + context::CommonContext, slots_processor::{error::SlotsProcessorError, SlotsProcessor}, }; @@ -13,6 +20,16 @@ use self::error::{SlotsChunksErrors, SynchronizerError}; pub mod error; +#[async_trait] +#[cfg_attr(test, automock)] +pub trait CommonSynchronizer: Send + Sync + Debug { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError>; +} + #[derive(Debug)] pub struct SynchronizerBuilder { num_threads: u32, @@ -22,8 +39,8 @@ pub struct SynchronizerBuilder { } #[derive(Debug)] -pub struct Synchronizer { - context: Context, +pub struct Synchronizer { + context: Box>, num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, @@ -70,7 +87,10 @@ impl SynchronizerBuilder { self } - pub fn build(&self, context: Context) -> Synchronizer { + pub fn build( + &self, + context: Box>, + ) -> Synchronizer { Synchronizer { context, num_threads: self.num_threads, @@ -81,34 +101,8 @@ impl SynchronizerBuilder { } } -impl Synchronizer { - pub async fn run( - &mut self, - initial_block_id: &BlockId, - final_block_id: &BlockId, - ) -> Result<(), SynchronizerError> { - let initial_slot = self._resolve_to_slot(initial_block_id).await?; - let mut final_slot = self._resolve_to_slot(final_block_id).await?; - - if initial_slot == final_slot { - return Ok(()); - } - - loop { - self._sync_slots_by_checkpoints(initial_slot, final_slot) - .await?; - - let latest_final_slot = self._resolve_to_slot(final_block_id).await?; - - if final_slot == latest_final_slot { - return Ok(()); - } - - final_slot = latest_final_slot; - } - } - - async fn _sync_slots(&mut self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { +impl Synchronizer { + async fn sync_slots(&self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot); let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread); @@ -190,8 +184,8 @@ impl Synchronizer { } } - async fn _sync_slots_by_checkpoints( - &mut self, + async fn sync_slots_by_checkpoints( + &self, initial_slot: u32, final_slot: u32, ) -> Result<(), SynchronizerError> { @@ -222,7 +216,7 @@ impl Synchronizer { checkpoint_final_slot = final_chunk_slot ); - self._sync_slots(initial_chunk_slot, final_chunk_slot) + self.sync_slots(initial_chunk_slot, final_chunk_slot) .instrument(sync_slots_chunk_span) .await?; @@ -293,7 +287,7 @@ impl Synchronizer { Ok(()) } - async fn _resolve_to_slot(&self, block_id: &BlockId) -> Result { + async fn resolve_to_slot(&self, block_id: &BlockId) -> Result { let beacon_client = self.context.beacon_client(); let resolved_block_id: Result = match block_id { @@ -318,3 +312,32 @@ impl Synchronizer { } } } + +#[async_trait] +impl CommonSynchronizer for Synchronizer { + async fn run( + &self, + initial_block_id: &BlockId, + final_block_id: &BlockId, + ) -> Result<(), SynchronizerError> { + let initial_slot = self.resolve_to_slot(initial_block_id).await?; + let mut final_slot = self.resolve_to_slot(final_block_id).await?; + + if initial_slot == final_slot { + return Ok(()); + } + + loop { + self.sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; + + let latest_final_slot = self.resolve_to_slot(final_block_id).await?; + + if final_slot == latest_final_slot { + return Ok(()); + } + + final_slot = latest_final_slot; + } + } +}