From dc5d70e0f1324148ceb1ae1188ca017eaab4a7cd Mon Sep 17 00:00:00 2001 From: James Date: Fri, 5 Sep 2025 12:25:53 -0400 Subject: [PATCH] feat: migrate block processor from the node --- Cargo.toml | 9 +- crates/block-processor/Cargo.toml | 32 +++ crates/block-processor/README.md | 13 + crates/block-processor/src/lib.rs | 25 ++ crates/block-processor/src/v1/mod.rs | 2 + crates/block-processor/src/v1/processor.rs | 296 +++++++++++++++++++++ crates/db/Cargo.toml | 2 + crates/db/src/aliases.rs | 8 + crates/db/src/lib.rs | 2 +- crates/rpc/Cargo.toml | 1 + crates/rpc/src/ctx/full.rs | 3 +- crates/rpc/src/ctx/mod.rs | 8 - crates/rpc/src/ctx/signet.rs | 2 +- crates/rpc/src/lib.rs | 2 +- 14 files changed, 389 insertions(+), 16 deletions(-) create mode 100644 crates/block-processor/Cargo.toml create mode 100644 crates/block-processor/README.md create mode 100644 crates/block-processor/src/lib.rs create mode 100644 crates/block-processor/src/v1/mod.rs create mode 100644 crates/block-processor/src/v1/processor.rs diff --git a/Cargo.toml b/Cargo.toml index 45128f6..6c4c9d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,10 +34,11 @@ debug = false incremental = false [workspace.dependencies] -signet-blobber = { version = "0.10.0", path = "crates/blobber" } -signet-db = { version = "0.10.0", path = "crates/db" } -signet-node-types = { version = "0.10.0", path = "crates/node-types" } -signet-rpc = { version = "0.10.0", path = "crates/rpc" } +signet-blobber = { version = "0.10", path = "crates/blobber" } +signet-block-processor = { version = "0.10", path = "crates/block-processor" } +signet-db = { version = "0.10", path = "crates/db" } +signet-node-types = { version = "0.10", path = "crates/node-types" } +signet-rpc = { version = "0.10", path = "crates/rpc" } init4-bin-base = { version = "0.11.0", features = ["alloy"] } diff --git a/crates/block-processor/Cargo.toml b/crates/block-processor/Cargo.toml new file mode 100644 index 0000000..da0e12b --- /dev/null +++ b/crates/block-processor/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "signet-block-processor" +description = "High-level flows for Signet blob extraction and EVM invocation" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +signet-evm.workspace = true +signet-extract.workspace = true +signet-journal.workspace = true + +init4-bin-base.workspace = true + +signet-blobber.workspace = true +signet-db.workspace = true +signet-node-types.workspace = true + +reth.workspace = true +reth-exex.workspace = true +reth-node-api.workspace = true + +tracing.workspace = true +eyre.workspace = true +alloy.workspace = true +signet-constants.workspace = true +trevm.workspace = true +reth-chainspec.workspace = true diff --git a/crates/block-processor/README.md b/crates/block-processor/README.md new file mode 100644 index 0000000..c239a6d --- /dev/null +++ b/crates/block-processor/README.md @@ -0,0 +1,13 @@ +# Signet Block Processor + +Block processing logic for the Signet Node. This crate takes a reth `Chain`, +runs the Signet EVM, and commits the results to a database. + +# Significant Types + +- A few convenience type aliases: + - `PrimitivesOf` - The primitives type used by the host. + - `Chain` - A reth `Chain` using the host's primitives. + - `ExExNotification` - A reth `ExExNotification` using the host's + primitives. +- `SignetBlockProcessorV1` - The first version of the block processor. diff --git a/crates/block-processor/src/lib.rs b/crates/block-processor/src/lib.rs new file mode 100644 index 0000000..36d0ee5 --- /dev/null +++ b/crates/block-processor/src/lib.rs @@ -0,0 +1,25 @@ +#![doc = include_str!("../README.md")] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod v1; +pub use v1::SignetBlockProcessor as SignetBlockProcessorV1; + +/// Primitives used by the host. +pub type PrimitivesOf = + <::Types as reth_node_api::NodeTypes>::Primitives; + +/// A [`reth::providers::Chain`] using the host primitives. +pub type Chain = reth::providers::Chain>; + +/// A [`reth_exex::ExExNotification`] using the host primitives. +pub type ExExNotification = reth_exex::ExExNotification>; diff --git a/crates/block-processor/src/v1/mod.rs b/crates/block-processor/src/v1/mod.rs new file mode 100644 index 0000000..8977efa --- /dev/null +++ b/crates/block-processor/src/v1/mod.rs @@ -0,0 +1,2 @@ +mod processor; +pub use processor::SignetBlockProcessor; diff --git a/crates/block-processor/src/v1/processor.rs b/crates/block-processor/src/v1/processor.rs new file mode 100644 index 0000000..c0fbd07 --- /dev/null +++ b/crates/block-processor/src/v1/processor.rs @@ -0,0 +1,296 @@ +use crate::Chain; +use alloy::{consensus::BlockHeader, primitives::B256}; +use eyre::ContextCompat; +use init4_bin_base::utils::calc::SlotCalculator; +use reth::{ + primitives::EthPrimitives, + providers::{BlockNumReader, BlockReader, ExecutionOutcome, HeaderProvider, ProviderFactory}, + revm::{database::StateProviderDatabase, db::StateBuilder}, +}; +use reth_chainspec::{ChainSpec, EthereumHardforks}; +use reth_node_api::{FullNodeComponents, NodeTypes}; +use signet_blobber::{CacheHandle, ExtractableChainShim}; +use signet_constants::SignetSystemConstants; +use signet_db::{DataCompat, DbProviderExt, RuChain, RuRevmState, RuWriter}; +use signet_evm::{BlockResult, EvmNeedsCfg, SignetDriver}; +use signet_extract::{Extractor, Extracts}; +use signet_journal::HostJournal; +use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes}; +use std::collections::VecDeque; +use std::sync::Arc; +use tracing::{Instrument, debug, error, info, info_span, instrument}; +use trevm::revm::primitives::hardfork::SpecId; + +/// A block processor that listens to host chain commits and processes +/// Signet blocks accordingly. +#[derive(Debug)] +pub struct SignetBlockProcessor +where + Db: NodeTypesDbTrait, +{ + /// Signet System Constants + constants: SignetSystemConstants, + + /// The chain specification, used to determine active hardforks. + chain_spec: Arc, + + /// A [`ProviderFactory`] instance to allow RU database access. + ru_provider: ProviderFactory>, + + /// The slot calculator. + slot_calculator: SlotCalculator, + + /// A handle to the blob cacher. + blob_cacher: CacheHandle, +} + +impl SignetBlockProcessor +where + Db: NodeTypesDbTrait, +{ + /// Create a new [`SignetBlockProcessor`]. + pub const fn new( + constants: SignetSystemConstants, + chain_spec: Arc, + ru_provider: ProviderFactory>, + slot_calculator: SlotCalculator, + blob_cacher: CacheHandle, + ) -> Self { + Self { constants, chain_spec, ru_provider, slot_calculator, blob_cacher } + } + + /// Get the active spec id at the given timestamp. + fn spec_id(&self, timestamp: u64) -> SpecId { + if self.chain_spec.is_prague_active_at_timestamp(timestamp) { + SpecId::PRAGUE + } else { + SpecId::CANCUN + } + } + + /// Make a [`StateProviderDatabase`] from the read-write provider, suitable + /// for use with Trevm. + fn state_provider_database(&self, height: u64) -> eyre::Result { + // Get the state provider for the block number + let sp = self.ru_provider.history_by_block_number(height)?; + + // Wrap in Revm comatibility layer + let spd = StateProviderDatabase::new(sp); + let builder = StateBuilder::new_with_database(spd); + + Ok(builder.with_bundle_update().build()) + } + + /// Make a new Trevm instance, building on the given height. + fn trevm(&self, parent_height: u64, spec_id: SpecId) -> eyre::Result> { + let db = self.state_provider_database(parent_height)?; + + let mut trevm = signet_evm::signet_evm(db, self.constants.clone()); + + trevm.set_spec_id(spec_id); + + Ok(trevm) + } + + /// Called when the host chain has committed a block or set of blocks. + #[instrument(skip_all, fields(count = chain.len(), first = chain.first().number(), tip = chain.tip().number()))] + pub async fn on_host_commit(&self, chain: &Chain) -> eyre::Result> + where + Host: FullNodeComponents, + Host::Types: NodeTypes, + { + let highest = chain.tip().number(); + if highest < self.constants.host_deploy_height() { + return Ok(None); + } + + // this should never happen but we want to handle it anyway + if chain.is_empty() { + return Ok(None); + } + + let extractor = Extractor::new(self.constants.clone()); + let shim = ExtractableChainShim::new(chain); + let outputs = extractor.extract_signet(&shim); + + // TODO: ENG-481 Inherit prune modes from Reth configuration. + // https://linear.app/initiates/issue/ENG-481/inherit-prune-modes-from-reth-node + + // The extractor will filter out blocks at or before the deployment + // height, so we don't need compute the start from the notification. + let mut start = None; + let mut current = 0; + let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?; + + let mut net_outcome = ExecutionOutcome::default(); + let last_ru_height = self.ru_provider.last_block_number()?; + + // There might be a case where we can get a notification that starts + // "lower" than our last processed block, + // but contains new information beyond one point. In this case, we + // should simply skip the block. + for block_extracts in outputs.skip_while(|extract| extract.ru_height <= last_ru_height) { + // If we haven't set the start yet, set it to the first block. + if start.is_none() { + let new_ru_height = block_extracts.ru_height; + + // If the above condition passes, we should always be + // committing without skipping a range of blocks. + if new_ru_height != last_ru_height + 1 { + error!( + %new_ru_height, + %last_ru_height, + "missing range of DB blocks" + ); + eyre::bail!("missing range of DB blocks"); + } + start = Some(new_ru_height); + } + current = block_extracts.ru_height; + let spec_id = self.spec_id(block_extracts.host_block.timestamp()); + + let span = info_span!( + "signet::handle_zenith_outputs::block_processing", + start = start.unwrap(), + ru_height = block_extracts.ru_height, + host_height = block_extracts.host_block.number(), + has_ru_block = block_extracts.submitted.is_some(), + ); + + tracing::trace!("Running EVM"); + let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?; + tracing::trace!("Committing EVM results"); + let journal = + self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?; + + prev_block_journal = journal.journal_hash(); + net_outcome.extend(block_result.execution_outcome.convert()); + } + info!("committed blocks"); + + // If we didn't process any blocks, we don't need to return anything. + // In practice, this should never happen, as we should always have at + // least one block to process. + if start.is_none() { + return Ok(None); + } + let start = start.expect("checked by early return"); + + // Return the range of blocks we processed + let provider = self.ru_provider.provider_rw()?; + + let ru_info = provider.get_extraction_results(start..=current)?; + + let inner = + Chain::::new(provider.recovered_block_range(start..=current)?, net_outcome, None); + + Ok(Some(RuChain { inner, ru_info })) + } + + /// ========================== + /// ========================== + /// ██████ ██ ██ ███ ██ + /// ██ ██ ██ ██ ████ ██ + /// ██████ ██ ██ ██ ██ ██ + /// ██ ██ ██ ██ ██ ██ ██ + /// ██ ██ ██████ ██ ████ + /// + /// + /// ███████ ██ ██ ███ ███ + /// ██ ██ ██ ████ ████ + /// █████ ██ ██ ██ ████ ██ + /// ██ ██ ██ ██ ██ ██ + /// ███████ ████ ██ ██ + /// =========================== + /// =========================== + async fn run_evm( + &self, + block_extracts: &Extracts<'_, ExtractableChainShim<'_>>, + spec_id: SpecId, + ) -> eyre::Result { + let ru_height = block_extracts.ru_height; + let host_height = block_extracts.host_block.number(); + let timestamp = block_extracts.host_block.timestamp(); + + let parent_header = self + .ru_provider + .sealed_header(block_extracts.ru_height.saturating_sub(1))? + .wrap_err("parent ru block not present in DB") + .inspect_err(|e| error!(%e))?; + + let slot = self.slot_calculator.slot_ending_at(timestamp).expect("host chain has started"); + + let txns = match &block_extracts.submitted { + Some(submitted) => { + self.blob_cacher + .signet_block(block_extracts.host_block.number(), slot, submitted) + .await + .map(|block| block.into_parts().1) + .unwrap_or_default() + .into_iter() + .filter(|tx| !tx.is_eip4844()) // redundant, but let's be sure + .map(|tx| tx.into()) + .collect::>() + } + None => VecDeque::new(), + }; + + let mut driver = SignetDriver::new( + block_extracts, + txns, + parent_header.convert(), + self.constants.clone(), + ); + + let trevm = self.trevm(driver.parent().number(), spec_id)?.fill_cfg(&driver); + + let trevm = match trevm.drive_block(&mut driver) { + Ok(t) => t, + Err(e) => return Err(e.into_error().into()), + }; + + let (sealed_block, receipts) = driver.finish(); + let bundle = trevm.finish(); + + Ok(BlockResult { + sealed_block, + execution_outcome: signet_evm::ExecutionOutcome::new(bundle, vec![receipts], ru_height), + host_height, + }) + } + + /// Commit the outputs of a zenith block to the database. + #[instrument(skip_all)] + fn commit_evm_results<'a>( + &self, + extracts: &Extracts<'_, ExtractableChainShim<'_>>, + block_result: &'a BlockResult, + prev_block_journal: B256, + ) -> eyre::Result> { + let journal = block_result.make_host_journal(prev_block_journal); + let time = std::time::Instant::now(); + let jh = journal.journal_hash(); + + debug!( + target: "signet::journal::serialize", + bytes = journal.serialized().len(), + hash = %jh, + elapsed_micros = %time.elapsed().as_micros(), + "journal produced" + ); + + self.ru_provider.provider_rw()?.update(|writer| { + // add execution results to database + writer.append_host_block( + extracts.ru_header(), + extracts.transacts().cloned(), + extracts.enters(), + extracts.enter_tokens(), + block_result, + jh, + )?; + Ok(()) + })?; + Ok(journal) + } +} diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index cf52191..1006923 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -16,6 +16,8 @@ signet-journal.workspace = true signet-types.workspace = true signet-zenith.workspace = true +trevm.workspace = true + alloy.workspace = true reth.workspace = true diff --git a/crates/db/src/aliases.rs b/crates/db/src/aliases.rs index baa59ce..da9a7c5 100644 --- a/crates/db/src/aliases.rs +++ b/crates/db/src/aliases.rs @@ -3,3 +3,11 @@ use signet_node_types::SignetNodeTypes; /// A Convenience alias for a [`DatabaseProviderRW`] using [`SignetNodeTypes`]. pub type SignetDbRw = DatabaseProviderRW>; + +/// Type alias for EVMs using a [`StateProviderBox`] as the `DB` type for +/// trevm. +/// +/// [`StateProviderBox`]: reth::providers::StateProviderBox +pub type RuRevmState = trevm::revm::database::State< + reth::revm::database::StateProviderDatabase, +>; diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 361810e..f61b193 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -12,7 +12,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod aliases; -pub use aliases::SignetDbRw; +pub use aliases::{RuRevmState, SignetDbRw}; mod chain; pub use chain::{DbExtractionResults, RuChain}; diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 711f1ec..1ae8e49 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -43,6 +43,7 @@ serde_json.workspace = true futures-util = "0.3.31" itertools.workspace = true revm-inspectors = "0.26.5" +signet-db.workspace = true [dev-dependencies] signet-zenith.workspace = true diff --git a/crates/rpc/src/ctx/full.rs b/crates/rpc/src/ctx/full.rs index 6bebc1c..76efb68 100644 --- a/crates/rpc/src/ctx/full.rs +++ b/crates/rpc/src/ctx/full.rs @@ -1,4 +1,4 @@ -use crate::{RuRevmState, SignetCtx}; +use crate::SignetCtx; use alloy::{consensus::Header, eips::BlockId}; use reth::{ providers::{ProviderResult, providers::BlockchainProvider}, @@ -7,6 +7,7 @@ use reth::{ tasks::{TaskExecutor, TaskSpawner}, }; use reth_node_api::FullNodeComponents; +use signet_db::RuRevmState; use signet_evm::EvmNeedsTx; use signet_node_types::Pnt; use signet_tx_cache::client::TxCache; diff --git a/crates/rpc/src/ctx/mod.rs b/crates/rpc/src/ctx/mod.rs index af0d93e..ea63b73 100644 --- a/crates/rpc/src/ctx/mod.rs +++ b/crates/rpc/src/ctx/mod.rs @@ -6,11 +6,3 @@ pub use full::{LoadState, RpcCtx}; mod fee_hist; pub(crate) use fee_hist::strip_signet_system_txns; - -/// Type alias for EVMs using a [`StateProviderBox`] as the `DB` type for -/// trevm. -/// -/// [`StateProviderBox`]: reth::providers::StateProviderBox -pub type RuRevmState = trevm::revm::database::State< - reth::revm::database::StateProviderDatabase, ->; diff --git a/crates/rpc/src/ctx/signet.rs b/crates/rpc/src/ctx/signet.rs index 8a3a6be..02661d0 100644 --- a/crates/rpc/src/ctx/signet.rs +++ b/crates/rpc/src/ctx/signet.rs @@ -1,5 +1,4 @@ use crate::{ - RuRevmState, ctx::strip_signet_system_txns, eth::EthError, interest::{ActiveFilter, FilterManager, FilterOutput, SubscriptionManager}, @@ -41,6 +40,7 @@ use reth::{ use reth_chainspec::{BaseFeeParams, ChainSpec, ChainSpecProvider}; use reth_node_api::BlockBody; use reth_rpc_eth_api::{RpcBlock, RpcConvert, RpcReceipt, RpcTransaction}; +use signet_db::RuRevmState; use signet_node_types::Pnt; use signet_tx_cache::client::TxCache; use signet_types::{MagicSig, constants::SignetSystemConstants}; diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 1a17819..0a5ff8a 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -54,7 +54,7 @@ mod config; pub use config::{RpcServerGuard, ServeConfig}; mod ctx; -pub use ctx::{LoadState, RpcCtx, RuRevmState, SignetCtx}; +pub use ctx::{LoadState, RpcCtx, SignetCtx}; mod debug; pub use debug::{DebugError, debug};