Skip to content

Commit

Permalink
feat(solis): update gather message to test only pending
Browse files Browse the repository at this point in the history
  • Loading branch information
kwiss committed May 28, 2024
1 parent f6705a4 commit 64538f8
Showing 1 changed file with 50 additions and 115 deletions.
165 changes: 50 additions & 115 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@ use starknet::macros::{felt, selector};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{AnyProvider, JsonRpcClient, Provider};
use starknet::signers::{LocalWallet, SigningKey};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, trace, warn};
use url::Url;

use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET};

/// As messaging in starknet is only possible with EthAddress in the `to_address`
/// field, we have to set magic value to understand what the user want to do.
/// In the case of execution -> the felt 'EXE' will be passed.
/// And for normal messages, the felt 'MSG' is used.
/// Those values are very not likely a valid account address on starknet.
const MSG_MAGIC: FieldElement = felt!("0x4d5347");
const EXE_MAGIC: FieldElement = felt!("0x455845");

Expand All @@ -37,6 +32,7 @@ pub struct StarknetMessaging<EF: katana_executor::ExecutorFactory + Send + Sync>
sender_account_address: FieldElement,
messaging_contract_address: FieldElement,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
event_cache: Arc<AsyncRwLock<HashSet<String>>>,
}

impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
Expand Down Expand Up @@ -65,44 +61,55 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
sender_account_address,
messaging_contract_address,
hooker,
event_cache: Arc::new(AsyncRwLock::new(HashSet::new())),
})
}

/// Fetches events for the given blocks range.
pub async fn fetch_events(
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();

let filter = EventFilter {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;

loop {
let event_page =
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
.and_modify(|v| v.push(event.clone()))
.or_insert(vec![event]);
let filter = EventFilter {
from_block: Some(BlockId::Tag(BlockTag::Pending)),
to_block: Some(BlockId::Tag(BlockTag::Pending)),
address: Some(self.messaging_contract_address),
keys: None,
};

let event_page = self
.provider
.get_events(filter.clone(), continuation_token.clone(), 200)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
Error::SendError
})?;

for event in event_page.events {
let event_id = event.transaction_hash.to_string(); // Assuming `transaction_hash` is the unique identifier for the event

let mut cache = self.event_cache.write().await;
if cache.contains(&event_id) {
continue;
}
});

if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(&event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
cache.insert(event_id);
}
}
}
}

continuation_token = event_page.continuation_token;

Expand All @@ -111,10 +118,9 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
}
}

Ok(block_to_events)
Ok(l1_handler_txs)
}

/// Sends an invoke TX on starknet.
async fn send_invoke_tx(&self, calls: Vec<Call>) -> Result<FieldElement> {
let signer = Arc::new(&self.wallet);

Expand All @@ -128,7 +134,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {

account.set_block_id(BlockId::Tag(BlockTag::Pending));

// TODO: we need to have maximum fee configurable.
let execution = account.execute(calls).fee_estimate_multiplier(10f64);
let estimated_fee = (execution.estimate_fee().await?.overall_fee) * 10u64.into();
let execution_with_fee = execution.max_fee(estimated_fee);
Expand All @@ -138,8 +143,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
match execution_with_fee.send().await {
Ok(tx) => {
info!(target: LOG_TARGET, "Transaction successful: {:?}", tx);
println!("tx: {:?}", tx);
println!("tx_hash: {:?}", tx.transaction_hash);
Ok(tx.transaction_hash)
}
Err(e) => {
Expand All @@ -149,7 +152,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
}
}

/// Sends messages hashes to settlement layer by sending a transaction.
async fn send_hashes(&self, mut hashes: Vec<FieldElement>) -> MessengerResult<FieldElement> {
hashes.retain(|&x| x != HASH_EXEC);

Expand Down Expand Up @@ -192,7 +194,7 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
from_block: u64,
max_blocks: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<Self::MessageTransaction>)> {
) -> MessengerResult<(u64, Vec<L1HandlerTx>)> {
let chain_latest_block: u64 = match self.provider.block_number().await {
Ok(n) => n,
Err(e) => {
Expand All @@ -204,57 +206,13 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
return Ok((chain_latest_block, vec![]));
}

// +1 as the from_block counts as 1 block fetched.
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
from_block + max_blocks
} else {
chain_latest_block
};

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let pending_txs = self.fetch_pending_events(chain_id).await?;

info!(target: LOG_TARGET, "Gathering messages from block {} to block {}", from_block, to_block);

let block_to_events = self
.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching events: {:?}", e);
Error::SendError
})?;

for (block_number, block_events) in block_to_events.iter() {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

for event in block_events.iter() {
if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
}
}
}
}
if from_block != chain_latest_block {
self.event_cache.write().await.clear();
}

Ok((to_block, l1_handler_txs))
Ok((chain_latest_block, pending_txs))
}

async fn send_messages(
Expand Down Expand Up @@ -297,18 +255,11 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
}

/// Parses messages sent by cairo contracts to compute their hashes.
///
/// Messages can also be labelled as EXE, which in this case generate a `Call`
/// additionally to the hash.
fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement>, Vec<Call>)> {
let mut hashes: Vec<FieldElement> = vec![];
let mut calls: Vec<Call> = vec![];

for m in messages {
// Field `to_address` is restricted to eth addresses space. So the
// `to_address` is set to 'EXE'/'MSG' to indicate that the message
// has to be executed or sent normally.
let magic = m.to_address;

if magic == EXE_MAGIC {
Expand All @@ -325,23 +276,14 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement
let selector = m.payload[1];

let mut calldata = vec![];
// We must exclude the `to_address` and `selector` from the actual payload.
if m.payload.len() >= 3 {
calldata.extend(m.payload[2..].to_vec());
}

calls.push(Call { to, selector, calldata });
hashes.push(HASH_EXEC);
} else if magic == MSG_MAGIC {
// In the case of a regular message, we compute the message's hash
// which will then be sent in a transaction to be registered.

// As `to_address` is used by the magic, the `to_address` we want
// is the first element of the payload.
let to_address = m.payload[0];

// Then, the payload must be changed to only keep the rest of the
// data, without the first element that was the `to_address`.
let payload = &m.payload[1..];

let mut buf: Vec<u8> = vec![];
Expand All @@ -354,7 +296,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement

hashes.push(starknet_keccak(&buf));
} else {
// Skip the message if no valid magic number is found.
warn!(target: LOG_TARGET, magic = ?magic, "Invalid message to_address magic value.");
continue;
}
Expand All @@ -377,14 +318,11 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted.");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
let nonce = event.data[1];

// Skip the length of the serialized array for the payload which is data[2].
// Payload starts at data[3].
let mut calldata = vec![from_address];
calldata.extend(&event.data[3..]);

Expand All @@ -395,7 +333,6 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
calldata,
chain_id,
message_hash,
// This is the min value paid on L1 for the message to be sent to L2.
paid_fee_on_l1: 30000_u128,
entry_point_selector,
version: FieldElement::ZERO,
Expand All @@ -416,7 +353,6 @@ fn info_from_event(event: &EmittedEvent) -> Result<(FieldElement, FieldElement,
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
Expand All @@ -426,7 +362,6 @@ fn info_from_event(event: &EmittedEvent) -> Result<(FieldElement, FieldElement,

#[cfg(test)]
mod tests {

use katana_primitives::utils::transaction::compute_l1_handler_tx_hash;
use starknet::macros::felt;

Expand Down

0 comments on commit 64538f8

Please sign in to comment.