diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index fc7b864975..c0b7f1745d 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -12,7 +12,7 @@ use starknet::macros::{felt, selector}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{AnyProvider, JsonRpcClient, Provider}; use starknet::signers::{LocalWallet, SigningKey}; -use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; use tracing::{debug, error, info, trace, warn}; @@ -20,11 +20,6 @@ use url::Url; use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET}; -/// As messaging in starknet is only possible with EthAddress in the `to_address` -/// field, we have to set magic value to understand what the user want to do. -/// In the case of execution -> the felt 'EXE' will be passed. -/// And for normal messages, the felt 'MSG' is used. -/// Those values are very not likely a valid account address on starknet. const MSG_MAGIC: FieldElement = felt!("0x4d5347"); const EXE_MAGIC: FieldElement = felt!("0x455845"); @@ -37,6 +32,7 @@ pub struct StarknetMessaging sender_account_address: FieldElement, messaging_contract_address: FieldElement, hooker: Arc + Send + Sync>>, + event_cache: Arc>>, } impl StarknetMessaging { @@ -65,44 +61,55 @@ impl StarknetMessaging { sender_account_address, messaging_contract_address, hooker, + event_cache: Arc::new(AsyncRwLock::new(HashSet::new())), }) } - /// Fetches events for the given blocks range. - pub async fn fetch_events( - &self, - from_block: BlockId, - to_block: BlockId, - ) -> Result>> { - trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); - - let mut block_to_events: HashMap> = HashMap::new(); - - let filter = EventFilter { - from_block: Some(from_block), - to_block: Some(to_block), - address: Some(self.messaging_contract_address), - // TODO: this might come from the configuration actually. - keys: None, - }; - - // TODO: this chunk_size may also come from configuration? - let chunk_size = 200; + async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult> { + let mut l1_handler_txs: Vec = vec![]; let mut continuation_token: Option = None; loop { - let event_page = - self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?; - - event_page.events.into_iter().for_each(|event| { - // We ignore events without the block number - if let Some(block_number) = event.block_number { - block_to_events - .entry(block_number) - .and_modify(|v| v.push(event.clone())) - .or_insert(vec![event]); + let filter = EventFilter { + from_block: Some(BlockId::Tag(BlockTag::Pending)), + to_block: Some(BlockId::Tag(BlockTag::Pending)), + address: Some(self.messaging_contract_address), + keys: None, + }; + + let event_page = self + .provider + .get_events(filter.clone(), continuation_token.clone(), 200) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e); + Error::SendError + })?; + + for event in event_page.events { + let event_id = event.transaction_hash.to_string(); // Assuming `transaction_hash` is the unique identifier for the event + + let mut cache = self.event_cache.write().await; + if cache.contains(&event_id) { + continue; } - }); + + if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) { + if let Ok((from, to, selector)) = info_from_event(&event) { + let hooker = Arc::clone(&self.hooker); + let is_message_accepted = hooker + .read() + .await + .verify_message_to_appchain(from, to, selector) + .await; + + if is_message_accepted { + l1_handler_txs.push(tx); + cache.insert(event_id); + } + } + } + } continuation_token = event_page.continuation_token; @@ -111,10 +118,9 @@ impl StarknetMessaging { } } - Ok(block_to_events) + Ok(l1_handler_txs) } - /// Sends an invoke TX on starknet. async fn send_invoke_tx(&self, calls: Vec) -> Result { let signer = Arc::new(&self.wallet); @@ -128,7 +134,6 @@ impl StarknetMessaging { account.set_block_id(BlockId::Tag(BlockTag::Pending)); - // TODO: we need to have maximum fee configurable. let execution = account.execute(calls).fee_estimate_multiplier(10f64); let estimated_fee = (execution.estimate_fee().await?.overall_fee) * 10u64.into(); let execution_with_fee = execution.max_fee(estimated_fee); @@ -138,8 +143,6 @@ impl StarknetMessaging { match execution_with_fee.send().await { Ok(tx) => { info!(target: LOG_TARGET, "Transaction successful: {:?}", tx); - println!("tx: {:?}", tx); - println!("tx_hash: {:?}", tx.transaction_hash); Ok(tx.transaction_hash) } Err(e) => { @@ -149,7 +152,6 @@ impl StarknetMessaging { } } - /// Sends messages hashes to settlement layer by sending a transaction. async fn send_hashes(&self, mut hashes: Vec) -> MessengerResult { hashes.retain(|&x| x != HASH_EXEC); @@ -192,7 +194,7 @@ impl Messenger for StarknetM from_block: u64, max_blocks: u64, chain_id: ChainId, - ) -> MessengerResult<(u64, Vec)> { + ) -> MessengerResult<(u64, Vec)> { let chain_latest_block: u64 = match self.provider.block_number().await { Ok(n) => n, Err(e) => { @@ -204,57 +206,13 @@ impl Messenger for StarknetM } }; - if from_block > chain_latest_block { - // Nothing to fetch, we can skip waiting the next tick. - return Ok((chain_latest_block, vec![])); - } - - // +1 as the from_block counts as 1 block fetched. - let to_block = if from_block + max_blocks + 1 < chain_latest_block { - from_block + max_blocks - } else { - chain_latest_block - }; - - let mut l1_handler_txs: Vec = vec![]; + let pending_txs = self.fetch_pending_events(chain_id).await?; - info!(target: LOG_TARGET, "Gathering messages from block {} to block {}", from_block, to_block); - - let block_to_events = self - .fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) - .await - .map_err(|e| { - error!(target: LOG_TARGET, "Error fetching events: {:?}", e); - Error::SendError - })?; - - for (block_number, block_events) in block_to_events.iter() { - debug!( - target: LOG_TARGET, - block_number = %block_number, - events_count = %block_events.len(), - "Converting events of block into L1HandlerTx." - ); - - for event in block_events.iter() { - if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) { - if let Ok((from, to, selector)) = info_from_event(event) { - let hooker = Arc::clone(&self.hooker); - let is_message_accepted = hooker - .read() - .await - .verify_message_to_appchain(from, to, selector) - .await; - - if is_message_accepted { - l1_handler_txs.push(tx); - } - } - } - } + if from_block != chain_latest_block { + self.event_cache.write().await.clear(); } - Ok((to_block, l1_handler_txs)) + Ok((chain_latest_block, pending_txs)) } async fn send_messages( @@ -297,18 +255,11 @@ impl Messenger for StarknetM } } -/// Parses messages sent by cairo contracts to compute their hashes. -/// -/// Messages can also be labelled as EXE, which in this case generate a `Call` -/// additionally to the hash. fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec, Vec)> { let mut hashes: Vec = vec![]; let mut calls: Vec = vec![]; for m in messages { - // Field `to_address` is restricted to eth addresses space. So the - // `to_address` is set to 'EXE'/'MSG' to indicate that the message - // has to be executed or sent normally. let magic = m.to_address; if magic == EXE_MAGIC { @@ -325,7 +276,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec= 3 { calldata.extend(m.payload[2..].to_vec()); } @@ -333,15 +283,7 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec = vec![]; @@ -354,7 +296,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec Result Result Result<(FieldElement, FieldElement, error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted"); } - // See contract appchain_messaging.cairo for MessageSentToAppchain event. let from_address = event.keys[2]; let to_address = event.keys[3]; let entry_point_selector = event.data[0]; @@ -426,7 +362,6 @@ fn info_from_event(event: &EmittedEvent) -> Result<(FieldElement, FieldElement, #[cfg(test)] mod tests { - use katana_primitives::utils::transaction::compute_l1_handler_tx_hash; use starknet::macros::felt;