Skip to content
2 changes: 1 addition & 1 deletion crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl Db {
///
/// This includes the genesis block, which is not technically proven, but treated as such.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_latest_proven_in_sequence_block_num(&self) -> Result<BlockNumber> {
pub async fn proven_chain_tip(&self) -> Result<BlockNumber> {
self.transact("select latest proven block num", |conn| {
models::queries::select_latest_proven_in_sequence_block_num(conn)
})
Expand Down
3 changes: 2 additions & 1 deletion crates/store/src/server/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::server::api::{
validate_note_commitments,
validate_nullifiers,
};
use crate::state::Finality;

// BLOCK PRODUCER ENDPOINTS
// ================================================================================================
Expand Down Expand Up @@ -207,7 +208,7 @@ impl block_producer_server::BlockProducer for StoreApi {
.inspect_err(|err| tracing::Span::current().set_error(err))
.map_err(|err| tonic::Status::internal(err.as_report()))?;

let block_height = self.state.latest_block_num().await.as_u32();
let block_height = self.state.chain_tip(Finality::Committed).await.as_u32();

Ok(Response::new(proto::store::TransactionInputs {
account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord {
Expand Down
120 changes: 83 additions & 37 deletions crates/store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions};
use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::trace::TraceLayer;
Expand Down Expand Up @@ -92,35 +93,96 @@ impl Store {
block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_options.request_timeout,
"Loading database");

// Load initial state.
let (termination_ask, mut termination_signal) =
tokio::sync::mpsc::channel::<ApplyBlockError>(1);
let state = Arc::new(
let (state, tx_proven_tip) =
State::load(&self.data_directory, self.storage_options, termination_ask)
.await
.context("failed to load state")?,
);
.context("failed to load state")?;

// Spawn proof scheduler.
let (proof_scheduler_task, chain_tip_sender) = Self::spawn_proof_scheduler(
&state,
self.block_prover_url,
self.max_concurrent_proofs,
tx_proven_tip,
)
.await;

// Initialize local or remote block prover.
let block_prover = if let Some(url) = self.block_prover_url {
// Spawn gRPC Servers.
let mut join_set = Self::spawn_grpc_servers(
state,
chain_tip_sender,
self.grpc_options,
self.rpc_listener,
self.ntx_builder_listener,
self.block_producer_listener,
)?;

// Wait on any workload to finish / error out.
let service = async move {
join_set.join_next().await.expect("joinset is not empty")?.map_err(Into::into)
};
tokio::select! {
result = service => result,
Some(err) = termination_signal.recv() => {
Err(anyhow::anyhow!("received termination signal").context(err))
},
result = proof_scheduler_task => {
match result {
Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")),
Ok(Err(err)) => Err(err.context("proof scheduler fatal error")),
Err(join_err) => Err(join_err).context("proof scheduler panicked"),
}
}
}
}

/// Initializes the block prover client and spawns the proof scheduler as a background task.
///
/// Returns the scheduler task handle and the chain tip sender (needed by gRPC services to
/// notify the scheduler of new blocks).
async fn spawn_proof_scheduler(
state: &State,
block_prover_url: Option<Url>,
max_concurrent_proofs: NonZeroUsize,
proven_tip_tx: watch::Sender<miden_protocol::block::BlockNumber>,
) -> (
tokio::task::JoinHandle<anyhow::Result<()>>,
watch::Sender<miden_protocol::block::BlockNumber>,
) {
let block_prover = if let Some(url) = block_prover_url {
Arc::new(BlockProver::remote(url))
} else {
Arc::new(BlockProver::local())
};

// Initialize the chain tip watch channel.
let chain_tip = state.latest_block_num().await;
let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip);
let chain_tip = state.chain_tip(crate::state::Finality::Committed).await;
let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip);

// Spawn the proof scheduler as a background task. It will immediately pick up any
// unproven blocks from previous runs and begin proving them.
let proof_scheduler_task = proof_scheduler::spawn(
let handle = proof_scheduler::spawn(
state.db().clone(),
block_prover,
state.block_store(),
chain_tip_rx,
self.max_concurrent_proofs,
proven_tip_tx,
max_concurrent_proofs,
);

(handle, chain_tip_tx)
}

/// Spawns the gRPC servers and the DB maintenance background task.
fn spawn_grpc_servers(
state: State,
chain_tip_sender: watch::Sender<miden_protocol::block::BlockNumber>,
grpc_options: GrpcOptionsInternal,
rpc_listener: TcpListener,
ntx_builder_listener: TcpListener,
block_producer_listener: TcpListener,
) -> anyhow::Result<JoinSet<Result<(), tonic::transport::Error>>> {
let state = Arc::new(state);
let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi {
state: Arc::clone(&state),
chain_tip_sender: chain_tip_sender.clone(),
Expand Down Expand Up @@ -150,61 +212,45 @@ impl Store {
//
// 5 minutes seems like a reasonable interval, where this should have minimal database
// IO impact while providing a decent view into table growth over time.
let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
let database = Arc::clone(&state);
let mut interval = tokio::time::interval(Duration::from_mins(5));
loop {
interval.tick().await;
let _ = database.analyze_table_sizes().await;
let _ = state.analyze_table_sizes().await;
}
});

// Build the gRPC server with the API services and trace layer.
join_set.spawn(
tonic::transport::Server::builder()
.timeout(self.grpc_options.request_timeout)
.timeout(grpc_options.request_timeout)
.layer(CatchPanicLayer::custom(catch_panic_layer_fn))
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.add_service(rpc_service)
.add_service(reflection_service.clone())
.serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
.serve_with_incoming(TcpListenerStream::new(rpc_listener)),
);

join_set.spawn(
tonic::transport::Server::builder()
.timeout(self.grpc_options.request_timeout)
.timeout(grpc_options.request_timeout)
.layer(CatchPanicLayer::custom(catch_panic_layer_fn))
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.add_service(ntx_builder_service)
.add_service(reflection_service.clone())
.serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
.serve_with_incoming(TcpListenerStream::new(ntx_builder_listener)),
);

join_set.spawn(
tonic::transport::Server::builder()
.accept_http1(true)
.timeout(self.grpc_options.request_timeout)
.timeout(grpc_options.request_timeout)
.layer(CatchPanicLayer::custom(catch_panic_layer_fn))
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.add_service(block_producer_service)
.add_service(reflection_service)
.serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
.serve_with_incoming(TcpListenerStream::new(block_producer_listener)),
);

// SAFETY: The joinset is definitely not empty.
let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
tokio::select! {
result = service => result,
Some(err) = termination_signal.recv() => {
Err(anyhow::anyhow!("received termination signal").context(err))
},
result = proof_scheduler_task => {
match result {
Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")),
Ok(Err(err)) => Err(err.context("proof scheduler fatal error")),
Err(join_err) => Err(join_err).context("proof scheduler panicked"),
}
}
}
Ok(join_set)
}
}

Expand Down
11 changes: 6 additions & 5 deletions crates/store/src/server/ntx_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::server::api::{
read_block_range,
read_root,
};
use crate::state::Finality;

// NTX BUILDER ENDPOINTS
// ================================================================================================
Expand Down Expand Up @@ -145,7 +146,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi {
) -> Result<Response<proto::store::NetworkAccountIdList>, Status> {
let request = request.into_inner();

let mut chain_tip = self.state.latest_block_num().await;
let mut chain_tip = self.state.chain_tip(Finality::Committed).await;
let block_range =
read_block_range::<GetNetworkAccountIdsError>(Some(request), "GetNetworkAccountIds")?
.into_inclusive_range::<GetNetworkAccountIdsError>(&chain_tip)?;
Expand All @@ -159,7 +160,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi {
last_block_included = chain_tip;
}

chain_tip = self.state.latest_block_num().await;
chain_tip = self.state.chain_tip(Finality::Committed).await;

Ok(Response::new(proto::store::NetworkAccountIdList {
account_ids,
Expand Down Expand Up @@ -251,7 +252,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi {
let block_num = if let Some(num) = request.block_num {
num.into()
} else {
self.state.latest_block_num().await
self.state.chain_tip(Finality::Committed).await
};

// Retrieve the asset witnesses.
Expand Down Expand Up @@ -305,7 +306,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi {
let block_num = if let Some(num) = request.block_num {
num.into()
} else {
self.state.latest_block_num().await
self.state.chain_tip(Finality::Committed).await
};

// Retrieve the storage map witness.
Expand All @@ -322,7 +323,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi {
key: Some(map_key.into()),
proof: Some(proof.into()),
}),
block_num: self.state.latest_block_num().await.as_u32(),
block_num: self.state.chain_tip(Finality::Committed).await.as_u32(),
}))
}
}
26 changes: 21 additions & 5 deletions crates/store/src/server/proof_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,18 @@ impl ProofTaskJoinSet {
db: &Arc<Db>,
block_prover: &Arc<BlockProver>,
block_store: &Arc<BlockStore>,
proven_tip_tx: &watch::Sender<BlockNumber>,
block_num: BlockNumber,
) {
let db = Arc::clone(db);
let block_prover = Arc::clone(block_prover);
let block_store = Arc::clone(block_store);
self.0
.spawn(async move { prove_block(&db, &block_prover, &block_store, block_num).await });
self.0.spawn({
let proven_tip_tx = proven_tip_tx.clone();
async move {
prove_block(&db, &block_prover, &block_store, &proven_tip_tx, block_num).await
}
});
}

/// Returns the result of the next completed task, or pends forever if the set is empty.
Expand Down Expand Up @@ -98,9 +103,17 @@ pub fn spawn(
block_prover: Arc<BlockProver>,
block_store: Arc<BlockStore>,
chain_tip_rx: watch::Receiver<BlockNumber>,
proven_tip_tx: watch::Sender<BlockNumber>,
max_concurrent_proofs: NonZeroUsize,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(run(db, block_prover, block_store, chain_tip_rx, max_concurrent_proofs))
tokio::spawn(run(
db,
block_prover,
block_store,
chain_tip_rx,
proven_tip_tx,
max_concurrent_proofs,
))
}

/// Main loop of the proof scheduler.
Expand All @@ -117,6 +130,7 @@ async fn run(
block_prover: Arc<BlockProver>,
block_store: Arc<BlockStore>,
mut chain_tip_rx: watch::Receiver<BlockNumber>,
proven_tip_tx: watch::Sender<BlockNumber>,
max_concurrent_proofs: NonZeroUsize,
) -> anyhow::Result<()> {
info!(target: COMPONENT, "Proof scheduler started");
Expand All @@ -127,7 +141,7 @@ async fn run(
// Highest block number that is in-flight or has been proven. Used to avoid re-querying
// blocks we've already scheduled. Initialized from the in-sequence tip so we skip
// already-proven blocks on restart.
let mut highest_scheduled = db.select_latest_proven_in_sequence_block_num().await?;
let mut highest_scheduled = db.proven_chain_tip().await?;

loop {
// Query the DB for unproven blocks beyond what we've already scheduled.
Expand All @@ -140,7 +154,7 @@ async fn run(
}

for block_num in unproven {
join_set.spawn(&db, &block_prover, &block_store, block_num);
join_set.spawn(&db, &block_prover, &block_store, &proven_tip_tx, block_num);
}
}

Expand Down Expand Up @@ -171,6 +185,7 @@ async fn prove_block(
db: &Db,
block_prover: &BlockProver,
block_store: &BlockStore,
proven_tip_tx: &watch::Sender<BlockNumber>,
block_num: BlockNumber,
) -> anyhow::Result<()> {
const MAX_RETRIES: u32 = 10;
Expand All @@ -189,6 +204,7 @@ async fn prove_block(
// Mark the block as proven and advance the sequence in the database.
let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?;
if let Some(&last) = advanced_in_sequence.last() {
proven_tip_tx.send(last)?;
info!(
target = COMPONENT,
block.number = %block_num,
Expand Down
Loading
Loading