diff --git a/Cargo.lock b/Cargo.lock index 067016cb..1d345def 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2603,6 +2603,7 @@ dependencies = [ "clap", "common", "dotenvy", + "futures", "ops", "paladin-core", "proof_gen", diff --git a/leader/Cargo.toml b/leader/Cargo.toml index 7efadacf..b1ca703a 100644 --- a/leader/Cargo.toml +++ b/leader/Cargo.toml @@ -20,6 +20,7 @@ tokio = { workspace = true } proof_gen = { workspace = true } serde_json = { workspace = true } serde_path_to_error = { workspace = true } +futures = { workspace = true } alloy.workspace = true axum = "0.7.4" toml = "0.8.12" diff --git a/leader/src/http.rs b/leader/src/http.rs index 56c46bb0..63f9543f 100644 --- a/leader/src/http.rs +++ b/leader/src/http.rs @@ -4,7 +4,7 @@ use alloy::primitives::U256; use anyhow::{bail, Result}; use axum::{http::StatusCode, routing::post, Json, Router}; use paladin::runtime::Runtime; -use proof_gen::{proof_types::GeneratedBlockProof, types::PlonkyProofIntern}; +use proof_gen::proof_types::GeneratedBlockProof; use prover::BlockProverInput; use serde::{Deserialize, Serialize}; use serde_json::to_writer; @@ -46,7 +46,7 @@ fn write_to_file( match file { Ok(file) => { - to_writer(file, &generated_block_proof.intern)?; + to_writer(file, &generated_block_proof)?; Ok(fully_qualified_file_name) } Err(e) => { @@ -58,7 +58,7 @@ fn write_to_file( #[derive(Serialize, Deserialize, Debug)] struct HttpProverInput { prover_input: BlockProverInput, - previous: Option, + previous: Option, } async fn prove( @@ -73,7 +73,11 @@ async fn prove( match payload .prover_input - .prove(&runtime, payload.previous, save_inputs_on_error) + .prove( + &runtime, + payload.previous.map(futures::future::ok), + save_inputs_on_error, + ) .await { Ok(b_proof) => match write_to_file(output_dir, block_number, &b_proof) { diff --git a/leader/src/jerigon.rs b/leader/src/jerigon.rs index 802bd218..4059ec18 100644 --- a/leader/src/jerigon.rs +++ b/leader/src/jerigon.rs @@ -9,7 +9,6 @@ use anyhow::Result; use common::block_interval::BlockInterval; use paladin::runtime::Runtime; use proof_gen::proof_types::GeneratedBlockProof; -use proof_gen::types::PlonkyProofIntern; /// The main function for the jerigon mode. pub(crate) async fn jerigon_main( @@ -17,7 +16,7 @@ pub(crate) async fn jerigon_main( rpc_url: &str, block_interval: BlockInterval, checkpoint_block_number: u64, - previous_proof: Option, + previous_proof: Option, proof_output_dir_opt: Option, save_inputs_on_error: bool, ) -> Result<()> { @@ -28,18 +27,13 @@ pub(crate) async fn jerigon_main( ) .await?; - let previous_block_proof = previous_proof.map(|proof| GeneratedBlockProof { - b_height: checkpoint_block_number, - intern: proof, - }); - let block_proofs = prover_input - .prove(&runtime, previous_block_proof, save_inputs_on_error) + .prove(&runtime, previous_proof, save_inputs_on_error) .await?; runtime.close().await?; for block_proof in block_proofs { - let blokck_proof_str = serde_json::to_vec(&block_proof.intern)?; + let blokck_proof_str = serde_json::to_vec(&block_proof)?; write_proof( blokck_proof_str, proof_output_dir_opt.clone().map(|mut path| { diff --git a/leader/src/main.rs b/leader/src/main.rs index c21bac5d..0f241f22 100644 --- a/leader/src/main.rs +++ b/leader/src/main.rs @@ -9,7 +9,7 @@ use common::prover_state::TableLoadStrategy; use dotenvy::dotenv; use ops::register; use paladin::runtime::Runtime; -use proof_gen::types::PlonkyProofIntern; +use proof_gen::proof_types::GeneratedBlockProof; use tracing::info; use crate::utils::get_package_version; @@ -21,7 +21,7 @@ mod jerigon; mod stdio; mod utils; -fn get_previous_proof(path: Option) -> Result> { +fn get_previous_proof(path: Option) -> Result> { if path.is_none() { return Ok(None); } @@ -29,7 +29,7 @@ fn get_previous_proof(path: Option) -> Result let path = path.unwrap(); let file = File::open(path)?; let des = &mut serde_json::Deserializer::from_reader(&file); - let proof: PlonkyProofIntern = serde_path_to_error::deserialize(des)?; + let proof: GeneratedBlockProof = serde_path_to_error::deserialize(des)?; Ok(Some(proof)) } diff --git a/leader/src/stdio.rs b/leader/src/stdio.rs index 6013f225..b49223dc 100644 --- a/leader/src/stdio.rs +++ b/leader/src/stdio.rs @@ -2,13 +2,13 @@ use std::io::{Read, Write}; use anyhow::Result; use paladin::runtime::Runtime; -use proof_gen::types::PlonkyProofIntern; +use proof_gen::proof_types::GeneratedBlockProof; use prover::BlockProverInput; /// The main function for the stdio mode. pub(crate) async fn stdio_main( runtime: Runtime, - previous: Option, + previous: Option, save_inputs_on_error: bool, ) -> Result<()> { let mut buffer = String::new(); @@ -16,11 +16,17 @@ pub(crate) async fn stdio_main( let des = &mut serde_json::Deserializer::from_str(&buffer); let input: BlockProverInput = serde_path_to_error::deserialize(des)?; - let proof = input.prove(&runtime, previous, save_inputs_on_error).await; + let proof = input + .prove( + &runtime, + previous.map(futures::future::ok), + save_inputs_on_error, + ) + .await; runtime.close().await?; let proof = proof?; - std::io::stdout().write_all(&serde_json::to_vec(&proof.intern)?)?; + std::io::stdout().write_all(&serde_json::to_vec(&proof)?)?; Ok(()) } diff --git a/prover/src/lib.rs b/prover/src/lib.rs index c27132d3..028c9408 100644 --- a/prover/src/lib.rs +++ b/prover/src/lib.rs @@ -1,19 +1,19 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::future::Future; use alloy::primitives::U256; use anyhow::Result; #[cfg(feature = "test_only")] use futures::stream::TryStreamExt; +use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; use num_traits::ToPrimitive as _; use ops::TxProof; use paladin::{ directive::{Directive, IndexedStream}, runtime::Runtime, }; -use proof_gen::{proof_types::GeneratedBlockProof, types::PlonkyProofIntern}; +use proof_gen::proof_types::GeneratedBlockProof; use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; +use tokio::sync::oneshot; use trace_decoder::{ processed_block_trace::ProcessingMeta, trace_protocol::BlockTrace, @@ -39,7 +39,7 @@ impl BlockProverInput { pub async fn prove( self, runtime: &Runtime, - previous: Option, + previous: Option>>, save_inputs_on_error: bool, ) -> Result { use anyhow::Context as _; @@ -67,10 +67,11 @@ impl BlockProverInput { let block_number = block_number .to_u64() .context("block number overflows u64")?; - let prev = previous.map(|p| GeneratedBlockProof { - b_height: block_number - 1, - intern: p, - }); + let prev = if let Some(prev) = previous { + Some(prev.await?) + } else { + None + }; let block_proof = paladin::directive::Literal(proof) .map(&ops::BlockProof { @@ -91,7 +92,7 @@ impl BlockProverInput { pub async fn prove( self, runtime: &Runtime, - _previous: Option, + _previous: Option>>, save_inputs_on_error: bool, ) -> Result { let block_number = self.get_block_number(); @@ -133,67 +134,41 @@ impl ProverInput { pub async fn prove( self, runtime: &Runtime, - checkpoint_proof: Option, + previous_proof: Option, save_inputs_on_error: bool, ) -> Result> { - let results: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); - if let Some(checkpoint_proof) = checkpoint_proof { - results - .lock() - .await - .insert(checkpoint_proof.b_height, Some(checkpoint_proof.intern)); - }; - - for block in self.blocks { - //todo this will be further reorganized with the new BlockProofFuture, - // running multiple block proof generation in parallel, and awaiting for the - // previous block proof when needed to prove the next block. - // For now prove blocks one by one sequentially and assume previous proof is - // already available - let results = results.clone(); - async move { + let mut prev: Option>> = + previous_proof.map(|proof| Box::pin(futures::future::ok(proof)) as BoxFuture<_>); + + let results: FuturesOrdered<_> = self + .blocks + .into_iter() + .map(|block| { let block_number = block.get_block_number(); info!("Proving block {block_number}"); - // For genesis block we don't have a previous proof, so - // previous_block_number would be None - let previous_block_number: Option = block_number - .checked_sub(U256::from(1)) - .and_then(|n| n.to_u64()); - let previous_proof = if let Some(number) = previous_block_number { - //todo could we optimize this to avoid this large proof cloning? - results.lock().await.get(&number).cloned().flatten() - } else { - None - }; + + let (tx, rx) = oneshot::channel::(); + // Prove the block - let block_proof = block - .prove(runtime, previous_proof, save_inputs_on_error) - .await?; - results.lock().await.insert( - block_proof - .b_height - .to_u64() - .ok_or(anyhow::Error::msg("block number u64 overflow"))?, - Some(block_proof.intern), - ); - info!("Proving block {block_number} finished!"); - Result::<(), anyhow::Error>::Ok(()) - } - .await?; - } + let fut = block + .prove(runtime, prev.take(), save_inputs_on_error) + .then(|proof| async { + let proof = proof?; - let mut results = results.lock().await; - results - .drain() - .map(|(block_number, intern)| { - Ok::(GeneratedBlockProof { - b_height: block_number, - intern: intern.ok_or_else(|| { - anyhow::Error::msg("missing proof for block {block_number}") - })?, - }) + if tx.send(proof.clone()).is_err() { + anyhow::bail!("Failed to send proof"); + } + + Ok(proof) + }) + .boxed(); + + prev = Some(Box::pin(rx.map_err(anyhow::Error::new))); + + fut }) - .collect::, _>>() + .collect(); + + results.try_collect().await } } diff --git a/verifier/src/main.rs b/verifier/src/main.rs index 271a1346..5e740255 100644 --- a/verifier/src/main.rs +++ b/verifier/src/main.rs @@ -3,7 +3,7 @@ use std::fs::File; use anyhow::Result; use clap::Parser; use dotenvy::dotenv; -use proof_gen::types::PlonkyProofIntern; +use proof_gen::proof_types::GeneratedBlockProof; use serde_json::Deserializer; use tracing::info; @@ -17,14 +17,14 @@ fn main() -> Result<()> { let args = cli::Cli::parse(); let file = File::open(args.file_path)?; let des = &mut Deserializer::from_reader(&file); - let input: PlonkyProofIntern = serde_path_to_error::deserialize(des)?; + let input: GeneratedBlockProof = serde_path_to_error::deserialize(des)?; let verifer = args .prover_state_config .into_prover_state_manager() .verifier()?; - match verifer.verify(&input) { + match verifer.verify(&input.intern) { Ok(_) => info!("Proof verified successfully!"), Err(e) => info!("Proof verification failed with error: {:?}", e), };