Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
9f9619d
An outline
Mirko-von-Leipzig Mar 16, 2026
c1e8295
Impl graph skeleton
Mirko-von-Leipzig Mar 16, 2026
bfd590c
Impl batch skeletons
Mirko-von-Leipzig Mar 16, 2026
7ce6f61
Delete vestigial
Mirko-von-Leipzig Mar 16, 2026
ed8b0b5
Mempool::add_transaction
Mirko-von-Leipzig Mar 17, 2026
62f2637
Mempool::PartialEq
Mirko-von-Leipzig Mar 17, 2026
8ac5241
Mempool::select_batch
Mirko-von-Leipzig Mar 17, 2026
41dc2f2
Mempool::rollback_batch
Mirko-von-Leipzig Mar 17, 2026
a6f6c6b
Mempool::commit_batch
Mirko-von-Leipzig Mar 17, 2026
4f82e3a
Mempool::select_block
Mirko-von-Leipzig Mar 17, 2026
b0f568d
Mempool::pending_block
Mirko-von-Leipzig Mar 17, 2026
9c1cb0e
Mempool::prune
Mirko-von-Leipzig Mar 17, 2026
b838067
Mempool::revert_expired
Mirko-von-Leipzig Mar 17, 2026
be61536
Mempool::rollback_block
Mirko-von-Leipzig Mar 17, 2026
28e8890
Mempool::authentication_staleness_check
Mirko-von-Leipzig Mar 18, 2026
6fcfb8d
Drop old graph impl
Mirko-von-Leipzig Mar 18, 2026
cdcae37
Track selected internally in graph
Mirko-von-Leipzig Mar 18, 2026
51f9e99
Move append checks into graph
Mirko-von-Leipzig Mar 18, 2026
ffdaea7
Implement pruning
Mirko-von-Leipzig Mar 18, 2026
cda6296
Implement descendents
Mirko-von-Leipzig Mar 18, 2026
0dbcd5f
Don't pop roots
Mirko-von-Leipzig Mar 18, 2026
e049301
Consider selected for expiration reversion
Mirko-von-Leipzig Mar 18, 2026
94aa884
Requeue transactions
Mirko-von-Leipzig Mar 18, 2026
682c8af
Some telemetry
Mirko-von-Leipzig Mar 18, 2026
a71d097
Fix account state tracking
Mirko-von-Leipzig Mar 19, 2026
a197fe6
Fix reversion
Mirko-von-Leipzig Mar 19, 2026
8a1b7fb
Lints
Mirko-von-Leipzig Mar 19, 2026
14fee6c
Move account states into separate file
Mirko-von-Leipzig Mar 19, 2026
c3fa580
Submodules
Mirko-von-Leipzig Mar 19, 2026
c96e116
Fix reverts
Mirko-von-Leipzig Mar 19, 2026
48103a4
flatten modules
Mirko-von-Leipzig Mar 19, 2026
73d3c00
Make selection candidates explicit
Mirko-von-Leipzig Mar 19, 2026
7c5ed5d
Fix batch word commitment
Mirko-von-Leipzig Mar 19, 2026
424a5bc
AI suggestions
Mirko-von-Leipzig Mar 20, 2026
93dd09d
Re-enable lints
Mirko-von-Leipzig Mar 20, 2026
a2097dd
Update errors
Mirko-von-Leipzig Mar 23, 2026
348a528
Fix bugs found by tests
Mirko-von-Leipzig Mar 23, 2026
0334290
Re-add telemetry
Mirko-von-Leipzig Mar 23, 2026
8bba7a8
AI tests
Mirko-von-Leipzig Mar 23, 2026
fa897ae
Address Serge comments
Mirko-von-Leipzig Mar 24, 2026
a6239aa
Revert txs after 3 failures
Mirko-von-Leipzig Mar 24, 2026
ccecfa5
Update tests to match reversion strategy
Mirko-von-Leipzig Mar 24, 2026
f01d124
Review comments
Mirko-von-Leipzig Mar 26, 2026
59877ef
Merge next
Mirko-von-Leipzig Mar 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/block-producer/src/domain/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl SelectedBatch {
self.txs
}

pub(crate) fn transactions(&self) -> &[Arc<AuthenticatedTransaction>] {
&self.txs
}

/// The aggregated list of account transitions this batch causes given as tuples of `(AccountId,
/// initial commitment, final commitment, Option<store commitment>)`.
///
Expand Down
66 changes: 49 additions & 17 deletions crates/block-producer/src/mempool/graph/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use miden_protocol::Word;
Expand Down Expand Up @@ -70,9 +70,17 @@ impl GraphNode for Arc<AuthenticatedTransaction> {
#[derive(Clone, Debug, PartialEq, Default)]
pub struct TransactionGraph {
inner: Graph<Arc<AuthenticatedTransaction>>,
/// The number of failures a transaction has participated in.
///
/// These are batch or block proving errors in which the transaction was a part of. This is
/// used to identify potentially buggy transactions that should be evicted.
failures: HashMap<TransactionId, u32>,
}

impl TransactionGraph {
/// Transactions are evicted after failing this number of times.
pub const FAILURE_LIMIT: u32 = 3;

pub fn append(&mut self, tx: Arc<AuthenticatedTransaction>) -> Result<(), StateConflict> {
self.inner.append(tx)
}
Expand Down Expand Up @@ -102,19 +110,7 @@ impl TransactionGraph {
/// Only unselected transactions are considered; selected transactions are assumed to be in
/// committed blocks and should not be reverted.
///
/// This is because we don't distinguish between committed and selected transactions. If we
/// didn't ignore selected transactions here, we would revert committed ones as well, which
/// breaks the state.
///
/// Returns the identifiers of transactions that were removed from the graph.
///
/// # Note
///
/// Since this _ignores_ selected transactions, and the purpose is to revert expired
/// transactions after a block is committed, the caller **must** ensure that selected
/// transactions from expired batches (and therefore not committed) are deselected
/// _before_ calling this function. i.e. first revert expired batches and deselect their
/// transactions, then call this.
pub fn revert_expired(&mut self, chain_tip: BlockNumber) -> HashSet<TransactionId> {
self.inner
.revert_expired_unselected(chain_tip)
Expand All @@ -134,24 +130,59 @@ impl TransactionGraph {
return Vec::default();
}

self.inner
let reverted = self
.inner
.revert_node_and_descendants(transaction)
.into_iter()
.map(|tx| tx.id())
.collect()
.collect();

for tx in &reverted {
self.failures.remove(tx);
}

reverted
}

/// Marks the batch's transactions are ready for selection again.
///
/// # Panics
///
/// Panics if the given batch has any child batches which are still in flight.
pub fn requeue_transactions(&mut self, batch: SelectedBatch) {
for tx in batch.into_transactions().iter().rev() {
pub fn requeue_transactions(&mut self, batch: &SelectedBatch) {
for tx in batch.transactions().iter().rev() {
self.inner.deselect(tx.id());
}
}

/// Increments each transaction's failure counter, and reverts transactions which exceed the
/// failure limit.
///
/// This weeds out transactions which participate in batch and block failures, and might be the
/// root cause.
pub fn increment_failure_count(
&mut self,
txs: impl Iterator<Item = TransactionId>,
) -> HashSet<TransactionId> {
let mut to_revert = Vec::default();

for tx in txs {
let count = self.failures.entry(tx).or_default();
*count += 1;

if *count >= Self::FAILURE_LIMIT {
to_revert.push(tx);
}
}

let mut reverted = HashSet::default();
for tx in to_revert {
reverted.extend(self.revert_tx_and_descendants(tx));
}

reverted
}

/// Prunes the given transaction.
///
/// # Panics
Expand All @@ -160,6 +191,7 @@ impl TransactionGraph {
/// graph.
pub fn prune(&mut self, transaction: TransactionId) {
self.inner.prune(transaction);
self.failures.remove(&transaction);
}

/// Number of transactions which have not been selected for inclusion in a batch.
Expand Down
66 changes: 38 additions & 28 deletions crates/block-producer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,20 @@ impl Mempool {
}

let reverted_batches = self.batches.revert_batch_and_descendants(batch);
for reverted in reverted_batches {
for reverted in &reverted_batches {
self.transactions.requeue_transactions(reverted);
}

// Find rolled back batch to mark its txs as failed.
//
// Note that its possible it doesn't exist, since this batch could have already been
// reverted as part of a separate rollback.
if let Some(batch) = reverted_batches.iter().find(|reverted| reverted.id() == batch) {
let failed_txs = batch.transactions().iter().map(|tx| tx.id());
let reverted_txs = self.transactions.increment_failure_count(failed_txs);
self.subscription.txs_reverted(reverted_txs);
}

self.inject_telemetry();
}

Expand Down Expand Up @@ -310,7 +321,6 @@ impl Mempool {
let block_number = self.chain_tip.child();
let batches = self.batches.select_block(self.config.block_budget);
let block = SelectedBlock { block_number, batches };

self.pending_block = Some(block.clone());
self.inject_telemetry();
block
Expand All @@ -324,9 +334,9 @@ impl Mempool {
/// Sends a [`MempoolEvent::BlockCommitted`] event to subscribers, as well as a
/// [`MempoolEvent::TransactionsReverted`] for transactions that are now considered expired.
///
/// On success the internal state is updated in place: the chain tip advances, expired data is
/// pruned, and subscribers are notified about the committed block and any reverted
/// transactions.
/// On success the internal state is updated in place: the chain tip advances, expired data
/// is pruned, and subscribers are notified about the committed block and any
/// reverted transactions.
///
/// # Panics
///
Expand All @@ -338,6 +348,7 @@ impl Mempool {
.pending_block
.take_if(|pending| pending.block_number == block_header.block_num())
.expect("block must be in progress to commit");

let tx_ids = block
.batches
.iter()
Expand All @@ -362,8 +373,8 @@ impl Mempool {
///
/// Sends a [`MempoolEvent::TransactionsReverted`] event to subscribers.
///
/// The in-flight block state and all related transactions are discarded, and subscribers are
/// notified about the reverted transactions.
/// The in-flight block state and all related transactions are discarded, and subscribers
/// are notified about the reverted transactions.
///
/// # Panics
///
Expand All @@ -373,31 +384,28 @@ impl Mempool {
// Only revert if the given block is actually inflight.
//
// FIXME: We should consider a more robust check here to identify the block by a hash.
// If multiple jobs are possible, then so are multiple variants with the same block
// number.
// If multiple jobs are possible, then so are multiple variants with the same
// block number.
if self.pending_block.as_ref().is_none_or(|pending| pending.block_number != block) {
return;
}

// Remove all descendants _without_ reinserting the transactions.
//
// This is done to prevent a system bug from causing repeated failures if we keep retrying
// the same transactions. Since we can't trivially identify the cause of the block
// failure, we take the safe route and nuke all associated state.
// Revert the batches, and requeue the transactions for batch selection.
//
// A more refined approach could be to tag the offending transactions and then evict them
// once a certain failure threshold has been met.
let mut reverted_txs = HashSet::default();
// Transactions which have failed excessively are also reverted.
let block = self.pending_block.take().expect("we just checked it is some");
for batch in block.batches {
for batch in &block.batches {
let reverted = self.batches.revert_batch_and_descendants(batch.id());

for batch in reverted {
for tx in batch.into_transactions() {
reverted_txs.extend(self.transactions.revert_tx_and_descendants(tx.id()));
}
self.transactions.requeue_transactions(&batch);
}
}
let failed_txs = block
.batches
.iter()
.flat_map(|batch| batch.transactions().as_slice().iter().map(TransactionHeader::id));
let reverted_txs = self.transactions.increment_failure_count(failed_txs);

self.subscription.txs_reverted(reverted_txs);
self.inject_telemetry();
Expand All @@ -406,7 +414,8 @@ impl Mempool {
// EVENTS & SUBSCRIPTIONS
// --------------------------------------------------------------------------------------------

/// Creates a subscription to [`MempoolEvent`] which will be emitted in the order they occur.
/// Creates a subscription to [`MempoolEvent`] which will be emitted in the order they
/// occur.
///
/// Only emits events which occurred after the current committed block.
#[instrument(target = COMPONENT, name = "mempool.subscribe", skip_all)]
Expand Down Expand Up @@ -498,21 +507,22 @@ impl Mempool {
///
/// Expired batch descendants are also reverted since these are now invalid.
///
/// Transactions from batches are requeued. Expired transactions and their descendants are then
/// reverted as well.
/// Transactions from batches are requeued. Expired transactions and their descendants are
/// then reverted as well.
fn revert_expired(&mut self) -> HashSet<TransactionId> {
let batches = self.batches.revert_expired(self.chain_tip);
for batch in batches {
self.transactions.requeue_transactions(batch);
self.transactions.requeue_transactions(&batch);
}
self.transactions.revert_expired(self.chain_tip)
}

/// Rejects authentication heights that fall outside the overlap guaranteed by the locally
/// retained state.
///
/// The acceptable window is `[chain_tip - state_retention + 1, chain_tip]`; values below this
/// range are rejected as stale because the mempool no longer tracks the intermediate history.
/// The acceptable window is `[chain_tip - state_retention + 1, chain_tip]`; values below
/// this range are rejected as stale because the mempool no longer tracks the
/// intermediate history.
///
/// # Panics
///
Expand All @@ -526,7 +536,7 @@ impl Mempool {
let limit = self
.chain_tip
.checked_sub(self.committed_blocks.len() as u32)
.expect("number of committed blocks cannot exceed the chain tip");
.expect("amount of committed blocks cannot exceed the chain tip");

if authentication_height < limit {
return Err(AddTransactionError::StaleInputs {
Expand Down
51 changes: 46 additions & 5 deletions crates/block-producer/src/mempool/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::sync::Arc;

use miden_protocol::Word;
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::transaction::TransactionHeader;
use pretty_assertions::assert_eq;
use serial_test::serial;

use super::*;
use crate::mempool::graph::TransactionGraph;
use crate::test_utils::MockProvenTxBuilder;
use crate::test_utils::batch::TransactionBatchConstructor;

Expand Down Expand Up @@ -114,6 +116,9 @@ fn failed_batch_transactions_are_requeued() {
reference.select_batch().unwrap();
reference.add_transaction(txs[1].clone()).unwrap();
reference.add_transaction(txs[2].clone()).unwrap();
reference
.transactions
.increment_failure_count(failed_batch.txs().iter().map(|tx| tx.id()));

assert_eq!(uut, reference);
}
Expand Down Expand Up @@ -208,11 +213,9 @@ fn rollbacks_of_already_proven_batches_are_ignored() {
// BLOCK FAILED TESTS
// ================================================================================================

/// A failed block should have all of its transactions reverted.
#[test]
fn block_failure_reverts_its_transactions() {
// We will revert everything so the reference should be the empty mempool.
let (mut uut, reference) = Mempool::for_tests();
fn block_failure_increments_tx_failures() {
let (mut uut, mut reference) = Mempool::for_tests();

let reverted_txs = MockProvenTxBuilder::sequential();

Expand All @@ -231,12 +234,44 @@ fn block_failure_reverts_its_transactions() {
// Create another dependent transaction.
uut.add_transaction(reverted_txs[2].clone()).unwrap();

// Fail the block which should result in everything reverting.
uut.rollback_block(block.block_number);

// Reference should contain all transactions, no batches, with tx failure from just that block.
reference.add_transaction(reverted_txs[0].clone()).unwrap();
reference.add_transaction(reverted_txs[1].clone()).unwrap();
reference.add_transaction(reverted_txs[2].clone()).unwrap();

reference.transactions.increment_failure_count(
block
.batches
.iter()
.flat_map(|batch| batch.transactions().as_slice().iter().map(TransactionHeader::id)),
);

assert_eq!(uut, reference);
}

#[test]
fn transactions_exceeding_failure_limit_are_removed() {
let (mut uut, _) = Mempool::for_tests();

let failing_tx = MockProvenTxBuilder::with_account_index(0).build();
let failing_tx = Arc::new(AuthenticatedTransaction::from_inner(failing_tx));
let tx_id = failing_tx.id();

uut.add_transaction(failing_tx).unwrap();

for _ in 0..TransactionGraph::FAILURE_LIMIT - 1 {
let reverted = uut.transactions.increment_failure_count(std::iter::once(tx_id));
assert!(reverted.is_empty());
assert_eq!(uut.unbatched_transactions_count(), 1);
}

let reverted = uut.transactions.increment_failure_count(std::iter::once(tx_id));
assert!(reverted.contains(&tx_id));
assert_eq!(uut.unbatched_transactions_count(), 0);
}

/// We've decided that transactions from a rolled back batch should be requeued.
///
/// This test checks this at a basic level by ensuring that rolling back a batch is the same as
Expand Down Expand Up @@ -267,6 +302,9 @@ fn transactions_from_reverted_batches_are_requeued() {
reference.add_transaction(tx_set_a[1].clone()).unwrap();
reference.add_transaction(tx_set_b[2].clone()).unwrap();
reference.add_transaction(tx_set_a[2].clone()).unwrap();
reference
.transactions
.increment_failure_count([tx_set_a[1].id(), tx_set_b[1].id()].into_iter());

assert_eq!(uut, reference);
}
Expand Down Expand Up @@ -362,6 +400,9 @@ fn pass_through_txs_with_note_dependencies() {
uut.rollback_batch(batch_a.id());
reference.add_transaction(tx_pass_through_a).unwrap();
reference.add_transaction(tx_pass_through_b).unwrap();
reference
.transactions
.increment_failure_count(batch_a.txs().iter().map(|tx| tx.id()));

assert_eq!(uut, reference);
}
Loading