Skip to content

Commit

Permalink
Merge pull request #72 from Blobscan/refactor/reorgs-handling
Browse files Browse the repository at this point in the history
refactor: refactor chain reorgs handling
  • Loading branch information
PJColombo authored Jun 21, 2024
2 parents 5e51c75 + 36577be commit 2646828
Show file tree
Hide file tree
Showing 16 changed files with 1,163 additions and 298 deletions.
83 changes: 81 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.66"
async-trait = "0.1.80"
dyn-clone = "1.0.17"
dotenv = "0.15.0"
envy = "0.4.2"
ethers = "1.0.2"
Expand Down Expand Up @@ -34,3 +35,6 @@ anyhow = { version = "1.0.70", features = ["backtrace"] }
thiserror = "1.0.40"
sentry = { version = "0.31.2", features = ["debug-images"] }
sentry-tracing = "0.31.2"

[dev-dependencies]
mockall = "0.12.1"
Empty file removed src/clients/beacon/errors.rs
Empty file.
27 changes: 23 additions & 4 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use std::fmt::Debug;

use anyhow::Context as AnyhowContext;
use async_trait::async_trait;
use backoff::ExponentialBackoff;

use reqwest::{Client, Url};
use reqwest_eventsource::EventSource;

#[cfg(test)]
use mockall::automock;

use crate::{
clients::{beacon::types::BlockHeaderResponse, common::ClientResult},
json_get,
Expand All @@ -24,6 +31,15 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

#[async_trait]
#[cfg_attr(test, automock)]
pub trait CommonBeaconClient: Send + Sync + Debug {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>>;
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>>;
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>>;
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
}

impl BeaconClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/eth/", config.base_url))
Expand All @@ -36,8 +52,11 @@ impl BeaconClient {
exp_backoff,
})
}
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
#[async_trait]
impl CommonBeaconClient for BeaconClient {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -47,7 +66,7 @@ impl BeaconClient {
})
}

pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -63,7 +82,7 @@ impl BeaconClient {
})
}

pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
Expand All @@ -75,7 +94,7 @@ impl BeaconClient {
})
}

pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
Expand Down
13 changes: 1 addition & 12 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, str::FromStr};
use ethers::types::{Bytes, H256};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Debug, Clone)]
#[derive(Serialize, Debug, Clone, PartialEq)]
pub enum BlockId {
Head,
Finalized,
Expand All @@ -16,7 +16,6 @@ pub enum BlockId {
pub enum Topic {
Head,
FinalizedCheckpoint,
ChainReorg,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -80,15 +79,6 @@ pub struct BlockHeaderMessage {
pub slot: u32,
}

#[derive(Deserialize, Debug)]
pub struct ChainReorgEventData {
pub old_head_block: H256,
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
#[serde(deserialize_with = "deserialize_number")]
pub depth: u32,
}

#[derive(Deserialize, Debug)]
pub struct HeadEventData {
#[serde(deserialize_with = "deserialize_number")]
Expand Down Expand Up @@ -161,7 +151,6 @@ impl FromStr for BlockId {
impl From<&Topic> for String {
fn from(value: &Topic) -> Self {
match value {
Topic::ChainReorg => String::from("chain_reorg"),
Topic::Head => String::from("head"),
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
}
Expand Down
38 changes: 32 additions & 6 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::fmt::Debug;

use async_trait::async_trait;
use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};

#[cfg(test)]
use mockall::automock;

use crate::{
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
json_get, json_put,
Expand All @@ -18,6 +24,24 @@ use self::{
mod jwt_manager;

pub mod types;

#[async_trait]
#[cfg_attr(test, automock)]
pub trait CommonBlobscanClient: Send + Sync + Debug {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self>
where
Self: Sized;
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> ClientResult<()>;
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
}

#[derive(Debug, Clone)]
pub struct BlobscanClient {
base_url: Url,
Expand All @@ -32,8 +56,10 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

impl BlobscanClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
#[async_trait]

impl CommonBlobscanClient for BlobscanClient {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/", config.base_url))?;
let jwt_manager = JWTManager::new(JWTManagerConfig {
secret_key: config.secret_key,
Expand All @@ -50,7 +76,7 @@ impl BlobscanClient {
})
}

pub async fn index(
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
Expand All @@ -67,7 +93,7 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotsRequest {
Expand All @@ -78,15 +104,15 @@ impl BlobscanClient {
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
let req: BlockchainSyncStateRequest = sync_state.into();

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
let url = self.base_url.join("blockchain-sync-state")?;
json_get!(
&self.client,
Expand Down
2 changes: 1 addition & 1 deletion src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct BlockchainSyncStateResponse {
pub last_upper_synced_slot: Option<u32>,
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct BlockchainSyncState {
pub last_finalized_block: Option<u32>,
pub last_lower_synced_slot: Option<u32>,
Expand Down
Loading

0 comments on commit 2646828

Please sign in to comment.