Skip to content

Commit 6736b5d

Browse files
committed
refactor: split coder from blobber
1 parent d9b56b7 commit 6736b5d

File tree

11 files changed

+176
-256
lines changed

11 files changed

+176
-256
lines changed

crates/blobber/src/builder.rs renamed to crates/blobber/src/blobs/builder.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
2-
use init4_bin_base::utils::calc::SlotCalculator;
32
use reth::transaction_pool::TransactionPool;
43
use url::Url;
54

@@ -35,7 +34,6 @@ pub struct BlobFetcherBuilder<Pool> {
3534
client: Option<reqwest::Client>,
3635
cl_url: Option<String>,
3736
pylon_url: Option<String>,
38-
slot_calculator: Option<SlotCalculator>,
3937
}
4038

4139
impl<Pool> BlobFetcherBuilder<Pool> {
@@ -47,7 +45,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
4745
client: self.client,
4846
cl_url: self.cl_url,
4947
pylon_url: self.pylon_url,
50-
slot_calculator: self.slot_calculator,
5148
}
5249
}
5350

@@ -107,22 +104,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
107104
self.pylon_url = Some(pylon_url.to_string());
108105
Ok(self)
109106
}
110-
111-
/// Set the slot calculator to use for the extractor.
112-
pub const fn with_slot_calculator(
113-
mut self,
114-
slot_calculator: SlotCalculator,
115-
) -> BlobFetcherBuilder<Pool> {
116-
self.slot_calculator = Some(slot_calculator);
117-
self
118-
}
119-
120-
/// Set the slot calculator to use for the extractor, using the Pecornino
121-
/// host configuration.
122-
pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder<Pool> {
123-
self.slot_calculator = Some(SlotCalculator::pecorino_host());
124-
self
125-
}
126107
}
127108

128109
impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
@@ -141,9 +122,7 @@ impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
141122
let explorer =
142123
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());
143124

144-
let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?;
145-
146-
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
125+
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url))
147126
}
148127

149128
/// Build a [`BlobCacher`] with the provided parameters.

crates/blobber/src/cache.rs renamed to crates/blobber/src/blobs/cache.rs

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
use crate::{BlobberError, BlobberResult, Blobs, FetchResult};
1+
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
22
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
33
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
44
use alloy::eips::merge::EPOCH_SLOTS;
55
use alloy::primitives::{B256, Bytes, keccak256};
6+
use core::fmt;
67
use reth::transaction_pool::TransactionPool;
78
use reth::{network::cache::LruMap, primitives::Receipt};
89
use signet_extract::ExtractedEvent;
910
use signet_zenith::Zenith::BlockSubmitted;
1011
use signet_zenith::ZenithBlock;
12+
use std::marker::PhantomData;
1113
use std::{
1214
sync::{Arc, Mutex},
1315
time::Duration,
@@ -37,11 +39,13 @@ enum CacheInst {
3739

3840
/// Handle for the cache.
3941
#[derive(Debug, Clone)]
40-
pub struct CacheHandle {
42+
pub struct CacheHandle<Coder = SimpleCoder> {
4143
sender: mpsc::Sender<CacheInst>,
44+
45+
_coder: PhantomData<Coder>,
4246
}
4347

44-
impl CacheHandle {
48+
impl<Coder> CacheHandle<Coder> {
4549
/// Sends a cache instruction.
4650
async fn send(&self, inst: CacheInst) {
4751
let _ = self.sender.send(inst).await;
@@ -71,12 +75,14 @@ impl CacheHandle {
7175

7276
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
7377
/// Zenith block data using the provided coder.
74-
pub async fn fetch_and_decode_with_coder<C: SidecarCoder>(
78+
pub async fn fetch_and_decode(
7579
&self,
7680
slot: usize,
7781
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
78-
mut coder: C,
79-
) -> BlobberResult<Bytes> {
82+
) -> BlobberResult<Bytes>
83+
where
84+
Coder: SidecarCoder + Default,
85+
{
8086
let tx_hash = extract.tx_hash();
8187
let versioned_hashes = extract
8288
.tx
@@ -87,23 +93,13 @@ impl CacheHandle {
8793

8894
let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
8995

90-
coder
96+
Coder::default()
9197
.decode_all(blobs.as_ref())
9298
.ok_or_else(BlobberError::blob_decode_error)?
9399
.into_iter()
94100
.find(|data| keccak256(data) == extract.block_data_hash())
95101
.map(Into::into)
96-
.ok_or_else(|| BlobberError::block_data_not_found(tx_hash))
97-
}
98-
99-
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
100-
/// [`SimpleCoder`] to get the Zenith block data.
101-
pub async fn fech_and_decode(
102-
&self,
103-
slot: usize,
104-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
105-
) -> BlobberResult<Bytes> {
106-
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
102+
.ok_or_else(|| BlobberError::block_data_not_found(extract.block_data_hash()))
107103
}
108104

109105
/// Fetch the blobs, decode them using the provided coder, and construct a
@@ -117,15 +113,17 @@ impl CacheHandle {
117113
/// decoded (e.g., due to a malformatted blob).
118114
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
119115
/// blobs.
120-
pub async fn signet_block_with_coder<C: SidecarCoder>(
116+
pub async fn signet_block(
121117
&self,
122118
host_block_number: u64,
123119
slot: usize,
124120
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
125-
coder: C,
126-
) -> FetchResult<ZenithBlock> {
121+
) -> FetchResult<ZenithBlock>
122+
where
123+
Coder: SidecarCoder + Default,
124+
{
127125
let header = extract.ru_header(host_block_number);
128-
let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await {
126+
let block_data = match self.fetch_and_decode(slot, extract).await {
129127
Ok(buf) => buf,
130128
Err(BlobberError::Decode(_)) => {
131129
trace!("Failed to decode block data");
@@ -135,44 +133,24 @@ impl CacheHandle {
135133
};
136134
Ok(ZenithBlock::from_header_and_data(header, block_data))
137135
}
138-
139-
/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
140-
/// Zenith block from the header and data.
141-
///
142-
/// # Returns
143-
///
144-
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
145-
/// decoded.
146-
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
147-
/// decoded (e.g., due to a malformatted blob).
148-
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
149-
/// blobs.
150-
pub async fn signet_block(
151-
&self,
152-
host_block_number: u64,
153-
slot: usize,
154-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
155-
) -> FetchResult<ZenithBlock> {
156-
self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await
157-
}
158136
}
159137

160138
/// Retrieves blobs and stores them in a cache for later use.
161139
pub struct BlobCacher<Pool> {
162-
fetcher: crate::BlobFetcher<Pool>,
140+
fetcher: BlobFetcher<Pool>,
163141

164142
cache: Mutex<LruMap<(usize, B256), Blobs>>,
165143
}
166144

167-
impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
168-
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
145+
impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
146+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169147
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
170148
}
171149
}
172150

173151
impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
174152
/// Creates a new `BlobCacher` with the provided extractor and cache size.
175-
pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
153+
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
176154
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
177155
}
178156

@@ -237,10 +215,10 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
237215
///
238216
/// # Panics
239217
/// This function will panic if the cache task fails to spawn.
240-
pub fn spawn(self) -> CacheHandle {
218+
pub fn spawn<C: SidecarCoder + Default>(self) -> CacheHandle<C> {
241219
let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE);
242220
tokio::spawn(Arc::new(self).task_future(inst));
243-
CacheHandle { sender }
221+
CacheHandle { sender, _coder: PhantomData }
244222
}
245223
}
246224

@@ -256,7 +234,6 @@ mod tests {
256234
rlp::encode,
257235
signers::{SignerSync, local::PrivateKeySigner},
258236
};
259-
use init4_bin_base::utils::calc::SlotCalculator;
260237
use reth::primitives::Transaction;
261238
use reth_transaction_pool::{
262239
PoolTransaction, TransactionOrigin,
@@ -272,7 +249,6 @@ mod tests {
272249
let test = signet_constants::KnownChains::Test;
273250

274251
let constants: SignetSystemConstants = test.try_into().unwrap();
275-
let calc = SlotCalculator::new(0, 0, 12);
276252

277253
let explorer_url = "https://api.holesky.blobscan.com/";
278254
let client = reqwest::Client::builder().use_rustls_tls();
@@ -308,9 +284,8 @@ mod tests {
308284
.with_explorer_url(explorer_url)
309285
.with_client_builder(client)
310286
.unwrap()
311-
.with_slot_calculator(calc)
312287
.build_cache()?;
313-
let handle = cache.spawn();
288+
let handle = cache.spawn::<SimpleCoder>();
314289

315290
let got = handle
316291
.fetch_blobs(
File renamed without changes.

crates/blobber/src/blobs/error.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use alloy::primitives::B256;
2+
use reth::transaction_pool::BlobStoreError;
3+
4+
/// Result using [`FetchError`] as the default error type.
5+
pub type FetchResult<T> = Result<T, FetchError>;
6+
7+
/// Unrecoverable blob fetching errors. These result in the node shutting
8+
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
9+
#[derive(Debug, thiserror::Error)]
10+
pub enum FetchError {
11+
/// Reqwest error
12+
#[error(transparent)]
13+
Reqwest(#[from] reqwest::Error),
14+
/// Missing sidecar error
15+
#[error("Cannot retrieve sidecar for {0} from any source")]
16+
MissingSidecar(B256),
17+
/// Reth blobstore error.
18+
#[error(transparent)]
19+
BlobStore(BlobStoreError),
20+
/// Url parse error.
21+
#[error(transparent)]
22+
UrlParse(#[from] url::ParseError),
23+
/// Consensus client URL not set error.
24+
#[error("Consensus client URL not set")]
25+
ConsensusClientUrlNotSet,
26+
/// Pylon client URL not set error.
27+
#[error("Pylon client URL not set")]
28+
PylonClientUrlNotSet,
29+
}
30+
31+
impl From<BlobStoreError> for FetchError {
32+
fn from(err: BlobStoreError) -> Self {
33+
match err {
34+
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
35+
_ => FetchError::BlobStore(err),
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)