Skip to content

Commit 7522167

Browse files
authored
fix: improve handling of malformed blobs :) (#34)
* fix: improve handling of malformmated blobs :) * refactor: split coder from blobber
1 parent dae6ca6 commit 7522167

File tree

12 files changed

+335
-307
lines changed

12 files changed

+335
-307
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.10.3"
6+
version = "0.10.4"
77
edition = "2024"
88
rust-version = "1.88"
99
authors = ["init4"]

crates/blobber/src/builder.rs renamed to crates/blobber/src/blobs/builder.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
2-
use init4_bin_base::utils::calc::SlotCalculator;
32
use reth::transaction_pool::TransactionPool;
43
use url::Url;
54

@@ -35,7 +34,6 @@ pub struct BlobFetcherBuilder<Pool> {
3534
client: Option<reqwest::Client>,
3635
cl_url: Option<String>,
3736
pylon_url: Option<String>,
38-
slot_calculator: Option<SlotCalculator>,
3937
}
4038

4139
impl<Pool> BlobFetcherBuilder<Pool> {
@@ -47,7 +45,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
4745
client: self.client,
4846
cl_url: self.cl_url,
4947
pylon_url: self.pylon_url,
50-
slot_calculator: self.slot_calculator,
5148
}
5249
}
5350

@@ -107,22 +104,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
107104
self.pylon_url = Some(pylon_url.to_string());
108105
Ok(self)
109106
}
110-
111-
/// Set the slot calculator to use for the extractor.
112-
pub const fn with_slot_calculator(
113-
mut self,
114-
slot_calculator: SlotCalculator,
115-
) -> BlobFetcherBuilder<Pool> {
116-
self.slot_calculator = Some(slot_calculator);
117-
self
118-
}
119-
120-
/// Set the slot calculator to use for the extractor, using the Pecornino
121-
/// host configuration.
122-
pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder<Pool> {
123-
self.slot_calculator = Some(SlotCalculator::pecorino_host());
124-
self
125-
}
126107
}
127108

128109
impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
@@ -141,9 +122,7 @@ impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
141122
let explorer =
142123
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());
143124

144-
let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?;
145-
146-
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
125+
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url))
147126
}
148127

149128
/// Build a [`BlobCacher`] with the provided parameters.

crates/blobber/src/cache.rs renamed to crates/blobber/src/blobs/cache.rs

Lines changed: 63 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
1-
use crate::{BlobFetcherError, Blobs, FetchResult};
1+
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
22
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
33
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
44
use alloy::eips::merge::EPOCH_SLOTS;
55
use alloy::primitives::{B256, Bytes, keccak256};
6+
use core::fmt;
67
use reth::transaction_pool::TransactionPool;
78
use reth::{network::cache::LruMap, primitives::Receipt};
89
use signet_extract::ExtractedEvent;
910
use signet_zenith::Zenith::BlockSubmitted;
1011
use signet_zenith::ZenithBlock;
12+
use std::marker::PhantomData;
1113
use std::{
1214
sync::{Arc, Mutex},
1315
time::Duration,
1416
};
1517
use tokio::sync::{mpsc, oneshot};
16-
use tracing::{Instrument, debug, error, info, instrument};
18+
use tracing::{Instrument, debug_span, error, info, instrument, trace};
1719

1820
const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32;
1921
const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize;
@@ -37,11 +39,13 @@ enum CacheInst {
3739

3840
/// Handle for the cache.
3941
#[derive(Debug, Clone)]
40-
pub struct CacheHandle {
42+
pub struct CacheHandle<Coder = SimpleCoder> {
4143
sender: mpsc::Sender<CacheInst>,
44+
45+
_coder: PhantomData<Coder>,
4246
}
4347

44-
impl CacheHandle {
48+
impl<Coder> CacheHandle<Coder> {
4549
/// Sends a cache instruction.
4650
async fn send(&self, inst: CacheInst) {
4751
let _ = self.sender.send(inst).await;
@@ -54,7 +58,7 @@ impl CacheHandle {
5458
slot: usize,
5559
tx_hash: B256,
5660
version_hashes: Vec<B256>,
57-
) -> FetchResult<Blobs> {
61+
) -> BlobberResult<Blobs> {
5862
let (resp, receiver) = oneshot::channel();
5963

6064
self.send(CacheInst::Retrieve {
@@ -66,89 +70,87 @@ impl CacheHandle {
6670
})
6771
.await;
6872

69-
receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
73+
receiver.await.map_err(|_| BlobberError::missing_sidecar(tx_hash))
7074
}
7175

7276
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
7377
/// Zenith block data using the provided coder.
74-
pub async fn fetch_and_decode_with_coder<C: SidecarCoder>(
78+
pub async fn fetch_and_decode(
7579
&self,
7680
slot: usize,
7781
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
78-
mut coder: C,
79-
) -> FetchResult<Bytes> {
82+
) -> BlobberResult<Bytes>
83+
where
84+
Coder: SidecarCoder + Default,
85+
{
8086
let tx_hash = extract.tx_hash();
8187
let versioned_hashes = extract
8288
.tx
8389
.as_eip4844()
84-
.ok_or_else(BlobFetcherError::non_4844_transaction)?
90+
.ok_or_else(BlobberError::non_4844_transaction)?
8591
.blob_versioned_hashes()
8692
.expect("tx is eip4844");
8793

8894
let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
8995

90-
coder
96+
Coder::default()
9197
.decode_all(blobs.as_ref())
92-
.ok_or_else(BlobFetcherError::blob_decode_error)?
98+
.ok_or_else(BlobberError::blob_decode_error)?
9399
.into_iter()
94100
.find(|data| keccak256(data) == extract.block_data_hash())
95101
.map(Into::into)
96-
.ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
97-
}
98-
99-
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
100-
/// [`SimpleCoder`] to get the Zenith block data.
101-
pub async fn fech_and_decode(
102-
&self,
103-
slot: usize,
104-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
105-
) -> FetchResult<Bytes> {
106-
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
102+
.ok_or_else(|| BlobberError::block_data_not_found(extract.block_data_hash()))
107103
}
108104

109105
/// Fetch the blobs, decode them using the provided coder, and construct a
110106
/// Zenith block from the header and data.
111-
pub async fn signet_block_with_coder<C: SidecarCoder>(
112-
&self,
113-
host_block_number: u64,
114-
slot: usize,
115-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
116-
coder: C,
117-
) -> FetchResult<ZenithBlock> {
118-
let header = extract.ru_header(host_block_number);
119-
self.fetch_and_decode_with_coder(slot, extract, coder)
120-
.await
121-
.map(|buf| ZenithBlock::from_header_and_data(header, buf))
122-
}
123-
124-
/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
125-
/// Zenith block from the header and data.
107+
///
108+
/// # Returns
109+
///
110+
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
111+
/// decoded.
112+
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
113+
/// decoded (e.g., due to a malformatted blob).
114+
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
115+
/// blobs.
126116
pub async fn signet_block(
127117
&self,
128118
host_block_number: u64,
129119
slot: usize,
130120
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
131-
) -> FetchResult<ZenithBlock> {
132-
self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await
121+
) -> FetchResult<ZenithBlock>
122+
where
123+
Coder: SidecarCoder + Default,
124+
{
125+
let header = extract.ru_header(host_block_number);
126+
let block_data = match self.fetch_and_decode(slot, extract).await {
127+
Ok(buf) => buf,
128+
Err(BlobberError::Decode(_)) => {
129+
trace!("Failed to decode block data");
130+
Bytes::default()
131+
}
132+
Err(BlobberError::Fetch(err)) => return Err(err),
133+
};
134+
Ok(ZenithBlock::from_header_and_data(header, block_data))
133135
}
134136
}
135137

136138
/// Retrieves blobs and stores them in a cache for later use.
137139
pub struct BlobCacher<Pool> {
138-
fetcher: crate::BlobFetcher<Pool>,
140+
fetcher: BlobFetcher<Pool>,
139141

140142
cache: Mutex<LruMap<(usize, B256), Blobs>>,
141143
}
142144

143-
impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
144-
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
145+
impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
146+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145147
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
146148
}
147149
}
148150

149151
impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
150152
/// Creates a new `BlobCacher` with the provided extractor and cache size.
151-
pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
153+
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
152154
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
153155
}
154156

@@ -159,7 +161,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
159161
slot: usize,
160162
tx_hash: B256,
161163
versioned_hashes: Vec<B256>,
162-
) -> FetchResult<Blobs> {
164+
) -> BlobberResult<Blobs> {
163165
// Cache hit
164166
if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) {
165167
info!(target: "signet_blobber::BlobCacher", "Cache hit");
@@ -169,23 +171,21 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
169171
// Cache miss, use the fetcher to retrieve blobs
170172
// Retry fetching blobs up to `FETCH_RETRIES` times
171173
for attempt in 1..=FETCH_RETRIES {
172-
let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await;
173-
174-
match blobs {
175-
Ok(blobs) => {
176-
self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
177-
return Ok(blobs);
178-
}
179-
Err(BlobFetcherError::Ignorable(e)) => {
180-
debug!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed.");
181-
tokio::time::sleep(BETWEEN_RETRIES).await;
182-
continue;
183-
}
184-
Err(e) => return Err(e), // unrecoverable error
185-
}
174+
let Ok(blobs) = self
175+
.fetcher
176+
.fetch_blobs(slot, tx_hash, &versioned_hashes)
177+
.instrument(debug_span!("fetch_blobs_loop", attempt))
178+
.await
179+
else {
180+
tokio::time::sleep(BETWEEN_RETRIES).await;
181+
continue;
182+
};
183+
184+
self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
185+
return Ok(blobs);
186186
}
187187
error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed");
188-
Err(BlobFetcherError::missing_sidecar(tx_hash))
188+
Err(BlobberError::missing_sidecar(tx_hash))
189189
}
190190

191191
/// Processes the cache instructions.
@@ -215,10 +215,10 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
215215
///
216216
/// # Panics
217217
/// This function will panic if the cache task fails to spawn.
218-
pub fn spawn(self) -> CacheHandle {
218+
pub fn spawn<C: SidecarCoder + Default>(self) -> CacheHandle<C> {
219219
let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE);
220220
tokio::spawn(Arc::new(self).task_future(inst));
221-
CacheHandle { sender }
221+
CacheHandle { sender, _coder: PhantomData }
222222
}
223223
}
224224

@@ -234,7 +234,6 @@ mod tests {
234234
rlp::encode,
235235
signers::{SignerSync, local::PrivateKeySigner},
236236
};
237-
use init4_bin_base::utils::calc::SlotCalculator;
238237
use reth::primitives::Transaction;
239238
use reth_transaction_pool::{
240239
PoolTransaction, TransactionOrigin,
@@ -250,7 +249,6 @@ mod tests {
250249
let test = signet_constants::KnownChains::Test;
251250

252251
let constants: SignetSystemConstants = test.try_into().unwrap();
253-
let calc = SlotCalculator::new(0, 0, 12);
254252

255253
let explorer_url = "https://api.holesky.blobscan.com/";
256254
let client = reqwest::Client::builder().use_rustls_tls();
@@ -286,9 +284,8 @@ mod tests {
286284
.with_explorer_url(explorer_url)
287285
.with_client_builder(client)
288286
.unwrap()
289-
.with_slot_calculator(calc)
290287
.build_cache()?;
291-
let handle = cache.spawn();
288+
let handle = cache.spawn::<SimpleCoder>();
292289

293290
let got = handle
294291
.fetch_blobs(
File renamed without changes.

crates/blobber/src/blobs/error.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use alloy::primitives::B256;
2+
use reth::transaction_pool::BlobStoreError;
3+
4+
/// Result using [`FetchError`] as the default error type.
5+
pub type FetchResult<T> = Result<T, FetchError>;
6+
7+
/// Unrecoverable blob fetching errors. These result in the node shutting
8+
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
9+
#[derive(Debug, thiserror::Error)]
10+
pub enum FetchError {
11+
/// Reqwest error
12+
#[error(transparent)]
13+
Reqwest(#[from] reqwest::Error),
14+
/// Missing sidecar error
15+
#[error("Cannot retrieve sidecar for {0} from any source")]
16+
MissingSidecar(B256),
17+
/// Reth blobstore error.
18+
#[error(transparent)]
19+
BlobStore(BlobStoreError),
20+
/// Url parse error.
21+
#[error(transparent)]
22+
UrlParse(#[from] url::ParseError),
23+
/// Consensus client URL not set error.
24+
#[error("Consensus client URL not set")]
25+
ConsensusClientUrlNotSet,
26+
/// Pylon client URL not set error.
27+
#[error("Pylon client URL not set")]
28+
PylonClientUrlNotSet,
29+
}
30+
31+
impl From<BlobStoreError> for FetchError {
32+
fn from(err: BlobStoreError) -> Self {
33+
match err {
34+
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
35+
_ => FetchError::BlobStore(err),
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)