Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions crates/bundle-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use tips_audit::{BundleEvent, DropReason};
use tips_core::AcceptedBundle;
use tips_core::types::BundleType;
use tokio::sync::mpsc;
use tracing::warn;
use uuid::Uuid;
Expand Down Expand Up @@ -33,6 +34,7 @@ impl ProcessedBundle {
pub trait BundleStore {
fn add_bundle(&mut self, bundle: AcceptedBundle);
fn get_bundles(&self) -> Vec<AcceptedBundle>;
fn get_backrun_bundles(&self) -> HashMap<TxHash, AcceptedBundle>;
fn built_flashblock(
&mut self,
block_number: u64,
Expand All @@ -45,6 +47,7 @@ pub trait BundleStore {
struct BundleData {
flashblocks_in_block: HashMap<u64, Vec<ProcessedBundle>>,
bundles: HashMap<Uuid, AcceptedBundle>,
backrun_bundles: HashMap<TxHash, AcceptedBundle>,
}

#[derive(Clone)]
Expand All @@ -69,6 +72,7 @@ impl InMemoryBundlePool {
inner: Arc::new(Mutex::new(BundleData {
flashblocks_in_block: Default::default(),
bundles: Default::default(),
backrun_bundles: Default::default(),
})),
audit_log,
builder_id,
Expand All @@ -78,15 +82,31 @@ impl InMemoryBundlePool {

impl BundleStore for InMemoryBundlePool {
fn add_bundle(&mut self, bundle: AcceptedBundle) {
let mut inner = self.inner.lock().unwrap();
inner.bundles.insert(*bundle.uuid(), bundle);
if bundle.bundle_type == BundleType::Backrun {
self.inner
.lock()
.unwrap()
.backrun_bundles
.insert(bundle.txs[0].tx_hash(), bundle);
} else {
self.inner
.lock()
.unwrap()
.bundles
.insert(*bundle.uuid(), bundle);
}
}

fn get_bundles(&self) -> Vec<AcceptedBundle> {
let inner = self.inner.lock().unwrap();
inner.bundles.values().cloned().collect()
}

fn get_backrun_bundles(&self) -> HashMap<TxHash, AcceptedBundle> {
let inner = self.inner.lock().unwrap();
inner.backrun_bundles.clone()
}

fn built_flashblock(
&mut self,
block_number: u64,
Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::types::BundleType;
use crate::{AcceptedBundle, Bundle, MeterBundleResponse};
use alloy_consensus::SignableTransaction;
use alloy_primitives::{Address, B256, Bytes, TxHash, U256, b256, bytes};
Expand All @@ -23,6 +24,7 @@ pub fn create_bundle_from_txn_data() -> AcceptedBundle {
}
.try_into()
.unwrap(),
BundleType::Standard,
create_test_meter_bundle_response(),
)
}
Expand Down Expand Up @@ -60,7 +62,11 @@ pub fn create_test_bundle(
};
let meter_bundle_response = create_test_meter_bundle_response();

AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response)
AcceptedBundle::new(
bundle.try_into().unwrap(),
BundleType::Standard,
meter_bundle_response,
)
}

pub fn create_test_meter_bundle_response() -> MeterBundleResponse {
Expand Down
24 changes: 23 additions & 1 deletion crates/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,28 @@ pub struct CancelBundle {
pub replacement_uuid: String,
}

// Add BundleType enum
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum BundleType {
#[default]
Standard,

/// Backrun bundle: txs[0] is target, txs[1..] execute after target succeeds
Backrun,
}

/// `AcceptedBundle` is the type that is sent over the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcceptedBundle {
pub uuid: Uuid,

pub txs: Vec<Recovered<OpTxEnvelope>>,

/// Bundle type - determines execution strategy
#[serde(default)]
pub bundle_type: BundleType,

#[serde(with = "alloy_serde::quantity")]
pub block_number: u64,

Expand Down Expand Up @@ -242,10 +257,15 @@ impl BundleTxs for AcceptedBundle {
}

impl AcceptedBundle {
pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self {
pub fn new(
bundle: ParsedBundle,
bundle_type: BundleType,
meter_bundle_response: MeterBundleResponse,
) -> Self {
AcceptedBundle {
uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4),
txs: bundle.txs,
bundle_type: bundle_type,
block_number: bundle.block_number,
flashblock_number_min: bundle.flashblock_number_min,
flashblock_number_max: bundle.flashblock_number_max,
Expand Down Expand Up @@ -326,6 +346,7 @@ mod tests {
}
.try_into()
.unwrap(),
BundleType::Standard,
create_test_meter_bundle_response(),
);

Expand Down Expand Up @@ -355,6 +376,7 @@ mod tests {
}
.try_into()
.unwrap(),
BundleType::Standard,
create_test_meter_bundle_response(),
);

Expand Down
2 changes: 2 additions & 0 deletions crates/ingress-rpc/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ mod tests {
use rdkafka::config::ClientConfig;
use tips_core::{
AcceptedBundle, Bundle, BundleExtensions, test_utils::create_test_meter_bundle_response,
types::BundleType,
};
use tokio::time::{Duration, Instant};

Expand All @@ -97,6 +98,7 @@ mod tests {
let bundle = create_test_bundle();
let accepted_bundle = AcceptedBundle::new(
bundle.try_into().unwrap(),
BundleType::Standard,
create_test_meter_bundle_response(),
);
let bundle_hash = &accepted_bundle.bundle_hash();
Expand Down
136 changes: 97 additions & 39 deletions crates/ingress-rpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use op_alloy_network::Optimism;
use reth_rpc_eth_types::EthApiError;
use std::time::{SystemTime, UNIX_EPOCH};
use tips_audit::BundleEvent;
use tips_core::types::ParsedBundle;
use tips_core::types::{BundleType, ParsedBundle};
use tips_core::{
AcceptedBundle, Bundle, BundleExtensions, BundleHash, CancelBundle, MeterBundleResponse,
};
Expand All @@ -30,6 +30,9 @@ pub trait IngressApi {
#[method(name = "sendBundle")]
async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash>;

#[method(name = "sendBackrunBundle")]
async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash>;

/// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain.
#[method(name = "cancelBundle")]
async fn cancel_bundle(&self, request: CancelBundle) -> RpcResult<()>;
Expand Down Expand Up @@ -79,44 +82,13 @@ impl<Queue> IngressApiServer for IngressService<Queue>
where
Queue: QueuePublisher + Sync + Send + 'static,
{
async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
self.validate_bundle(&bundle).await?;
let parsed_bundle: ParsedBundle = bundle
.clone()
.try_into()
.map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;
let bundle_hash = &parsed_bundle.bundle_hash();
let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?;
let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response);

if let Err(e) = self
.bundle_queue
.publish(&accepted_bundle, bundle_hash)
.await
{
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err());
}

info!(
message = "queued bundle",
bundle_hash = %bundle_hash,
);

let audit_event = BundleEvent::Received {
bundle_id: *accepted_bundle.uuid(),
bundle: Box::new(accepted_bundle.clone()),
};
if let Err(e) = self.audit_channel.send(audit_event) {
warn!(message = "Failed to send audit event", error = %e);
return Err(
EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(),
);
}
async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
info!(message = "sending backrun bundle");
self.parse_and_process_bundle(bundle, true).await
}

Ok(BundleHash {
bundle_hash: *bundle_hash,
})
async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
self.parse_and_process_bundle(bundle, false).await
}

async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> {
Expand Down Expand Up @@ -161,7 +133,8 @@ where
let bundle_hash = &parsed_bundle.bundle_hash();
let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?;

let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response);
let accepted_bundle =
AcceptedBundle::new(parsed_bundle, BundleType::Standard, meter_bundle_response);

if let Err(e) = self
.bundle_queue
Expand Down Expand Up @@ -300,6 +273,91 @@ where
}
Ok(res)
}

async fn parse_and_process_bundle(
&self,
bundle: Bundle,
is_backrun: bool,
) -> RpcResult<BundleHash> {
self.validate_bundle(&bundle).await.inspect_err(|e| {
tracing::error!(
error = ?e,
is_backrun = is_backrun,
"Bundle validation failed"
);
})?;

let parsed_bundle: ParsedBundle = bundle
.clone()
.try_into()
.inspect_err(|e| {
tracing::error!(
error = %e,
is_backrun = is_backrun,
"ParsedBundle conversion failed"
);
})
.map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;

let bundle_hash = &parsed_bundle.bundle_hash();

let meter_bundle_response =
self.meter_bundle(&bundle, bundle_hash)
.await
.inspect_err(|e| {
tracing::error!(
error = ?e,
bundle_hash = ?bundle_hash,
is_backrun = is_backrun,
"Bundle metering failed"
);
})?;

let bundle_type = if is_backrun {
BundleType::Backrun
} else {
BundleType::Standard
};

let accepted_bundle =
AcceptedBundle::new(parsed_bundle, bundle_type, meter_bundle_response);

tracing::info!(
bundle_hash = ?bundle_hash,
bundle_type = ?bundle_type,
num_txs = bundle.txs.len(),
"Bundle successfully processed"
);

if let Err(e) = self
.bundle_queue
.publish(&accepted_bundle, bundle_hash)
.await
{
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err());
}

info!(
message = "queued bundle",
bundle_hash = %bundle_hash,
);

let audit_event = BundleEvent::Received {
bundle_id: *accepted_bundle.uuid(),
bundle: Box::new(accepted_bundle.clone()),
};
if let Err(e) = self.audit_channel.send(audit_event) {
warn!(message = "Failed to send audit event", error = %e);
return Err(
EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(),
);
}

Ok(BundleHash {
bundle_hash: *bundle_hash,
})
}
}

#[cfg(test)]
Expand Down
Loading
Loading