From bca522d4ef037b1fc5ec8d8d5f8374c055e7077b Mon Sep 17 00:00:00 2001 From: James Date: Tue, 25 Nov 2025 16:40:01 -0500 Subject: [PATCH 1/2] feat: filter low nonces when ingesting txns --- src/tasks/cache/tx.rs | 52 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 0043f9f..cfed062 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,12 +1,15 @@ //! Transaction service responsible for fetching and sending trasnsactions to the simulator. use crate::config::BuilderConfig; -use alloy::consensus::TxEnvelope; +use alloy::{ + consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, + providers::Provider, +}; use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace}; +use tracing::{Instrument, debug, debug_span, info_span, trace}; /// Poll interval for the transaction poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -56,6 +59,45 @@ impl TxPoller { Duration::from_millis(self.poll_interval_ms) } + // Spawn a tokio task to check the nonce of a transaction before sending + // it to the cachetask via the outbound channel. + fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { + tokio::spawn(async move { + let span = info_span!("check_nonce", tx_id = %tx.tx_hash()); + + let Ok(ru_provider) = + crate::config().connect_ru_provider().instrument(span.clone()).await + else { + span_warn!(span, "Failed to connect to RU provider, stopping noncecheck task."); + return; + }; + + let Ok(sender) = tx.recover_signer() else { + span_warn!(span, "Failed to recover sender from transaction"); + return; + }; + + let Ok(tx_count) = ru_provider + .get_transaction_count(sender) + .into_future() + .instrument(span.clone()) + .await + else { + span_warn!(span, %sender, "Failed to fetch nonce for sender"); + return; + }; + + if tx.nonce() < tx_count { + span_debug!(span, %sender, tx_nonce = %tx.nonce(), ru_nonce = %tx_count, "Dropping transaction with stale nonce"); + return; + } + + if outbound.send(tx).is_err() { + span_warn!(span, "Outbound channel closed, stopping NonceChecker task."); + } + }); + } + /// Polls the transaction cache for transactions. pub async fn check_tx_cache(&mut self) -> Result, Error> { let url: Url = self.config.tx_pool_url.join("transactions")?; @@ -94,11 +136,7 @@ impl TxPoller { let _guard = span.entered(); debug!(count = ?transactions.len(), "found transactions"); for tx in transactions.into_iter() { - if outbound.send(tx).is_err() { - // If there are no receivers, we can shut down - trace!("No receivers left, shutting down"); - break; - } + self.spawn_check_nonce(tx, outbound.clone()); } } From a7d9cded03941455b382e83a13d225d25b44ac04 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 26 Nov 2025 13:16:13 -0500 Subject: [PATCH 2/2] fix: debug it --- src/tasks/cache/tx.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index cfed062..9902a35 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -9,7 +9,7 @@ use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, info_span, trace}; +use tracing::{Instrument, debug, debug_span, trace}; /// Poll interval for the transaction poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -63,7 +63,7 @@ impl TxPoller { // it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { - let span = info_span!("check_nonce", tx_id = %tx.tx_hash()); + let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); let Ok(ru_provider) = crate::config().connect_ru_provider().instrument(span.clone()).await