diff --git a/crates/bundle-pool/src/pool.rs b/crates/bundle-pool/src/pool.rs index 4a529c9..92a3493 100644 --- a/crates/bundle-pool/src/pool.rs +++ b/crates/bundle-pool/src/pool.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use tips_audit::{BundleEvent, DropReason}; use tips_core::AcceptedBundle; +use tips_core::types::BundleType; use tokio::sync::mpsc; use tracing::warn; use uuid::Uuid; @@ -33,6 +34,7 @@ impl ProcessedBundle { pub trait BundleStore { fn add_bundle(&mut self, bundle: AcceptedBundle); fn get_bundles(&self) -> Vec; + fn get_backrun_bundles(&self) -> HashMap; fn built_flashblock( &mut self, block_number: u64, @@ -45,6 +47,7 @@ pub trait BundleStore { struct BundleData { flashblocks_in_block: HashMap>, bundles: HashMap, + backrun_bundles: HashMap, } #[derive(Clone)] @@ -69,6 +72,7 @@ impl InMemoryBundlePool { inner: Arc::new(Mutex::new(BundleData { flashblocks_in_block: Default::default(), bundles: Default::default(), + backrun_bundles: Default::default(), })), audit_log, builder_id, @@ -78,8 +82,19 @@ impl InMemoryBundlePool { impl BundleStore for InMemoryBundlePool { fn add_bundle(&mut self, bundle: AcceptedBundle) { - let mut inner = self.inner.lock().unwrap(); - inner.bundles.insert(*bundle.uuid(), bundle); + if bundle.bundle_type == BundleType::Backrun { + self.inner + .lock() + .unwrap() + .backrun_bundles + .insert(bundle.txs[0].tx_hash(), bundle); + } else { + self.inner + .lock() + .unwrap() + .bundles + .insert(*bundle.uuid(), bundle); + } } fn get_bundles(&self) -> Vec { @@ -87,6 +102,11 @@ impl BundleStore for InMemoryBundlePool { inner.bundles.values().cloned().collect() } + fn get_backrun_bundles(&self) -> HashMap { + let inner = self.inner.lock().unwrap(); + inner.backrun_bundles.clone() + } + fn built_flashblock( &mut self, block_number: u64, diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index edf3b8e..2e63e40 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -1,3 +1,4 @@ +use crate::types::BundleType; use crate::{AcceptedBundle, Bundle, MeterBundleResponse}; use alloy_consensus::SignableTransaction; use alloy_primitives::{Address, B256, Bytes, TxHash, U256, b256, bytes}; @@ -23,6 +24,7 @@ pub fn create_bundle_from_txn_data() -> AcceptedBundle { } .try_into() .unwrap(), + BundleType::Standard, create_test_meter_bundle_response(), ) } @@ -60,7 +62,11 @@ pub fn create_test_bundle( }; let meter_bundle_response = create_test_meter_bundle_response(); - AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response) + AcceptedBundle::new( + bundle.try_into().unwrap(), + BundleType::Standard, + meter_bundle_response, + ) } pub fn create_test_meter_bundle_response() -> MeterBundleResponse { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 536098c..678dd8f 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -134,6 +134,17 @@ pub struct CancelBundle { pub replacement_uuid: String, } +// Add BundleType enum +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "lowercase")] +pub enum BundleType { + #[default] + Standard, + + /// Backrun bundle: txs[0] is target, txs[1..] execute after target succeeds + Backrun, +} + /// `AcceptedBundle` is the type that is sent over the wire. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AcceptedBundle { @@ -141,6 +152,10 @@ pub struct AcceptedBundle { pub txs: Vec>, + /// Bundle type - determines execution strategy + #[serde(default)] + pub bundle_type: BundleType, + #[serde(with = "alloy_serde::quantity")] pub block_number: u64, @@ -242,10 +257,15 @@ impl BundleTxs for AcceptedBundle { } impl AcceptedBundle { - pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self { + pub fn new( + bundle: ParsedBundle, + bundle_type: BundleType, + meter_bundle_response: MeterBundleResponse, + ) -> Self { AcceptedBundle { uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4), txs: bundle.txs, + bundle_type: bundle_type, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, flashblock_number_max: bundle.flashblock_number_max, @@ -326,6 +346,7 @@ mod tests { } .try_into() .unwrap(), + BundleType::Standard, create_test_meter_bundle_response(), ); @@ -355,6 +376,7 @@ mod tests { } .try_into() .unwrap(), + BundleType::Standard, create_test_meter_bundle_response(), ); diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index a13ad4f..b8608b1 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -77,6 +77,7 @@ mod tests { use rdkafka::config::ClientConfig; use tips_core::{ AcceptedBundle, Bundle, BundleExtensions, test_utils::create_test_meter_bundle_response, + types::BundleType, }; use tokio::time::{Duration, Instant}; @@ -97,6 +98,7 @@ mod tests { let bundle = create_test_bundle(); let accepted_bundle = AcceptedBundle::new( bundle.try_into().unwrap(), + BundleType::Standard, create_test_meter_bundle_response(), ); let bundle_hash = &accepted_bundle.bundle_hash(); diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8398bfb..fecc634 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,7 +11,7 @@ use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::BundleEvent; -use tips_core::types::ParsedBundle; +use tips_core::types::{BundleType, ParsedBundle}; use tips_core::{ AcceptedBundle, Bundle, BundleExtensions, BundleHash, CancelBundle, MeterBundleResponse, }; @@ -30,6 +30,9 @@ pub trait IngressApi { #[method(name = "sendBundle")] async fn send_bundle(&self, bundle: Bundle) -> RpcResult; + #[method(name = "sendBackrunBundle")] + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult; + /// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain. #[method(name = "cancelBundle")] async fn cancel_bundle(&self, request: CancelBundle) -> RpcResult<()>; @@ -79,44 +82,13 @@ impl IngressApiServer for IngressService where Queue: QueuePublisher + Sync + Send + 'static, { - async fn send_bundle(&self, bundle: Bundle) -> RpcResult { - self.validate_bundle(&bundle).await?; - let parsed_bundle: ParsedBundle = bundle - .clone() - .try_into() - .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let bundle_hash = &parsed_bundle.bundle_hash(); - let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response); - - if let Err(e) = self - .bundle_queue - .publish(&accepted_bundle, bundle_hash) - .await - { - warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); - return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err()); - } - - info!( - message = "queued bundle", - bundle_hash = %bundle_hash, - ); - - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: Box::new(accepted_bundle.clone()), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - return Err( - EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(), - ); - } + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult { + info!(message = "sending backrun bundle"); + self.parse_and_process_bundle(bundle, true).await + } - Ok(BundleHash { - bundle_hash: *bundle_hash, - }) + async fn send_bundle(&self, bundle: Bundle) -> RpcResult { + self.parse_and_process_bundle(bundle, false).await } async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> { @@ -161,7 +133,8 @@ where let bundle_hash = &parsed_bundle.bundle_hash(); let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response); + let accepted_bundle = + AcceptedBundle::new(parsed_bundle, BundleType::Standard, meter_bundle_response); if let Err(e) = self .bundle_queue @@ -300,6 +273,91 @@ where } Ok(res) } + + async fn parse_and_process_bundle( + &self, + bundle: Bundle, + is_backrun: bool, + ) -> RpcResult { + self.validate_bundle(&bundle).await.inspect_err(|e| { + tracing::error!( + error = ?e, + is_backrun = is_backrun, + "Bundle validation failed" + ); + })?; + + let parsed_bundle: ParsedBundle = bundle + .clone() + .try_into() + .inspect_err(|e| { + tracing::error!( + error = %e, + is_backrun = is_backrun, + "ParsedBundle conversion failed" + ); + }) + .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; + + let bundle_hash = &parsed_bundle.bundle_hash(); + + let meter_bundle_response = + self.meter_bundle(&bundle, bundle_hash) + .await + .inspect_err(|e| { + tracing::error!( + error = ?e, + bundle_hash = ?bundle_hash, + is_backrun = is_backrun, + "Bundle metering failed" + ); + })?; + + let bundle_type = if is_backrun { + BundleType::Backrun + } else { + BundleType::Standard + }; + + let accepted_bundle = + AcceptedBundle::new(parsed_bundle, bundle_type, meter_bundle_response); + + tracing::info!( + bundle_hash = ?bundle_hash, + bundle_type = ?bundle_type, + num_txs = bundle.txs.len(), + "Bundle successfully processed" + ); + + if let Err(e) = self + .bundle_queue + .publish(&accepted_bundle, bundle_hash) + .await + { + warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); + return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err()); + } + + info!( + message = "queued bundle", + bundle_hash = %bundle_hash, + ); + + let audit_event = BundleEvent::Received { + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), + }; + if let Err(e) = self.audit_channel.send(audit_event) { + warn!(message = "Failed to send audit event", error = %e); + return Err( + EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(), + ); + } + + Ok(BundleHash { + bundle_hash: *bundle_hash, + }) + } } #[cfg(test)] diff --git a/justfile b/justfile index 2d4398e..521301c 100644 --- a/justfile +++ b/justfile @@ -99,6 +99,9 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" +backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" +backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" + send-txn: #!/usr/bin/env bash set -euxo pipefail @@ -108,3 +111,67 @@ send-txn: hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status + +send-txn-with-backrun: + #!/usr/bin/env bash + set -euxo pipefail + + # 1. Get nonce and send target transaction from sender account + nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) + echo "Sending target transaction from sender (nonce=$nonce)..." + target_txn=$(cast mktx --private-key {{ sender_key }} \ + 0x0000000000000000000000000000000000000000 \ + --value 0.01ether \ + --nonce $nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + target_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Target tx sent: $target_hash" + + # 2. Build backrun transaction from backrunner account (different account!) + backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) + echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." + backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ + 0x0000000000000000000000000000000000000001 \ + --value 0.001ether \ + --nonce $backrun_nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + # 3. Compute tx hashes for reverting_tx_hashes + backrun_hash_computed=$(cast keccak $backrun_txn) + echo "Target tx hash: $target_hash" + echo "Backrun tx hash: $backrun_hash_computed" + + # 4. Construct and send bundle with reverting_tx_hashes + echo "Sending backrun bundle..." + bundle_json=$(jq -n \ + --arg target "$target_txn" \ + --arg backrun "$backrun_txn" \ + --arg target_hash "$target_hash" \ + --arg backrun_hash "$backrun_hash_computed" \ + '{ + txs: [$target, $backrun], + blockNumber: 0, + revertingTxHashes: [$target_hash, $backrun_hash] + }') + + bundle_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Bundle sent: $bundle_hash" + + # 5. Wait and verify both transactions + echo "Waiting for transactions to land..." + sleep 5 + + echo "=== Target transaction (from sender) ===" + cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" + + echo "=== Backrun transaction (from backrunner) ===" + cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet"