Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/store/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ impl BlockStore {
#[instrument(
target = COMPONENT,
name = "store.block_store.save_proof",
skip(self, data),
skip_all,
err,
fields(proof_size = data.len())
fields(block.number = block_num.as_u32(), proof_size = data.len())
)]
pub async fn save_proof(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> {
let (epoch_path, proof_path) = self.epoch_proof_path(block_num)?;
Expand Down
34 changes: 21 additions & 13 deletions crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,15 @@ impl Db {
///
/// Atomically clears `proving_inputs` for the given block, then walks forward from the
/// current proven-in-sequence tip through consecutive proven blocks, marking each as
/// proven-in-sequence. Returns the block numbers that were newly marked in-sequence.
/// proven-in-sequence.
///
/// Returns the tip of the proven-in-sequence blocks after this call (unchanged if no further
/// blocks could be marked in-sequence).
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn mark_proven_and_advance_sequence(
&self,
block_num: BlockNumber,
) -> Result<Vec<BlockNumber>> {
) -> Result<BlockNumber> {
self.transact("mark block proven", move |conn| {
mark_proven_and_advance_sequence(conn, block_num)
})
Expand Down Expand Up @@ -829,41 +832,46 @@ impl Db {
/// 3. Walks forward from the current proven-in-sequence tip through consecutive proven blocks and
/// sets `proven_in_sequence = TRUE` for each.
///
/// Returns the tip of the proven-in-sequence blocks after this call (unchanged if no further blocks
/// could be marked in-sequence).
///
/// Returns [`DatabaseError::DataCorrupted`] if any proven-but-not-in-sequence block is found at
/// or below the current tip, as that indicates a consistency bug.
pub(crate) fn mark_proven_and_advance_sequence(
conn: &mut SqliteConnection,
block_num: BlockNumber,
) -> Result<Vec<BlockNumber>, DatabaseError> {
) -> Result<BlockNumber, DatabaseError> {
// Clear proving_inputs for the specified block.
models::queries::clear_block_proving_inputs(conn, block_num)?;

// Get the current proven-in-sequence tip (highest in-sequence).
let mut tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?;
let current_tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?;
let mut new_tip = current_tip;

// Get all blocks that are proven but not yet marked in-sequence.
let unsequenced = models::queries::select_proven_not_in_sequence_blocks(conn)?;

// Walk forward from the tip through consecutive proven blocks.
let mut newly_in_sequence = Vec::new();
for candidate in unsequenced {
if candidate <= tip {
if candidate <= current_tip {
return Err(DatabaseError::DataCorrupted(format!(
"block {candidate} is proven but not marked in-sequence while the tip is at {tip}"
"block {candidate} is proven but not marked in-sequence while the tip is at {current_tip}"
)));
}
if candidate == tip + 1 {
tip = candidate;
newly_in_sequence.push(candidate);
if candidate == new_tip.child() {
// Walk the tip forward.
new_tip = candidate;
} else {
// Sequence has been broken. Discontinue walking tip forward.
break;
}
}

// Mark the newly contiguous blocks as proven-in-sequence.
if let (Some(&from), Some(&to)) = (newly_in_sequence.first(), newly_in_sequence.last()) {
models::queries::mark_blocks_as_proven_in_sequence(conn, from, to)?;
if new_tip > current_tip {
let block_from = current_tip.child();
models::queries::mark_blocks_as_proven_in_sequence(conn, block_from, new_tip)?;
}

Ok(newly_in_sequence)
Ok(new_tip)
}
41 changes: 19 additions & 22 deletions crates/store/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3775,9 +3775,9 @@ fn mark_block_proven_advances_in_sequence_for_consecutive_blocks() {

// Mark all three as proven in order. Each call atomically advances the in-sequence tip.
for i in 1u32..=3 {
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(i)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(i)]);
assert_eq!(new_tip, BlockNumber::from(i));
}

let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand All @@ -3795,17 +3795,17 @@ fn mark_block_proven_with_hole_does_not_advance_past_gap() {
}

// Prove block 1 — advances tip to 1.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
assert_eq!(new_tip, BlockNumber::from(1u32));

// Prove blocks 3, 4 (skipping 2) — cannot advance past the gap.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap();
assert!(advanced.is_empty());
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap();
assert!(advanced.is_empty());
assert_eq!(new_tip, BlockNumber::from(1u32));

// Latest proven in sequence should be 1 (blocks 3, 4 are proven but not in sequence).
let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand All @@ -3823,28 +3823,25 @@ fn mark_block_proven_filling_hole_advances_through_all_consecutive() {
}

// Prove blocks out of order: 1, 3, 4 first.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap();
assert!(advanced.is_empty());
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap();
assert!(advanced.is_empty());
assert_eq!(new_tip, BlockNumber::from(1u32));

assert_eq!(
queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(),
BlockNumber::from(1u32),
);

// Now prove block 2, filling the hole. Should advance through 2, 3, 4.
let advanced =
// Now prove block 2, filling the hole. Should advance tip through to 4.
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(2u32)).unwrap();
assert_eq!(
advanced,
vec![BlockNumber::from(2u32), BlockNumber::from(3u32), BlockNumber::from(4u32)],
);
assert_eq!(new_tip, BlockNumber::from(4u32));

// Now all blocks through 4 are proven in sequence.
let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand Down Expand Up @@ -3894,9 +3891,9 @@ fn mark_block_proven_is_idempotent_for_in_sequence() {
create_unproven_block(&mut conn, BlockNumber::from(1u32));

// First call marks block 1 proven and advances the in-sequence tip to it.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
assert_eq!(new_tip, BlockNumber::from(1u32));

let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
assert_eq!(latest, BlockNumber::from(1u32));
Expand Down
94 changes: 52 additions & 42 deletions crates/store/src/server/proof_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! as proven, the database atomically advances the `proven_in_sequence` column for all blocks
//! that now form a contiguous proven sequence from genesis.
//! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried
//! internally within its proving task.
//! internally within its proving task, subject to an overall per-block time budget.
//! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler
//! returns the error to the caller for node shutdown.

Expand All @@ -23,7 +23,7 @@ use miden_remote_prover_client::RemoteProverClientError;
use thiserror::Error;
use tokio::sync::watch;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{error, info, instrument};
use tracing::{Instrument, info, instrument};

use crate::COMPONENT;
use crate::blocks::BlockStore;
Expand All @@ -34,8 +34,11 @@ use crate::server::block_prover_client::{BlockProver, StoreProverError};
// CONSTANTS
// ================================================================================================

/// Overall timeout for proving a single block.
const BLOCK_PROVE_TIMEOUT: Duration = Duration::from_mins(4);
/// Timeout for a single block proof attempt (per-retry).
const BLOCK_PROVE_ATTEMPT_TIMEOUT: Duration = Duration::from_mins(4);

/// Overall timeout for proving a single block (across all retries).
const BLOCK_PROVE_OVERALL_TIMEOUT: Duration = Duration::from_mins(12);

/// Default maximum number of blocks being proven concurrently.
pub const DEFAULT_MAX_CONCURRENT_PROOFS: NonZeroUsize = NonZeroUsize::new(8).unwrap();
Expand Down Expand Up @@ -166,52 +169,59 @@ async fn run(

/// Proves a single block, saves the proof to the block store, marks the block as proven in the
/// DB, and advances the proven-in-sequence tip.
#[instrument(target = COMPONENT, name = "prove_block", skip_all, fields(block.number=block_num.as_u32()), err)]
#[instrument(target = COMPONENT, name = "prove_block", skip_all,
fields(
block.number=block_num.as_u32(),
proven_chain_tip = tracing::field::Empty
), err)]
async fn prove_block(
db: &Db,
block_prover: &BlockProver,
block_store: &BlockStore,
block_num: BlockNumber,
) -> anyhow::Result<()> {
const MAX_RETRIES: u32 = 10;

for _ in 0..MAX_RETRIES {
match tokio::time::timeout(
BLOCK_PROVE_TIMEOUT,
generate_block_proof(db, block_prover, block_num),
)
.await
{
Ok(Ok(proof)) => {
// Save the block proof to file.
block_store.save_proof(block_num, &proof.to_bytes()).await?;

// Mark the block as proven and advance the sequence in the database.
let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?;
if let Some(&last) = advanced_in_sequence.last() {
info!(
target = COMPONENT,
block.number = %block_num,
proven_in_sequence_tip = %last,
"Block proven and in-sequence advanced",
);
} else {
info!(target = COMPONENT, block.number = %block_num, "Block proven");
}
tokio::time::timeout(BLOCK_PROVE_OVERALL_TIMEOUT, async {
let mut attempt: u32 = 0;
loop {
attempt += 1;
let attempt_span = tracing::info_span!(
target: COMPONENT,
"prove_attempt",
attempt,
error = tracing::field::Empty,
timed_out = tracing::field::Empty,
);
let result = tokio::time::timeout(
BLOCK_PROVE_ATTEMPT_TIMEOUT,
generate_block_proof(db, block_prover, block_num),
)
.instrument(attempt_span.clone())
.await;
match result {
Ok(Ok(proof)) => {
// Save the block proof to file.
block_store.save_proof(block_num, &proof.to_bytes()).await?;

// Mark the block as proven and advance the sequence in the database.
let tip = db.mark_proven_and_advance_sequence(block_num).await?;
tracing::Span::current().record("proven_chain_tip", tip.as_u32());

return Ok(());
},
Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?,
Ok(Err(ProveBlockError::Transient(err))) => {
error!(target = COMPONENT, block.number = %block_num, err = ?err, "transient error proving block, retrying");
},
Err(elapsed) => {
error!(target = COMPONENT, block.number = %block_num, %elapsed, "block proving timed out, retrying");
},
return Ok(());
},
Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?,
Ok(Err(ProveBlockError::Transient(err))) => {
attempt_span.record("error", tracing::field::display(&err));
},
Err(elapsed) => {
attempt_span.record("timed_out", elapsed.to_string());
},
}
}
}

anyhow::bail!("maximum retries ({MAX_RETRIES}) exceeded");
})
.await
.context(format!(
"block proving overall timeout ({BLOCK_PROVE_OVERALL_TIMEOUT:?}) exceeded"
))?
}

/// Generates a block proof by loading inputs from the DB and invoking the block prover.
Expand Down
Loading