Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

feat: implement concurrent block proving #88 #89 #96

Merged
merged 7 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions leader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { workspace = true }
proof_gen = { workspace = true }
serde_json = { workspace = true }
serde_path_to_error = { workspace = true }
futures = { workspace = true }
alloy.workspace = true
axum = "0.7.4"
toml = "0.8.12"
Expand Down
12 changes: 8 additions & 4 deletions leader/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy::primitives::U256;
use anyhow::{bail, Result};
use axum::{http::StatusCode, routing::post, Json, Router};
use paladin::runtime::Runtime;
use proof_gen::{proof_types::GeneratedBlockProof, types::PlonkyProofIntern};
use proof_gen::proof_types::GeneratedBlockProof;
use prover::BlockProverInput;
use serde::{Deserialize, Serialize};
use serde_json::to_writer;
Expand Down Expand Up @@ -46,7 +46,7 @@ fn write_to_file(

match file {
Ok(file) => {
to_writer(file, &generated_block_proof.intern)?;
to_writer(file, &generated_block_proof)?;
Ok(fully_qualified_file_name)
}
Err(e) => {
Expand All @@ -58,7 +58,7 @@ fn write_to_file(
#[derive(Serialize, Deserialize, Debug)]
struct HttpProverInput {
prover_input: BlockProverInput,
previous: Option<PlonkyProofIntern>,
previous: Option<GeneratedBlockProof>,
}

async fn prove(
Expand All @@ -73,7 +73,11 @@ async fn prove(

match payload
.prover_input
.prove(&runtime, payload.previous, save_inputs_on_error)
.prove(
&runtime,
payload.previous.map(futures::future::ok),
save_inputs_on_error,
)
.await
{
Ok(b_proof) => match write_to_file(output_dir, block_number, &b_proof) {
Expand Down
12 changes: 3 additions & 9 deletions leader/src/jerigon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use anyhow::Result;
use common::block_interval::BlockInterval;
use paladin::runtime::Runtime;
use proof_gen::proof_types::GeneratedBlockProof;
use proof_gen::types::PlonkyProofIntern;

/// The main function for the jerigon mode.
pub(crate) async fn jerigon_main(
runtime: Runtime,
rpc_url: &str,
block_interval: BlockInterval,
checkpoint_block_number: u64,
previous_proof: Option<PlonkyProofIntern>,
previous_proof: Option<GeneratedBlockProof>,
proof_output_dir_opt: Option<PathBuf>,
save_inputs_on_error: bool,
) -> Result<()> {
Expand All @@ -28,18 +27,13 @@ pub(crate) async fn jerigon_main(
)
.await?;

let checkpoint_block_proof = previous_proof.map(|proof| GeneratedBlockProof {
b_height: checkpoint_block_number,
intern: proof,
});

let block_proofs = prover_input
.prove(&runtime, checkpoint_block_proof, save_inputs_on_error)
.prove(&runtime, previous_proof, save_inputs_on_error)
.await?;
runtime.close().await?;

for block_proof in block_proofs {
let block_proof_str = serde_json::to_vec(&block_proof.intern)?;
let block_proof_str = serde_json::to_vec(&block_proof)?;
write_proof(
block_proof_str,
proof_output_dir_opt.clone().map(|mut path| {
Expand Down
6 changes: 3 additions & 3 deletions leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use common::prover_state::TableLoadStrategy;
use dotenvy::dotenv;
use ops::register;
use paladin::runtime::Runtime;
use proof_gen::types::PlonkyProofIntern;
use proof_gen::proof_types::GeneratedBlockProof;
use tracing::info;

use crate::utils::get_package_version;
Expand All @@ -21,15 +21,15 @@ mod jerigon;
mod stdio;
mod utils;

fn get_previous_proof(path: Option<PathBuf>) -> Result<Option<PlonkyProofIntern>> {
fn get_previous_proof(path: Option<PathBuf>) -> Result<Option<GeneratedBlockProof>> {
if path.is_none() {
return Ok(None);
}

let path = path.unwrap();
let file = File::open(path)?;
let des = &mut serde_json::Deserializer::from_reader(&file);
let proof: PlonkyProofIntern = serde_path_to_error::deserialize(des)?;
let proof: GeneratedBlockProof = serde_path_to_error::deserialize(des)?;
Ok(Some(proof))
}

Expand Down
14 changes: 10 additions & 4 deletions leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ use std::io::{Read, Write};

use anyhow::Result;
use paladin::runtime::Runtime;
use proof_gen::types::PlonkyProofIntern;
use proof_gen::proof_types::GeneratedBlockProof;
use prover::BlockProverInput;

/// The main function for the stdio mode.
pub(crate) async fn stdio_main(
runtime: Runtime,
previous: Option<PlonkyProofIntern>,
previous: Option<GeneratedBlockProof>,
save_inputs_on_error: bool,
) -> Result<()> {
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;

let des = &mut serde_json::Deserializer::from_str(&buffer);
let input: BlockProverInput = serde_path_to_error::deserialize(des)?;
let proof = input.prove(&runtime, previous, save_inputs_on_error).await;
let proof = input
.prove(
&runtime,
previous.map(futures::future::ok),
save_inputs_on_error,
)
.await;
runtime.close().await?;
let proof = proof?;

std::io::stdout().write_all(&serde_json::to_vec(&proof.intern)?)?;
std::io::stdout().write_all(&serde_json::to_vec(&proof)?)?;

Ok(())
}
105 changes: 40 additions & 65 deletions prover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::future::Future;

use alloy::primitives::U256;
use anyhow::Result;
#[cfg(feature = "test_only")]
use futures::stream::TryStreamExt;
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
use num_traits::ToPrimitive as _;
use ops::TxProof;
use paladin::{
directive::{Directive, IndexedStream},
runtime::Runtime,
};
use proof_gen::{proof_types::GeneratedBlockProof, types::PlonkyProofIntern};
use proof_gen::proof_types::GeneratedBlockProof;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use trace_decoder::{
processed_block_trace::ProcessingMeta,
trace_protocol::BlockTrace,
Expand All @@ -39,7 +39,7 @@ impl BlockProverInput {
pub async fn prove(
self,
runtime: &Runtime,
previous: Option<PlonkyProofIntern>,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
save_inputs_on_error: bool,
) -> Result<GeneratedBlockProof> {
use anyhow::Context as _;
Expand Down Expand Up @@ -67,10 +67,11 @@ impl BlockProverInput {
let block_number = block_number
.to_u64()
.context("block number overflows u64")?;
let prev = previous.map(|p| GeneratedBlockProof {
b_height: block_number - 1,
intern: p,
});
let prev = if let Some(prev) = previous {
Some(prev.await?)
} else {
None
};

cpubot marked this conversation as resolved.
Show resolved Hide resolved
let block_proof = paladin::directive::Literal(proof)
.map(&ops::BlockProof {
Expand All @@ -91,7 +92,7 @@ impl BlockProverInput {
pub async fn prove(
self,
runtime: &Runtime,
_previous: Option<PlonkyProofIntern>,
_previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
save_inputs_on_error: bool,
) -> Result<GeneratedBlockProof> {
let block_number = self.get_block_number();
Expand Down Expand Up @@ -133,67 +134,41 @@ impl ProverInput {
pub async fn prove(
self,
runtime: &Runtime,
checkpoint_proof: Option<GeneratedBlockProof>,
previous_proof: Option<GeneratedBlockProof>,
save_inputs_on_error: bool,
) -> Result<Vec<GeneratedBlockProof>> {
let results: Arc<Mutex<HashMap<u64, Option<PlonkyProofIntern>>>> =
Arc::new(Mutex::new(HashMap::new()));
if let Some(checkpoint_proof) = checkpoint_proof {
results
.lock()
.await
.insert(checkpoint_proof.b_height, Some(checkpoint_proof.intern));
};

for block in self.blocks {
//todo this will be further reorganized with the new BlockProofFuture,
// running multiple block proof generation in parallel, and awaiting for the
// previous block proof when needed to prove the next block.
// For now prove blocks one by one sequentially and assume previous proof is
// already available
let results = results.clone();
async move {
let mut prev: Option<BoxFuture<Result<GeneratedBlockProof>>> =
previous_proof.map(|proof| Box::pin(futures::future::ok(proof)) as BoxFuture<_>);

let results: FuturesOrdered<_> = self
.blocks
.into_iter()
.map(|block| {
let block_number = block.get_block_number();
info!("Proving block {block_number}");
// For genesis block we don't have a previous proof, so
// previous_block_number would be None
let previous_block_number: Option<u64> = block_number
.checked_sub(U256::from(1))
.and_then(|n| n.to_u64());
let previous_proof = if let Some(number) = previous_block_number {
//todo could we optimize this to avoid this large proof cloning?
results.lock().await.get(&number).cloned().flatten()
} else {
None
};

let (tx, rx) = oneshot::channel::<GeneratedBlockProof>();

// Prove the block
let block_proof = block
.prove(runtime, previous_proof, save_inputs_on_error)
.await?;
results.lock().await.insert(
block_proof
.b_height
.to_u64()
.ok_or(anyhow::Error::msg("block number u64 overflow"))?,
Some(block_proof.intern),
);
info!("Proving block {block_number} finished!");
Result::<(), anyhow::Error>::Ok(())
}
.await?;
}
let fut = block
.prove(runtime, prev.take(), save_inputs_on_error)
.then(|proof| async {
let proof = proof?;

let mut results = results.lock().await;
results
.drain()
.map(|(block_number, intern)| {
Ok::<GeneratedBlockProof, anyhow::Error>(GeneratedBlockProof {
b_height: block_number,
intern: intern.ok_or_else(|| {
anyhow::Error::msg("missing proof for block {block_number}")
})?,
})
if tx.send(proof.clone()).is_err() {
anyhow::bail!("Failed to send proof");
}

Ok(proof)
})
.boxed();

prev = Some(Box::pin(rx.map_err(anyhow::Error::new)));

fut
})
.collect::<Result<Vec<_>, _>>()
.collect();

results.try_collect().await
}
}
6 changes: 3 additions & 3 deletions verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fs::File;
use anyhow::Result;
use clap::Parser;
use dotenvy::dotenv;
use proof_gen::types::PlonkyProofIntern;
use proof_gen::proof_types::GeneratedBlockProof;
use serde_json::Deserializer;
use tracing::info;

Expand All @@ -17,14 +17,14 @@ fn main() -> Result<()> {
let args = cli::Cli::parse();
let file = File::open(args.file_path)?;
let des = &mut Deserializer::from_reader(&file);
let input: PlonkyProofIntern = serde_path_to_error::deserialize(des)?;
let input: GeneratedBlockProof = serde_path_to_error::deserialize(des)?;

let verifer = args
.prover_state_config
.into_prover_state_manager()
.verifier()?;

match verifer.verify(&input) {
match verifer.verify(&input.intern) {
Ok(_) => info!("Proof verified successfully!"),
Err(e) => info!("Proof verification failed with error: {:?}", e),
};
Expand Down
Loading