Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,12 @@ impl Db {
///
/// Atomically clears `proving_inputs` for the given block, then walks forward from the
/// current proven-in-sequence tip through consecutive proven blocks, marking each as
/// proven-in-sequence. Returns the block numbers that were newly marked in-sequence.
/// proven-in-sequence. Returns the block number of the last marked in-sequence block.
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn mark_proven_and_advance_sequence(
&self,
block_num: BlockNumber,
) -> Result<Vec<BlockNumber>> {
) -> Result<BlockNumber> {
self.transact("mark block proven", move |conn| {
mark_proven_and_advance_sequence(conn, block_num)
})
Expand Down Expand Up @@ -829,41 +829,42 @@ impl Db {
/// 3. Walks forward from the current proven-in-sequence tip through consecutive proven blocks and
/// sets `proven_in_sequence = TRUE` for each.
///
/// Returns the new tip of blocks that are proven in-sequence.
///
/// Returns [`DatabaseError::DataCorrupted`] if any proven-but-not-in-sequence block is found at
/// or below the current tip, as that indicates a consistency bug.
pub(crate) fn mark_proven_and_advance_sequence(
conn: &mut SqliteConnection,
block_num: BlockNumber,
) -> Result<Vec<BlockNumber>, DatabaseError> {
) -> Result<BlockNumber, DatabaseError> {
// Clear proving_inputs for the specified block.
models::queries::clear_block_proving_inputs(conn, block_num)?;

// Get the current proven-in-sequence tip (highest in-sequence).
let mut tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?;
let current_tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?;
let mut new_tip = current_tip;

// Get all blocks that are proven but not yet marked in-sequence.
let unsequenced = models::queries::select_proven_not_in_sequence_blocks(conn)?;

// Walk forward from the tip through consecutive proven blocks.
let mut newly_in_sequence = Vec::new();
for candidate in unsequenced {
if candidate <= tip {
if candidate <= current_tip {
return Err(DatabaseError::DataCorrupted(format!(
"block {candidate} is proven but not marked in-sequence while the tip is at {tip}"
"block {candidate} is proven but not marked in-sequence while the tip is at {current_tip}"
)));
}
if candidate == tip + 1 {
tip = candidate;
newly_in_sequence.push(candidate);
if candidate == new_tip + 1 {
Comment thread
sergerad marked this conversation as resolved.
Outdated
new_tip = candidate;
} else {
break;
}
}

// Mark the newly contiguous blocks as proven-in-sequence.
if let (Some(&from), Some(&to)) = (newly_in_sequence.first(), newly_in_sequence.last()) {
models::queries::mark_blocks_as_proven_in_sequence(conn, from, to)?;
if new_tip > current_tip {
models::queries::mark_blocks_as_proven_in_sequence(conn, current_tip + 1, new_tip)?;
}

Ok(newly_in_sequence)
Ok(new_tip)
}
41 changes: 19 additions & 22 deletions crates/store/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3775,9 +3775,9 @@ fn mark_block_proven_advances_in_sequence_for_consecutive_blocks() {

// Mark all three as proven in order. Each call atomically advances the in-sequence tip.
for i in 1u32..=3 {
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(i)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(i)]);
assert_eq!(new_tip, BlockNumber::from(i));
}

let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand All @@ -3795,17 +3795,17 @@ fn mark_block_proven_with_hole_does_not_advance_past_gap() {
}

// Prove block 1 — advances tip to 1.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
assert_eq!(new_tip, BlockNumber::from(1u32));

// Prove blocks 3, 4 (skipping 2) — cannot advance past the gap.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap();
assert!(advanced.is_empty());
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap();
assert!(advanced.is_empty());
assert_eq!(new_tip, BlockNumber::from(1u32));

// Latest proven in sequence should be 1 (blocks 3, 4 are proven but not in sequence).
let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand All @@ -3823,28 +3823,25 @@ fn mark_block_proven_filling_hole_advances_through_all_consecutive() {
}

// Prove blocks out of order: 1, 3, 4 first.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap();
assert!(advanced.is_empty());
let advanced =
assert_eq!(new_tip, BlockNumber::from(1u32));
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap();
assert!(advanced.is_empty());
assert_eq!(new_tip, BlockNumber::from(1u32));

assert_eq!(
queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(),
BlockNumber::from(1u32),
);

// Now prove block 2, filling the hole. Should advance through 2, 3, 4.
let advanced =
// Now prove block 2, filling the hole. Should advance tip through to 4.
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(2u32)).unwrap();
assert_eq!(
advanced,
vec![BlockNumber::from(2u32), BlockNumber::from(3u32), BlockNumber::from(4u32)],
);
assert_eq!(new_tip, BlockNumber::from(4u32));

// Now all blocks through 4 are proven in sequence.
let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
Expand Down Expand Up @@ -3894,9 +3891,9 @@ fn mark_block_proven_is_idempotent_for_in_sequence() {
create_unproven_block(&mut conn, BlockNumber::from(1u32));

// First call marks block 1 proven and advances it in-sequence.
let advanced =
let new_tip =
super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap();
assert_eq!(advanced, vec![BlockNumber::from(1u32)]);
assert_eq!(new_tip, BlockNumber::from(1u32));

let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap();
assert_eq!(latest, BlockNumber::from(1u32));
Expand Down
85 changes: 45 additions & 40 deletions crates/store/src/server/proof_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! as proven, the database atomically advances the `proven_in_sequence` column for all blocks
//! that now form a contiguous proven sequence from genesis.
//! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried
//! internally within its proving task.
//! internally within its proving task, subject to an overall per-block time budget.
//! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler
//! returns the error to the caller for node shutdown.

Expand All @@ -34,8 +34,11 @@ use crate::server::block_prover_client::{BlockProver, StoreProverError};
// CONSTANTS
// ================================================================================================

/// Overall timeout for proving a single block.
const BLOCK_PROVE_TIMEOUT: Duration = Duration::from_mins(4);
/// Timeout for a single block proof attempt (per-retry).
const BLOCK_PROVE_ATTEMPT_TIMEOUT: Duration = Duration::from_mins(4);

/// Overall timeout for proving a single block (across all retries).
const BLOCK_PROVE_OVERALL_TIMEOUT: Duration = Duration::from_mins(12);

/// Default maximum number of blocks being proven concurrently.
pub const DEFAULT_MAX_CONCURRENT_PROOFS: NonZeroUsize = NonZeroUsize::new(8).unwrap();
Expand Down Expand Up @@ -173,45 +176,47 @@ async fn prove_block(
block_store: &BlockStore,
block_num: BlockNumber,
) -> anyhow::Result<()> {
const MAX_RETRIES: u32 = 10;

for _ in 0..MAX_RETRIES {
match tokio::time::timeout(
BLOCK_PROVE_TIMEOUT,
generate_block_proof(db, block_prover, block_num),
)
.await
{
Ok(Ok(proof)) => {
// Save the block proof to file.
block_store.save_proof(block_num, &proof.to_bytes()).await?;

// Mark the block as proven and advance the sequence in the database.
let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?;
if let Some(&last) = advanced_in_sequence.last() {
info!(
target = COMPONENT,
block.number = %block_num,
proven_in_sequence_tip = %last,
"Block proven and in-sequence advanced",
);
} else {
info!(target = COMPONENT, block.number = %block_num, "Block proven");
}
tokio::time::timeout(BLOCK_PROVE_OVERALL_TIMEOUT, async {
let mut attempt: u32 = 0;
loop {
attempt += 1;
match tokio::time::timeout(
BLOCK_PROVE_ATTEMPT_TIMEOUT,
generate_block_proof(db, block_prover, block_num),
Comment thread
SantiagoPittella marked this conversation as resolved.
)
.await
{
Ok(Ok(proof)) => {
// Save the block proof to file.
block_store.save_proof(block_num, &proof.to_bytes()).await?;

// Mark the block as proven and advance the sequence in the database.
let new_tip = db.mark_proven_and_advance_sequence(block_num).await?;
if new_tip == block_num {
info!(target = COMPONENT, block.number = %block_num, "Block proven");
} else {
info!(
target = COMPONENT,
block.number = %block_num,
%new_tip,
Comment thread
sergerad marked this conversation as resolved.
Outdated
"Block proven and in-sequence advanced",
);
}

return Ok(());
},
Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?,
Ok(Err(ProveBlockError::Transient(err))) => {
error!(target = COMPONENT, block.number = %block_num, err = ?err, "transient error proving block, retrying");
},
Err(elapsed) => {
error!(target = COMPONENT, block.number = %block_num, %elapsed, "block proving timed out, retrying");
},
return Ok(());
},
Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?,
Ok(Err(ProveBlockError::Transient(err))) => {
error!(target = COMPONENT, block.number = %block_num, attempt, err = ?err, "transient error proving block, retrying");
},
Err(elapsed) => {
error!(target = COMPONENT, block.number = %block_num, attempt, %elapsed, "block proving attempt timed out, retrying");
},
}
}
}

anyhow::bail!("maximum retries ({MAX_RETRIES}) exceeded");
})
.await
.context(format!("block proving overall timeout ({BLOCK_PROVE_OVERALL_TIMEOUT:?}) exceeded"))?
}

/// Generates a block proof by loading inputs from the DB and invoking the block prover.
Expand Down
Loading