Skip to content

Commit d9b56b7

Browse files
committed
fix: improve handling of malformmated blobs :)
1 parent dae6ca6 commit d9b56b7

File tree

5 files changed

+132
-103
lines changed

5 files changed

+132
-103
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.10.3"
6+
version = "0.10.4"
77
edition = "2024"
88
rust-version = "1.88"
99
authors = ["init4"]

crates/blobber/src/cache.rs

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{BlobFetcherError, Blobs, FetchResult};
1+
use crate::{BlobberError, BlobberResult, Blobs, FetchResult};
22
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
33
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
44
use alloy::eips::merge::EPOCH_SLOTS;
@@ -13,7 +13,7 @@ use std::{
1313
time::Duration,
1414
};
1515
use tokio::sync::{mpsc, oneshot};
16-
use tracing::{Instrument, debug, error, info, instrument};
16+
use tracing::{Instrument, debug_span, error, info, instrument, trace};
1717

1818
const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32;
1919
const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize;
@@ -54,7 +54,7 @@ impl CacheHandle {
5454
slot: usize,
5555
tx_hash: B256,
5656
version_hashes: Vec<B256>,
57-
) -> FetchResult<Blobs> {
57+
) -> BlobberResult<Blobs> {
5858
let (resp, receiver) = oneshot::channel();
5959

6060
self.send(CacheInst::Retrieve {
@@ -66,7 +66,7 @@ impl CacheHandle {
6666
})
6767
.await;
6868

69-
receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
69+
receiver.await.map_err(|_| BlobberError::missing_sidecar(tx_hash))
7070
}
7171

7272
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
@@ -76,24 +76,24 @@ impl CacheHandle {
7676
slot: usize,
7777
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
7878
mut coder: C,
79-
) -> FetchResult<Bytes> {
79+
) -> BlobberResult<Bytes> {
8080
let tx_hash = extract.tx_hash();
8181
let versioned_hashes = extract
8282
.tx
8383
.as_eip4844()
84-
.ok_or_else(BlobFetcherError::non_4844_transaction)?
84+
.ok_or_else(BlobberError::non_4844_transaction)?
8585
.blob_versioned_hashes()
8686
.expect("tx is eip4844");
8787

8888
let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
8989

9090
coder
9191
.decode_all(blobs.as_ref())
92-
.ok_or_else(BlobFetcherError::blob_decode_error)?
92+
.ok_or_else(BlobberError::blob_decode_error)?
9393
.into_iter()
9494
.find(|data| keccak256(data) == extract.block_data_hash())
9595
.map(Into::into)
96-
.ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
96+
.ok_or_else(|| BlobberError::block_data_not_found(tx_hash))
9797
}
9898

9999
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
@@ -102,12 +102,21 @@ impl CacheHandle {
102102
&self,
103103
slot: usize,
104104
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
105-
) -> FetchResult<Bytes> {
105+
) -> BlobberResult<Bytes> {
106106
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
107107
}
108108

109109
/// Fetch the blobs, decode them using the provided coder, and construct a
110110
/// Zenith block from the header and data.
111+
///
112+
/// # Returns
113+
///
114+
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
115+
/// decoded.
116+
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
117+
/// decoded (e.g., due to a malformatted blob).
118+
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
119+
/// blobs.
111120
pub async fn signet_block_with_coder<C: SidecarCoder>(
112121
&self,
113122
host_block_number: u64,
@@ -116,13 +125,28 @@ impl CacheHandle {
116125
coder: C,
117126
) -> FetchResult<ZenithBlock> {
118127
let header = extract.ru_header(host_block_number);
119-
self.fetch_and_decode_with_coder(slot, extract, coder)
120-
.await
121-
.map(|buf| ZenithBlock::from_header_and_data(header, buf))
128+
let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await {
129+
Ok(buf) => buf,
130+
Err(BlobberError::Decode(_)) => {
131+
trace!("Failed to decode block data");
132+
Bytes::default()
133+
}
134+
Err(BlobberError::Fetch(err)) => return Err(err),
135+
};
136+
Ok(ZenithBlock::from_header_and_data(header, block_data))
122137
}
123138

124139
/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
125140
/// Zenith block from the header and data.
141+
///
142+
/// # Returns
143+
///
144+
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
145+
/// decoded.
146+
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
147+
/// decoded (e.g., due to a malformatted blob).
148+
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
149+
/// blobs.
126150
pub async fn signet_block(
127151
&self,
128152
host_block_number: u64,
@@ -159,7 +183,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
159183
slot: usize,
160184
tx_hash: B256,
161185
versioned_hashes: Vec<B256>,
162-
) -> FetchResult<Blobs> {
186+
) -> BlobberResult<Blobs> {
163187
// Cache hit
164188
if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) {
165189
info!(target: "signet_blobber::BlobCacher", "Cache hit");
@@ -169,23 +193,21 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
169193
// Cache miss, use the fetcher to retrieve blobs
170194
// Retry fetching blobs up to `FETCH_RETRIES` times
171195
for attempt in 1..=FETCH_RETRIES {
172-
let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await;
173-
174-
match blobs {
175-
Ok(blobs) => {
176-
self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
177-
return Ok(blobs);
178-
}
179-
Err(BlobFetcherError::Ignorable(e)) => {
180-
debug!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed.");
181-
tokio::time::sleep(BETWEEN_RETRIES).await;
182-
continue;
183-
}
184-
Err(e) => return Err(e), // unrecoverable error
185-
}
196+
let Ok(blobs) = self
197+
.fetcher
198+
.fetch_blobs(slot, tx_hash, &versioned_hashes)
199+
.instrument(debug_span!("fetch_blobs_loop", attempt))
200+
.await
201+
else {
202+
tokio::time::sleep(BETWEEN_RETRIES).await;
203+
continue;
204+
};
205+
206+
self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
207+
return Ok(blobs);
186208
}
187209
error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed");
188-
Err(BlobFetcherError::missing_sidecar(tx_hash))
210+
Err(BlobberError::missing_sidecar(tx_hash))
189211
}
190212

191213
/// Processes the cache instructions.

crates/blobber/src/error.rs

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
use alloy::{eips::eip2718::Eip2718Error, primitives::B256};
22
use reth::transaction_pool::BlobStoreError;
33

4-
/// Fetch Result
5-
pub type FetchResult<T, E = BlobFetcherError> = std::result::Result<T, E>;
4+
/// Result using [`BlobFetcherError`] as the default error type.
5+
pub type BlobberResult<T, E = BlobberError> = std::result::Result<T, E>;
6+
7+
/// Result using [`FetchError`] as the default error type.
8+
pub type FetchResult<T> = BlobberResult<T, FetchError>;
9+
10+
/// Result using [`DecodeError`] as the default error type.
11+
pub type DecodeResult<T> = BlobberResult<T, DecodeError>;
612

713
/// Unrecoverable blob fetching errors. These result in the node shutting
814
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
915
#[derive(Debug, thiserror::Error)]
10-
pub enum UnrecoverableBlobError {
16+
pub enum FetchError {
1117
/// Reqwest error
1218
#[error(transparent)]
1319
Reqwest(#[from] reqwest::Error),
@@ -30,7 +36,7 @@ pub enum UnrecoverableBlobError {
3036

3137
/// Ignorable blob fetching errors. These result in the block being skipped.
3238
#[derive(Debug, thiserror::Error, Copy, Clone)]
33-
pub enum IgnorableBlobError {
39+
pub enum DecodeError {
3440
/// Incorrect transaction type error
3541
#[error("Non-4844 transaction")]
3642
Non4844Transaction,
@@ -50,86 +56,86 @@ pub enum IgnorableBlobError {
5056

5157
/// Blob fetching errors
5258
#[derive(Debug, thiserror::Error)]
53-
pub enum BlobFetcherError {
59+
pub enum BlobberError {
5460
/// Unrecoverable blob fetching error
5561
#[error(transparent)]
56-
Unrecoverable(#[from] UnrecoverableBlobError),
62+
Fetch(#[from] FetchError),
5763
/// Ignorable blob fetching error
5864
#[error(transparent)]
59-
Ignorable(#[from] IgnorableBlobError),
65+
Decode(#[from] DecodeError),
6066
}
6167

62-
impl BlobFetcherError {
68+
impl BlobberError {
6369
/// Returns true if the error is ignorable
64-
pub const fn is_ignorable(&self) -> bool {
65-
matches!(self, Self::Ignorable(_))
70+
pub const fn is_decode(&self) -> bool {
71+
matches!(self, Self::Decode(_))
6672
}
6773

6874
/// Returns true if the error is unrecoverable
69-
pub const fn is_unrecoverable(&self) -> bool {
70-
matches!(self, Self::Unrecoverable(_))
75+
pub const fn is_fetch(&self) -> bool {
76+
matches!(self, Self::Fetch(_))
7177
}
7278

7379
/// Non-4844 transaction error
7480
pub fn non_4844_transaction() -> Self {
75-
IgnorableBlobError::Non4844Transaction.into()
81+
DecodeError::Non4844Transaction.into()
7682
}
7783

7884
/// Blob decode error
7985
pub fn blob_decode_error() -> Self {
80-
IgnorableBlobError::BlobDecodeError.into()
86+
DecodeError::BlobDecodeError.into()
8187
}
8288

8389
/// Blob decode error
8490
pub fn block_decode_error(err: Eip2718Error) -> Self {
85-
IgnorableBlobError::BlockDecodeError(err).into()
91+
DecodeError::BlockDecodeError(err).into()
8692
}
8793

8894
/// Blob decoded, but expected hash not found
8995
pub fn block_data_not_found(tx: B256) -> Self {
90-
IgnorableBlobError::BlockDataNotFound(tx).into()
96+
DecodeError::BlockDataNotFound(tx).into()
9197
}
9298

9399
/// Missing sidecar error
94100
pub fn missing_sidecar(tx: B256) -> Self {
95-
UnrecoverableBlobError::MissingSidecar(tx).into()
101+
FetchError::MissingSidecar(tx).into()
96102
}
97103

98104
/// Blob store error
99105
pub fn blob_store(err: BlobStoreError) -> Self {
100-
UnrecoverableBlobError::BlobStore(err).into()
106+
FetchError::BlobStore(err).into()
101107
}
102108
}
103109

104-
impl From<BlobStoreError> for UnrecoverableBlobError {
110+
impl From<BlobStoreError> for FetchError {
105111
fn from(err: BlobStoreError) -> Self {
106112
match err {
107-
BlobStoreError::MissingSidecar(tx) => UnrecoverableBlobError::MissingSidecar(tx),
108-
_ => UnrecoverableBlobError::BlobStore(err),
113+
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
114+
_ => FetchError::BlobStore(err),
109115
}
110116
}
111117
}
112118

113-
impl From<BlobStoreError> for BlobFetcherError {
119+
impl From<BlobStoreError> for BlobberError {
114120
fn from(err: BlobStoreError) -> Self {
115-
Self::Unrecoverable(err.into())
121+
Self::Fetch(err.into())
116122
}
117123
}
118124

119-
impl From<reqwest::Error> for BlobFetcherError {
125+
impl From<reqwest::Error> for BlobberError {
120126
fn from(err: reqwest::Error) -> Self {
121-
Self::Unrecoverable(err.into())
127+
Self::Fetch(err.into())
122128
}
123129
}
124130

125-
impl From<Eip2718Error> for BlobFetcherError {
131+
impl From<Eip2718Error> for BlobberError {
126132
fn from(err: Eip2718Error) -> Self {
127-
Self::Ignorable(err.into())
133+
Self::Decode(err.into())
128134
}
129135
}
130136

131-
impl From<url::ParseError> for BlobFetcherError {
137+
impl From<url::ParseError> for BlobberError {
132138
fn from(err: url::ParseError) -> Self {
133-
Self::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
139+
Self::Fetch(FetchError::UrlParse(err))
134140
}
135141
}

0 commit comments

Comments
 (0)