Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.10.0"
version = "0.10.1"
edition = "2024"
rust-version = "1.88"
authors = ["init4"]
Expand Down Expand Up @@ -41,13 +41,14 @@ signet-rpc = { version = "0.10.0", path = "crates/rpc" }

init4-bin-base = { version = "0.11.0", features = ["alloy"] }

signet-bundle = "0.10.0"
signet-constants = "0.10.0"
signet-evm = "0.10.0"
signet-extract = "0.10.0"
signet-tx-cache = "0.10.0"
signet-types = "0.10.0"
signet-zenith = "0.10.0"
signet-bundle = "0.10.1"
signet-constants = "0.10.1"
signet-evm = "0.10.1"
signet-extract = "0.10.1"
signet-journal = "0.10.1"
signet-tx-cache = "0.10.1"
signet-types = "0.10.1"
signet-zenith = "0.10.1"

# ajj
ajj = { version = "0.3.4" }
Expand Down
3 changes: 1 addition & 2 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ repository.workspace = true
signet-node-types.workspace = true

signet-evm.workspace = true
signet-journal.workspace = true
signet-types.workspace = true
signet-zenith.workspace = true

trevm.workspace = true

alloy.workspace = true

reth.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use alloy::consensus::TxReceipt;

/// Trait for types that can be converted into other types as they're already compatible.
/// Uswed for converting between alloy/reth/signet types.
/// Used for converting between alloy/reth/signet types.
pub trait DataCompat<Other: DataCompat<Self>>: Sized {
/// Convert `self` into the target type.
fn convert(self) -> Other;
Expand Down
72 changes: 72 additions & 0 deletions crates/db/src/journal/ingestor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::{SignetDbRw, journal::JournalDb};
use futures_util::StreamExt;
use reth::providers::ProviderResult;
use signet_journal::{Journal, JournalStream};
use signet_node_types::NodeTypesDbTrait;
use std::sync::Arc;
use tokio::task::JoinHandle;

/// A task that ingests journals into a reth database.
#[derive(Debug)]
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
db: Arc<SignetDbRw<Db>>,
}

impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
fn from(value: SignetDbRw<Db>) -> Self {
Self::new(value.into())
}
}

impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
Self::new(value)
}
}

impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
/// Create a new `JournalIngestor` with the given database provider.
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
Self { db }
}

async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
where
S: JournalStream<'static> + Send + Unpin + 'static,
{
while let Some(Journal::V1(journal)) = stream.next().await {
// FUTURE: Sanity check that the header height matches the update
// height. Sanity check that both heights are 1 greater than the
// last height in the database.

let db = self.db.clone();

// DB interaction is sync, so we spawn a blocking task for it. We
// immediately await that task. This prevents blocking the worker
// thread
tokio::task::spawn_blocking(move || db.ingest(journal))
.await
.expect("ingestion should not panic")?;
}
// Stream has ended, return Ok
Ok(())
}

/// Spawn a task to ingest journals from the provided stream.
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
where
S: JournalStream<'static> + Send + Unpin + 'static,
{
tokio::spawn(self.task_future(stream))
}
}

/// Ingest journals from a stream into a reth database.
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
where
Db: NodeTypesDbTrait,
S: JournalStream<'static> + Send + Unpin + 'static,
{
let ingestor = JournalIngestor::new(db);
ingestor.task_future(stream).await
}
76 changes: 4 additions & 72 deletions crates/db/src/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,8 @@
mod r#trait;
pub use r#trait::JournalDb;

use crate::SignetDbRw;
use futures_util::{Stream, StreamExt};
use reth::providers::ProviderResult;
use signet_node_types::NodeTypesDbTrait;
use std::sync::Arc;
use tokio::task::JoinHandle;
use trevm::journal::BlockUpdate;
mod provider;
pub use provider::JournalProviderTask;

/// A task that ingests journals into a reth database.
#[derive(Debug)]
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
db: Arc<SignetDbRw<Db>>,
}

impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
fn from(value: SignetDbRw<Db>) -> Self {
Self::new(value.into())
}
}

impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
Self::new(value)
}
}

impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
/// Create a new `JournalIngestor` with the given database provider.
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
Self { db }
}

async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
where
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
{
while let Some(item) = stream.next().await {
// FUTURE: Sanity check that the header height matches the update
// height. Sanity check that both heights are 1 greater than the
// last height in the database.

let db = self.db.clone();
let (header, block_update) = item;

// DB interaction is sync, so we spawn a blocking task for it. We
// immediately await that task. This prevents blocking the worker
// thread
tokio::task::spawn_blocking(move || db.ingest(&header, block_update))
.await
.expect("ingestion should not panic")?;
}
// Stream has ended, return Ok
Ok(())
}

/// Spawn a task to ingest journals from the provided stream.
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
where
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
{
tokio::spawn(self.task_future(stream))
}
}

/// Ingest journals from a stream into a reth database.
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
where
Db: NodeTypesDbTrait,
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
{
let ingestor = JournalIngestor::new(db);
ingestor.task_future(stream).await
}
mod ingestor;
pub use ingestor::{JournalIngestor, ingest_journals};
80 changes: 80 additions & 0 deletions crates/db/src/journal/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::journal::JournalDb;
use futures_util::StreamExt;
use reth::{
primitives::SealedHeader,
providers::{
CanonChainTracker, DatabaseProviderFactory, DatabaseProviderRW, ProviderResult,
providers::BlockchainProvider,
},
rpc::types::engine::ForkchoiceState,
};
use signet_journal::{Journal, JournalStream};
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
use tokio::task::JoinHandle;

/// A task that processes journal updates for a specific database, and calls
/// the appropriate methods on a [`BlockchainProvider`] to update the in-memory
/// chain view.
#[derive(Debug, Clone)]
pub struct JournalProviderTask<Db: NodeTypesDbTrait> {
provider: BlockchainProvider<SignetNodeTypes<Db>>,
}

impl<Db: NodeTypesDbTrait> JournalProviderTask<Db> {
/// Instantiate a new task.
pub const fn new(provider: BlockchainProvider<SignetNodeTypes<Db>>) -> Self {
Self { provider }
}

/// Get a reference to the provider.
pub const fn provider(&self) -> &BlockchainProvider<SignetNodeTypes<Db>> {
&self.provider
}

/// Deconstruct the task into its provider.
pub fn into_inner(self) -> BlockchainProvider<SignetNodeTypes<Db>> {
self.provider
}

/// Create a future for the task, suitable for [`tokio::spawn`] or another
/// task-spawning system.
pub async fn task_future<S>(self, mut journals: S) -> ProviderResult<()>
where
S: JournalStream<'static> + Send + Unpin + 'static,
{
loop {
let Some(Journal::V1(journal)) = journals.next().await else { break };

let rw = self.provider.database_provider_rw().map(DatabaseProviderRW);

let r_header = SealedHeader::new_unhashed(journal.header().clone());
let block_hash = r_header.hash();

// DB interaction is sync, so we spawn a blocking task for it. We
// immediately await that task. This prevents blocking the worker
// thread
tokio::task::spawn_blocking(move || rw?.ingest(journal))
.await
.expect("ingestion should not panic")?;

self.provider.set_canonical_head(r_header.clone());
self.provider.set_safe(r_header.clone());
self.provider.set_finalized(r_header);
self.provider.on_forkchoice_update_received(&ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
});
}

Ok(())
}

/// Spawn the journal provider task.
pub fn spawn<S>(self, journals: S) -> JoinHandle<ProviderResult<()>>
where
S: JournalStream<'static> + Send + Unpin + 'static,
{
tokio::spawn(self.task_future(journals))
}
}
24 changes: 16 additions & 8 deletions crates/db/src/journal/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::RuWriter;
use alloy::consensus::{BlockHeader, Header};
use reth::{providers::ProviderResult, revm::db::BundleState};
use signet_evm::{BlockResult, ExecutionOutcome};
use signet_journal::HostJournal;
use signet_types::primitives::{RecoveredBlock, SealedBlock, SealedHeader, TransactionSigned};
use trevm::journal::BlockUpdate;

/// A database that can be updated with journals.
pub trait JournalDb: RuWriter {
Expand All @@ -18,20 +18,26 @@ pub trait JournalDb: RuWriter {
///
/// This is intended to be used for tx simulation, and other purposes that
/// need fast state access WITHTOUT needing to retrieve historical data.
fn ingest(&self, header: &Header, update: BlockUpdate<'_>) -> ProviderResult<()> {
let journal_hash = update.journal_hash();
fn ingest(&self, journal: HostJournal<'static>) -> ProviderResult<()> {
let journal_hash = journal.journal_hash();

let (meta, bsi) = journal.into_parts();
let (host_height, _, header) = meta.into_parts();

// TODO: remove the clone in future versions. This can be achieved by
// _NOT_ making a `BlockResult` and instead manually updating relevan
// tables. However, this means diverging more fro the underlying reth
// logic that we are currently re-using.
let bundle_state: BundleState = update.journal().clone().into();
let bundle_state: BundleState = bsi.into();
let execution_outcome = ExecutionOutcome::new(bundle_state, vec![], header.number());

let block: SealedBlock<TransactionSigned, Header> =
SealedBlock { header: SealedHeader::new(header.to_owned()), body: Default::default() };
let block_result =
BlockResult { sealed_block: RecoveredBlock::new(block, vec![]), execution_outcome };
SealedBlock { header: SealedHeader::new(header), body: Default::default() };
let block_result = BlockResult {
sealed_block: RecoveredBlock::new(block, vec![]),
execution_outcome,
host_height,
};

self.append_host_block(
None,
Expand All @@ -40,7 +46,9 @@ pub trait JournalDb: RuWriter {
std::iter::empty(),
&block_result,
journal_hash,
)
)?;

Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ where
//
// last reviewed at tag v1.5.1

let BlockResult { sealed_block: block, execution_outcome } = block_result;
let BlockResult { sealed_block: block, execution_outcome, .. } = block_result;

let ru_height = block.number();
self.insert_signet_block(header, block, journal_hash, StorageLocation::Database)?;
Expand Down
Loading
Loading