diff --git a/crates/store/src/blocks.rs b/crates/store/src/blocks.rs index ad34ff0dd..94ae9166a 100644 --- a/crates/store/src/blocks.rs +++ b/crates/store/src/blocks.rs @@ -102,9 +102,9 @@ impl BlockStore { #[instrument( target = COMPONENT, name = "store.block_store.save_proof", - skip(self, data), + skip_all, err, - fields(proof_size = data.len()) + fields(block.number = block_num.as_u32(), proof_size = data.len()) )] pub async fn save_proof(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> { let (epoch_path, proof_path) = self.epoch_proof_path(block_num)?; diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 51e487cab..316f38e22 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -565,12 +565,15 @@ impl Db { /// /// Atomically clears `proving_inputs` for the given block, then walks forward from the /// current proven-in-sequence tip through consecutive proven blocks, marking each as - /// proven-in-sequence. Returns the block numbers that were newly marked in-sequence. + /// proven-in-sequence. + /// + /// Returns the new tip of blocks that are proven in-sequence (which may have been unchanged by + /// this function). #[instrument(target = COMPONENT, skip_all, err)] pub async fn mark_proven_and_advance_sequence( &self, block_num: BlockNumber, - ) -> Result> { + ) -> Result { self.transact("mark block proven", move |conn| { mark_proven_and_advance_sequence(conn, block_num) }) @@ -829,41 +832,46 @@ impl Db { /// 3. Walks forward from the current proven-in-sequence tip through consecutive proven blocks and /// sets `proven_in_sequence = TRUE` for each. /// +/// Returns the new tip of blocks that are proven in-sequence (which may have been unchanged by this +/// function). +/// /// Returns [`DatabaseError::DataCorrupted`] if any proven-but-not-in-sequence block is found at /// or below the current tip, as that indicates a consistency bug. pub(crate) fn mark_proven_and_advance_sequence( conn: &mut SqliteConnection, block_num: BlockNumber, -) -> Result, DatabaseError> { +) -> Result { // Clear proving_inputs for the specified block. models::queries::clear_block_proving_inputs(conn, block_num)?; // Get the current proven-in-sequence tip (highest in-sequence). - let mut tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?; + let current_tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?; + let mut new_tip = current_tip; // Get all blocks that are proven but not yet marked in-sequence. let unsequenced = models::queries::select_proven_not_in_sequence_blocks(conn)?; // Walk forward from the tip through consecutive proven blocks. - let mut newly_in_sequence = Vec::new(); for candidate in unsequenced { - if candidate <= tip { + if candidate <= current_tip { return Err(DatabaseError::DataCorrupted(format!( - "block {candidate} is proven but not marked in-sequence while the tip is at {tip}" + "block {candidate} is proven but not marked in-sequence while the tip is at {current_tip}" ))); } - if candidate == tip + 1 { - tip = candidate; - newly_in_sequence.push(candidate); + if candidate == new_tip.child() { + // Walk the tip forward. + new_tip = candidate; } else { + // Sequence has been broken. Discontinue walking tip forward. break; } } // Mark the newly contiguous blocks as proven-in-sequence. - if let (Some(&from), Some(&to)) = (newly_in_sequence.first(), newly_in_sequence.last()) { - models::queries::mark_blocks_as_proven_in_sequence(conn, from, to)?; + if new_tip > current_tip { + let block_from = current_tip.child(); + models::queries::mark_blocks_as_proven_in_sequence(conn, block_from, new_tip)?; } - Ok(newly_in_sequence) + Ok(new_tip) } diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index 3fa465a0a..28ddb54d7 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -3775,9 +3775,9 @@ fn mark_block_proven_advances_in_sequence_for_consecutive_blocks() { // Mark all three as proven in order. Each call atomically advances the in-sequence tip. for i in 1u32..=3 { - let advanced = + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(i)).unwrap(); - assert_eq!(advanced, vec![BlockNumber::from(i)]); + assert_eq!(new_tip, BlockNumber::from(i)); } let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); @@ -3795,17 +3795,17 @@ fn mark_block_proven_with_hole_does_not_advance_past_gap() { } // Prove block 1 — advances tip to 1. - let advanced = + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(advanced, vec![BlockNumber::from(1u32)]); + assert_eq!(new_tip, BlockNumber::from(1u32)); // Prove blocks 3, 4 (skipping 2) — cannot advance past the gap. - let advanced = + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap(); - assert!(advanced.is_empty()); - let advanced = + assert_eq!(new_tip, BlockNumber::from(1u32)); + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap(); - assert!(advanced.is_empty()); + assert_eq!(new_tip, BlockNumber::from(1u32)); // Latest proven in sequence should be 1 (blocks 3, 4 are proven but not in sequence). let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); @@ -3823,28 +3823,25 @@ fn mark_block_proven_filling_hole_advances_through_all_consecutive() { } // Prove blocks out of order: 1, 3, 4 first. - let advanced = + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(advanced, vec![BlockNumber::from(1u32)]); - let advanced = + assert_eq!(new_tip, BlockNumber::from(1u32)); + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap(); - assert!(advanced.is_empty()); - let advanced = + assert_eq!(new_tip, BlockNumber::from(1u32)); + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap(); - assert!(advanced.is_empty()); + assert_eq!(new_tip, BlockNumber::from(1u32)); assert_eq!( queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(), BlockNumber::from(1u32), ); - // Now prove block 2, filling the hole. Should advance through 2, 3, 4. - let advanced = + // Now prove block 2, filling the hole. Should advance tip through to 4. + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(2u32)).unwrap(); - assert_eq!( - advanced, - vec![BlockNumber::from(2u32), BlockNumber::from(3u32), BlockNumber::from(4u32)], - ); + assert_eq!(new_tip, BlockNumber::from(4u32)); // Now all blocks through 4 are proven in sequence. let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); @@ -3894,9 +3891,9 @@ fn mark_block_proven_is_idempotent_for_in_sequence() { create_unproven_block(&mut conn, BlockNumber::from(1u32)); // First call marks block 1 proven and advances it in-sequence. - let advanced = + let new_tip = super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(advanced, vec![BlockNumber::from(1u32)]); + assert_eq!(new_tip, BlockNumber::from(1u32)); let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); assert_eq!(latest, BlockNumber::from(1u32)); diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index 38fae3670..e9df2a509 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -8,7 +8,7 @@ //! as proven, the database atomically advances the `proven_in_sequence` column for all blocks //! that now form a contiguous proven sequence from genesis. //! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried -//! internally within its proving task. +//! internally within its proving task, subject to an overall per-block time budget. //! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler //! returns the error to the caller for node shutdown. @@ -23,7 +23,7 @@ use miden_remote_prover_client::RemoteProverClientError; use thiserror::Error; use tokio::sync::watch; use tokio::task::{JoinHandle, JoinSet}; -use tracing::{error, info, instrument}; +use tracing::{Instrument, info, instrument}; use crate::COMPONENT; use crate::blocks::BlockStore; @@ -34,8 +34,11 @@ use crate::server::block_prover_client::{BlockProver, StoreProverError}; // CONSTANTS // ================================================================================================ -/// Overall timeout for proving a single block. -const BLOCK_PROVE_TIMEOUT: Duration = Duration::from_mins(4); +/// Timeout for a single block proof attempt (per-retry). +const BLOCK_PROVE_ATTEMPT_TIMEOUT: Duration = Duration::from_mins(4); + +/// Overall timeout for proving a single block (across all retries). +const BLOCK_PROVE_OVERALL_TIMEOUT: Duration = Duration::from_mins(12); /// Default maximum number of blocks being proven concurrently. pub const DEFAULT_MAX_CONCURRENT_PROOFS: NonZeroUsize = NonZeroUsize::new(8).unwrap(); @@ -166,52 +169,59 @@ async fn run( /// Proves a single block, saves the proof to the block store, marks the block as proven in the /// DB, and advances the proven-in-sequence tip. -#[instrument(target = COMPONENT, name = "prove_block", skip_all, fields(block.number=block_num.as_u32()), err)] +#[instrument(target = COMPONENT, name = "prove_block", skip_all, + fields( + block.number=block_num.as_u32(), + proven_chain_tip = tracing::field::Empty + ), err)] async fn prove_block( db: &Db, block_prover: &BlockProver, block_store: &BlockStore, block_num: BlockNumber, ) -> anyhow::Result<()> { - const MAX_RETRIES: u32 = 10; - - for _ in 0..MAX_RETRIES { - match tokio::time::timeout( - BLOCK_PROVE_TIMEOUT, - generate_block_proof(db, block_prover, block_num), - ) - .await - { - Ok(Ok(proof)) => { - // Save the block proof to file. - block_store.save_proof(block_num, &proof.to_bytes()).await?; - - // Mark the block as proven and advance the sequence in the database. - let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?; - if let Some(&last) = advanced_in_sequence.last() { - info!( - target = COMPONENT, - block.number = %block_num, - proven_in_sequence_tip = %last, - "Block proven and in-sequence advanced", - ); - } else { - info!(target = COMPONENT, block.number = %block_num, "Block proven"); - } + tokio::time::timeout(BLOCK_PROVE_OVERALL_TIMEOUT, async { + let mut attempt: u32 = 0; + loop { + attempt += 1; + let attempt_span = tracing::info_span!( + target: COMPONENT, + "prove_attempt", + attempt, + error = tracing::field::Empty, + timed_out = tracing::field::Empty, + ); + let result = tokio::time::timeout( + BLOCK_PROVE_ATTEMPT_TIMEOUT, + generate_block_proof(db, block_prover, block_num), + ) + .instrument(attempt_span.clone()) + .await; + match result { + Ok(Ok(proof)) => { + // Save the block proof to file. + block_store.save_proof(block_num, &proof.to_bytes()).await?; + + // Mark the block as proven and advance the sequence in the database. + let tip = db.mark_proven_and_advance_sequence(block_num).await?; + tracing::Span::current().record("proven_chain_tip", tip.as_u32()); - return Ok(()); - }, - Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?, - Ok(Err(ProveBlockError::Transient(err))) => { - error!(target = COMPONENT, block.number = %block_num, err = ?err, "transient error proving block, retrying"); - }, - Err(elapsed) => { - error!(target = COMPONENT, block.number = %block_num, %elapsed, "block proving timed out, retrying"); - }, + return Ok(()); + }, + Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?, + Ok(Err(ProveBlockError::Transient(err))) => { + attempt_span.record("error", tracing::field::display(&err)); + }, + Err(elapsed) => { + attempt_span.record("timed_out", elapsed.to_string()); + }, + } } - } - - anyhow::bail!("maximum retries ({MAX_RETRIES}) exceeded"); + }) + .await + .context(format!( + "block proving overall timeout ({BLOCK_PROVE_OVERALL_TIMEOUT:?}) exceeded" + ))? } /// Generates a block proof by loading inputs from the DB and invoking the block prover.