4 changes: 4 additions & 0 deletions .env.example
@@ -34,3 +34,7 @@ TIPS_UI_S3_CONFIG_TYPE=manual
TIPS_UI_S3_ENDPOINT=http://localhost:7000
TIPS_UI_S3_ACCESS_KEY_ID=minioadmin
TIPS_UI_S3_SECRET_ACCESS_KEY=minioadmin

# User Operations (EIP-4337)
TIPS_INGRESS_KAFKA_USER_OPS_PROPERTIES_FILE=/app/docker/user-operations-kafka-properties
TIPS_INGRESS_KAFKA_USER_OPS_TOPIC=tips-user-operations
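
The referenced properties file is not part of this diff; a minimal sketch of what docker/user-operations-kafka-properties might contain, assuming the same librdkafka key=value format that load_kafka_config_from_file expects for the other Kafka properties files (the broker address is illustrative):

# Kafka client settings for the user-operations producer (illustrative values)
bootstrap.servers=localhost:9092
message.timeout.ms=5000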
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
@@ -1,6 +1,7 @@
pub mod kafka;
pub mod logger;
pub mod types;
pub mod user_operation;

#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
@@ -9,3 +10,4 @@ pub use types::{
AcceptedBundle, Bundle, BundleExtensions, BundleHash, BundleTxs, CancelBundle,
MeterBundleResponse,
};
pub use user_operation::{UserOperation, UserOperationWithMetadata};
144 changes: 144 additions & 0 deletions crates/core/src/user_operation.rs
@@ -0,0 +1,144 @@
//! EIP-4337 Account Abstraction User Operation types
use alloy_primitives::{Address, Bytes, U256, B256, keccak256};
use serde::{Deserialize, Serialize};

/// User Operation as defined by EIP-4337 v0.6
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UserOperationV06 {
pub sender: Address,
pub nonce: U256,
pub init_code: Bytes,
pub call_data: Bytes,
pub call_gas_limit: U256,
pub verification_gas_limit: U256,
pub pre_verification_gas: U256,
pub max_fee_per_gas: U256,
pub max_priority_fee_per_gas: U256,
pub paymaster_and_data: Bytes,
pub signature: Bytes,
}

/// User Operation as defined by EIP-4337 v0.7+
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UserOperationV07 {
pub sender: Address,
pub nonce: U256,
pub factory: Address,
pub factory_data: Bytes,
pub call_data: Bytes,
pub call_gas_limit: U256,
pub verification_gas_limit: U256,
pub pre_verification_gas: U256,
pub max_fee_per_gas: U256,
pub max_priority_fee_per_gas: U256,
pub paymaster: Address,
pub paymaster_verification_gas_limit: U256,
pub paymaster_post_op_gas_limit: U256,
pub paymaster_data: Bytes,
pub signature: Bytes,
}

/// User Operation that can be either v0.6 or v0.7+
/// Automatically deserializes based on fields present
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum UserOperation {
V06(UserOperationV06),
V07(UserOperationV07),
}

impl UserOperation {
/// Get the sender address
pub fn sender(&self) -> Address {
match self {
UserOperation::V06(op) => op.sender,
UserOperation::V07(op) => op.sender,
}
}

/// Get the nonce
pub fn nonce(&self) -> U256 {
match self {
UserOperation::V06(op) => op.nonce,
UserOperation::V07(op) => op.nonce,
}
}

/// Calculate the user operation hash (for use as Kafka key and tracking)
/// This is a simplified hash - for production, use the full EIP-4337 hash algorithm
pub fn user_op_hash(&self, entry_point: &Address, chain_id: u64) -> B256 {
let mut data = Vec::new();

// Include chain ID and entry point
data.extend_from_slice(&chain_id.to_be_bytes());
data.extend_from_slice(entry_point.as_slice());

match self {
UserOperation::V06(op) => {
data.extend_from_slice(op.sender.as_slice());
data.extend_from_slice(&op.nonce.to_be_bytes::<32>());
data.extend_from_slice(&keccak256(&op.init_code).0);
data.extend_from_slice(&keccak256(&op.call_data).0);
data.extend_from_slice(&op.call_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&op.verification_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&op.pre_verification_gas.to_be_bytes::<32>());
data.extend_from_slice(&op.max_fee_per_gas.to_be_bytes::<32>());
data.extend_from_slice(&op.max_priority_fee_per_gas.to_be_bytes::<32>());
data.extend_from_slice(&keccak256(&op.paymaster_and_data).0);
}
UserOperation::V07(op) => {
data.extend_from_slice(op.sender.as_slice());
data.extend_from_slice(&op.nonce.to_be_bytes::<32>());
data.extend_from_slice(op.factory.as_slice());
data.extend_from_slice(&keccak256(&op.factory_data).0);
data.extend_from_slice(&keccak256(&op.call_data).0);
data.extend_from_slice(&op.call_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&op.verification_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&op.pre_verification_gas.to_be_bytes::<32>());
data.extend_from_slice(&op.max_fee_per_gas.to_be_bytes::<32>());
data.extend_from_slice(&op.max_priority_fee_per_gas.to_be_bytes::<32>());
data.extend_from_slice(op.paymaster.as_slice());
data.extend_from_slice(&op.paymaster_verification_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&op.paymaster_post_op_gas_limit.to_be_bytes::<32>());
data.extend_from_slice(&keccak256(&op.paymaster_data).0);
}
}

keccak256(&data)
}
}

/// Wrapper for UserOperation with metadata for Kafka queue
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UserOperationWithMetadata {
pub user_operation: UserOperation,
pub entry_point: Address,
pub user_op_hash: B256,
pub received_at: u64, // Unix timestamp
pub chain_id: u64,
}

impl UserOperationWithMetadata {
pub fn new(
user_operation: UserOperation,
entry_point: Address,
chain_id: u64,
) -> Self {
let user_op_hash = user_operation.user_op_hash(&entry_point, chain_id);
let received_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

Self {
user_operation,
entry_point,
user_op_hash,
received_at,
chain_id,
}
}
}
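
A minimal usage sketch for the types above, assuming the tips_core re-exports added in this PR plus serde_json and anyhow as dependencies; the EntryPoint address and chain id are illustrative values, not taken from this repo:

use alloy_primitives::address;
use tips_core::{UserOperation, UserOperationWithMetadata};

fn main() -> anyhow::Result<()> {
    // A v0.6-shaped payload: the untagged enum matches the V06 variant because it
    // carries initCode/paymasterAndData rather than factory/paymaster fields.
    let json = r#"{
        "sender": "0x0000000000000000000000000000000000000001",
        "nonce": "0x0",
        "initCode": "0x",
        "callData": "0x",
        "callGasLimit": "0x186a0",
        "verificationGasLimit": "0x186a0",
        "preVerificationGas": "0x5208",
        "maxFeePerGas": "0x3b9aca00",
        "maxPriorityFeePerGas": "0x3b9aca00",
        "paymasterAndData": "0x",
        "signature": "0x"
    }"#;
    let user_op: UserOperation = serde_json::from_str(json)?;

    // Illustrative values: the canonical v0.6 EntryPoint address and OP Mainnet's chain id.
    let entry_point = address!("5FF137D4b0FDCD49DcA30c7CF57E578a026d2789");
    let with_meta = UserOperationWithMetadata::new(user_op, entry_point, 10);
    println!("user op hash (Kafka key): {}", with_meta.user_op_hash);
    Ok(())
}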
31 changes: 22 additions & 9 deletions crates/ingress-rpc/src/bin/main.rs
@@ -11,7 +11,7 @@ use tips_core::logger::init_logger;
use tips_ingress_rpc::Config;
use tips_ingress_rpc::connect_ingress_to_builder;
use tips_ingress_rpc::metrics::init_prometheus_exporter;
use tips_ingress_rpc::queue::KafkaQueuePublisher;
use tips_ingress_rpc::queue::{BundleQueuePublisher, UserOperationQueuePublisher};
use tips_ingress_rpc::service::{IngressApiServer, IngressService};
use tokio::sync::{broadcast, mpsc};
use tracing::info;
@@ -47,19 +47,31 @@ async fn main() -> anyhow::Result<()> {
.network::<Optimism>()
.connect_http(config.simulation_rpc);

// Bundle queue setup
let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file(
&config.ingress_kafka_properties,
)?);

let queue_producer: FutureProducer = ingress_client_config.create()?;

let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic);

let bundle_producer: FutureProducer = ingress_client_config.create()?;
let bundle_queue = BundleQueuePublisher::new(bundle_producer, config.ingress_topic.clone());

// User operation queue setup
let user_op_queue = if let Some(user_ops_props) = &config.user_ops_kafka_properties {
// Use dedicated user ops configuration if provided
let user_ops_config = ClientConfig::from_iter(
load_kafka_config_from_file(user_ops_props)?
);
let user_ops_producer: FutureProducer = user_ops_config.create()?;
UserOperationQueuePublisher::new(user_ops_producer, config.user_ops_topic.clone())
} else {
// Fall back to using the same config as bundles
let user_ops_producer: FutureProducer = ingress_client_config.create()?;
UserOperationQueuePublisher::new(user_ops_producer, config.user_ops_topic.clone())
};

// Audit setup
let audit_client_config =
ClientConfig::from_iter(load_kafka_config_from_file(&config.audit_kafka_properties)?);

let audit_producer: FutureProducer = audit_client_config.create()?;

let audit_publisher = KafkaBundleEventPublisher::new(audit_producer, config.audit_topic);
let (audit_tx, audit_rx) = mpsc::unbounded_channel::<BundleEvent>();
connect_audit_to_publisher(audit_rx, audit_publisher);
@@ -74,7 +86,8 @@ async fn main() -> anyhow::Result<()> {
let service = IngressService::new(
provider,
simulation_provider,
queue,
bundle_queue,
user_op_queue,
audit_tx,
builder_tx,
cfg,
12 changes: 12 additions & 0 deletions crates/ingress-rpc/src/lib.rs
@@ -83,6 +83,18 @@ pub struct Config {
)]
pub audit_topic: String,

/// Kafka properties file for user operations (optional, defaults to ingress properties)
#[arg(long, env = "TIPS_INGRESS_KAFKA_USER_OPS_PROPERTIES_FILE")]
pub user_ops_kafka_properties: Option<String>,

/// Kafka topic for user operations
#[arg(
long,
env = "TIPS_INGRESS_KAFKA_USER_OPS_TOPIC",
default_value = "tips-user-operations"
)]
pub user_ops_topic: String,

#[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
pub log_level: String,

92 changes: 65 additions & 27 deletions crates/ingress-rpc/src/queue.rs
@@ -1,58 +1,55 @@
use alloy_primitives::B256;
use anyhow::Result;
use async_trait::async_trait;
use backon::{ExponentialBuilder, Retryable};
use rdkafka::producer::{FutureProducer, FutureRecord};
use tips_core::AcceptedBundle;
use tips_core::{AcceptedBundle, UserOperationWithMetadata};
use tokio::time::Duration;
use tracing::{error, info};

/// A queue to buffer transactions
#[async_trait]
pub trait QueuePublisher: Send + Sync {
async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()>;
}

/// A queue to buffer transactions
pub struct KafkaQueuePublisher {
/// Internal Kafka implementation - handles the low-level Kafka publishing with retry logic
struct KafkaPublisher {
producer: FutureProducer,
topic: String,
}

impl KafkaQueuePublisher {
pub fn new(producer: FutureProducer, topic: String) -> Self {
impl KafkaPublisher {
fn new(producer: FutureProducer, topic: String) -> Self {
Self { producer, topic }
}
}

#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> {
let key = bundle_hash.to_string();
let payload = serde_json::to_vec(&bundle)?;

/// Publish any message with a key to Kafka with automatic retry
async fn publish(
&self,
key: &str,
payload_bytes: Vec<u8>,
entity_type: &str,
) -> Result<()> {
let enqueue = || async {
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
let record = FutureRecord::to(&self.topic)
.key(key)
.payload(&payload_bytes);

match self.producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => {
info!(
bundle_hash = %bundle_hash,
key = %key,
partition = partition,
offset = offset,
topic = %self.topic,
"Successfully enqueued bundle"
entity_type = entity_type,
"Successfully published to Kafka"
);
Ok(())
}
Err((err, _)) => {
error!(
bundle_hash = %bundle_hash,
key = %key,
error = %err,
topic = %self.topic,
"Failed to enqueue bundle"
entity_type = entity_type,
"Failed to publish to Kafka"
);
Err(anyhow::anyhow!("Failed to enqueue bundle: {err}"))
Err(anyhow::anyhow!("Failed to publish: {err}"))
}
}
};
@@ -65,12 +62,53 @@ impl QueuePublisher for KafkaQueuePublisher {
.with_max_times(3),
)
.notify(|err: &anyhow::Error, dur: Duration| {
info!("retrying to enqueue bundle {:?} after {:?}", err, dur);
info!("Retrying Kafka publish {:?} after {:?}", err, dur);
})
.await
}
}

/// Publisher for bundle queues - handles bundle-specific publishing logic
pub struct BundleQueuePublisher {
kafka: KafkaPublisher,
}

impl BundleQueuePublisher {
pub fn new(producer: FutureProducer, topic: String) -> Self {
Self {
kafka: KafkaPublisher::new(producer, topic),
}
}

pub async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> {
let payload_bytes = serde_json::to_vec(bundle)?;
self.kafka
.publish(&bundle_hash.to_string(), payload_bytes, "bundle")
.await
}
}

/// Publisher for user operations - handles user operation-specific publishing logic
pub struct UserOperationQueuePublisher {
kafka: KafkaPublisher,
}

impl UserOperationQueuePublisher {
pub fn new(producer: FutureProducer, topic: String) -> Self {
Self {
kafka: KafkaPublisher::new(producer, topic),
}
}

pub async fn publish(&self, user_op: &UserOperationWithMetadata) -> Result<()> {
let key = user_op.user_op_hash.to_string();
let payload_bytes = serde_json::to_vec(user_op)?;
self.kafka
.publish(&key, payload_bytes, "user_operation")
.await
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -93,7 +131,7 @@ mod tests {
.create()
.expect("Producer creation failed");

let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
let publisher = BundleQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
let bundle = create_test_bundle();
let accepted_bundle = AcceptedBundle::new(
bundle.try_into().unwrap(),
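
For completeness, a sketch of how the new publisher could be wired up outside of main.rs; the broker address and topic are illustrative, and in the actual binary the producer is built from the properties file shown earlier:

use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
use tips_core::UserOperationWithMetadata;
use tips_ingress_rpc::queue::UserOperationQueuePublisher;

async fn publish_user_op(op: &UserOperationWithMetadata) -> anyhow::Result<()> {
    // Illustrative broker address; real deployments read this from the Kafka properties file.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    let publisher =
        UserOperationQueuePublisher::new(producer, "tips-user-operations".to_string());
    // The user op hash is the Kafka key, so retries for the same op land on the same partition.
    publisher.publish(op).await
}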