Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
use crate::config::BuilderConfig;
use alloy::consensus::TxEnvelope;
use alloy::{
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
providers::Provider,
};
use eyre::Error;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{Instrument, debug, debug_span, trace};
use tracing::{Instrument, debug, debug_span, info_span, trace};

/// Poll interval for the transaction poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -56,6 +59,45 @@ impl TxPoller {
Duration::from_millis(self.poll_interval_ms)
}

// Spawn a tokio task to check the nonce of a transaction before sending
// it to the cachetask via the outbound channel.
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<TxEnvelope>) {
tokio::spawn(async move {
let span = info_span!("check_nonce", tx_id = %tx.tx_hash());

let Ok(ru_provider) =
crate::config().connect_ru_provider().instrument(span.clone()).await
else {
span_warn!(span, "Failed to connect to RU provider, stopping noncecheck task.");
return;
};

let Ok(sender) = tx.recover_signer() else {
span_warn!(span, "Failed to recover sender from transaction");
return;
};

let Ok(tx_count) = ru_provider
.get_transaction_count(sender)
.into_future()
.instrument(span.clone())
.await
else {
span_warn!(span, %sender, "Failed to fetch nonce for sender");
return;
};

if tx.nonce() < tx_count {
span_debug!(span, %sender, tx_nonce = %tx.nonce(), ru_nonce = %tx_count, "Dropping transaction with stale nonce");
return;
}

if outbound.send(tx).is_err() {
span_warn!(span, "Outbound channel closed, stopping NonceChecker task.");
}
});
}

/// Polls the transaction cache for transactions.
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
let url: Url = self.config.tx_pool_url.join("transactions")?;
Expand Down Expand Up @@ -94,11 +136,7 @@ impl TxPoller {
let _guard = span.entered();
debug!(count = ?transactions.len(), "found transactions");
for tx in transactions.into_iter() {
if outbound.send(tx).is_err() {
// If there are no receivers, we can shut down
trace!("No receivers left, shutting down");
break;
}
self.spawn_check_nonce(tx, outbound.clone());
}
}

Expand Down