fix: katana update gather message remove pending in favor of latest only
kwiss committed Dec 6, 2024
1 parent 702b07f commit 2586493
Showing 1 changed file with 37 additions and 174 deletions.
211 changes: 37 additions & 174 deletions crates/katana/core/src/service/messaging/starknet.rs
@@ -75,10 +75,10 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
         &self,
         from_block: BlockId,
         to_block: BlockId,
-    ) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
+    ) -> Result<Vec<EmittedEvent>> {
         trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

-        let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();
+        let mut events = vec![];

         let filter = EventFilter {
             from_block: Some(from_block),
@@ -98,11 +98,10 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {

             event_page.events.into_iter().for_each(|event| {
                 // We ignore events without the block number
-                if let Some(block_number) = event.block_number {
-                    block_to_events
-                        .entry(block_number)
-                        .and_modify(|v| v.push(event.clone()))
-                        .or_insert(vec![event]);
+                if event.block_number.is_some() {
+                    // Blocks are processed in order as retrieved by `get_events`.
+                    // This way we keep the order and ensure the messages are executed in order.
+                    events.push(event);
                 }
             });

@@ -113,73 +112,7 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
             }
         }

-        Ok(block_to_events)
-    }
-
-    async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
-        let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
-        let mut continuation_token: Option<String> = None;
-
-        loop {
-            debug!(target: LOG_TARGET, "Fetching pending events with continuation token: {:?}", continuation_token);
-
-            let filter = EventFilter {
-                from_block: Some(BlockId::Tag(BlockTag::Pending)),
-                to_block: Some(BlockId::Tag(BlockTag::Pending)),
-                address: Some(self.messaging_contract_address),
-                keys: None,
-            };
-
-            let event_page = self
-                .provider
-                .get_events(filter.clone(), continuation_token.clone(), 200)
-                .await
-                .map_err(|e| {
-                    error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
-                    Error::SendError
-                })?;
-
-            debug!(target: LOG_TARGET, "Fetched {} pending events", event_page.events.len());
-
-            for event in event_page.events {
-                let event_id = event.transaction_hash.to_string();
-
-                // Check if we've already processed this event
-                let cache = self.event_cache.read().await;
-                if cache.contains(&event_id) {
-                    debug!(target: LOG_TARGET, "Pending event {} already processed, skipping", event_id);
-                    continue;
-                }
-                drop(cache);
-
-                if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) {
-                    if let Ok((from, to, selector)) = info_from_event(&event) {
-                        let hooker = Arc::clone(&self.hooker);
-                        let is_message_accepted = hooker
-                            .read()
-                            .await
-                            .verify_message_to_appchain(from, to, selector)
-                            .await;
-                        if is_message_accepted {
-                            debug!(target: LOG_TARGET, "Pending event {} accepted", event_id);
-                            l1_handler_txs.push(tx);
-
-                            // Add to cache after processing
-                            let mut cache = self.event_cache.write().await;
-                            cache.insert(event_id);
-                        }
-                    }
-                }
-            }
-
-            continuation_token = event_page.continuation_token;
-            if continuation_token.is_none() {
-                break;
-            }
-        }
-
-        debug!(target: LOG_TARGET, "Total pending transactions gathered: {}", l1_handler_txs.len());
-        Ok(l1_handler_txs)
+        Ok(events)
     }

     async fn send_invoke_tx(&self, calls: Vec<Call>) -> Result<FieldElement> {
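The fetch_events hunks above swap the HashMap<u64, Vec<EmittedEvent>> grouping for a flat Vec that keeps events in the order get_events returns them, which is also the order the resulting L1 handler transactions are meant to execute in. A minimal standalone sketch of that difference follows; it is not code from this file, and the Event struct is a simplified stand-in for the starknet-rs EmittedEvent type.

// Standalone sketch: ordered Vec vs. per-block HashMap (simplified types).
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Event {
    block_number: Option<u64>,
    payload: &'static str,
}

fn main() {
    // Events as a `get_events` page would yield them: already in block order.
    let page = vec![
        Event { block_number: Some(10), payload: "msg-a" },
        Event { block_number: Some(10), payload: "msg-b" },
        Event { block_number: Some(11), payload: "msg-c" },
        Event { block_number: None, payload: "skipped" }, // not yet in a block
    ];

    // Old shape: grouped by block number. Iterating a HashMap yields keys in
    // arbitrary order, so block and message ordering is not preserved for free.
    let mut by_block: HashMap<u64, Vec<Event>> = HashMap::new();
    for e in page.clone() {
        if let Some(n) = e.block_number {
            by_block.entry(n).or_default().push(e);
        }
    }

    // New shape: a flat Vec that simply keeps the order `get_events` returned,
    // which is the order the L1 handler transactions are built in.
    let ordered: Vec<Event> =
        page.into_iter().filter(|e| e.block_number.is_some()).collect();

    println!("grouped (unordered keys): {:?}", by_block.keys().collect::<Vec<_>>());
    println!("ordered payloads: {:?}", ordered.iter().map(|e| e.payload).collect::<Vec<_>>());
}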
@@ -267,120 +200,50 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetMessaging<EF> {
         from_block: u64,
         max_blocks: u64,
         chain_id: ChainId,
-    ) -> MessengerResult<(u64, Vec<L1HandlerTx>)> {
-        debug!(target: LOG_TARGET, "Starting gather_messages with from_block: {}, max_blocks: {}", from_block, max_blocks);
-
-        // First get the latest block number
+    ) -> MessengerResult<(u64, Vec<Self::MessageTransaction>)> {
         let chain_latest_block: u64 = match self.provider.block_number().await {
-            Ok(n) => {
-                debug!(target: LOG_TARGET, "Latest block number on chain: {}", n);
-                n
-            }
-            Err(e) => {
+            Ok(n) => n,
+            Err(_) => {
                 warn!(
                     target: LOG_TARGET,
-                    "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
+                    "Couldn't fetch settlement chain last block number. \nSkipped, retry at the \
+                     next tick."
                 );
                 return Err(Error::SendError);
             }
         };

-        let mut l1_handler_txs = Vec::new();
-
-        // First gather pending events
-        debug!(target: LOG_TARGET, "Fetching pending events first");
-        let pending_txs = self.fetch_pending_events(chain_id).await?;
-        l1_handler_txs.extend(pending_txs);
-        debug!(target: LOG_TARGET, "Found {} pending transactions", l1_handler_txs.len());
-
-        // Then check if we need to process blocks
-
         if from_block > chain_latest_block {
-            debug!(target: LOG_TARGET, "from_block ({}) > chain_latest_block ({}), returning with only pending events", from_block, chain_latest_block);
-            return Ok((chain_latest_block, l1_handler_txs));
+            // Nothing to fetch, we can skip waiting the next tick.
+            return Ok((chain_latest_block, vec![]));
         }
-        // Calculate the block range to process
-        let to_block = if from_block + max_blocks < chain_latest_block {
+
+        // +1 as the from_block counts as 1 block fetched.
+        let to_block = if from_block + max_blocks + 1 < chain_latest_block {
             from_block + max_blocks
         } else {
             chain_latest_block
         };
-        debug!(target: LOG_TARGET, "Fetching confirmed events from block {} to {}", from_block, to_block);
-
-        // Now fetch events from confirmed blocks
-        match self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)).await {
-            Ok(events_by_block) => {
-                for (block_number, block_events) in events_by_block.iter() {
-                    debug!(
-                        target: LOG_TARGET,
-                        block_number = %block_number,
-                        events_count = %block_events.len(),
-                        "Processing confirmed events for block"
-                    );
-
-                    for event in block_events {
-                        let event_id = event.transaction_hash.to_string();
-
-                        // Check if we've already processed this event
-                        let cache = self.event_cache.read().await;
-                        if cache.contains(&event_id) {
-                            debug!(target: LOG_TARGET, "Event {} already processed, skipping", event_id);
-                            continue;
-                        }
-                        drop(cache); // Release the read lock
-
-                        if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
-                            if let Ok((from, to, selector)) = info_from_event(event) {
-                                let hooker = Arc::clone(&self.hooker);
-                                let is_message_accepted = hooker
-                                    .read()
-                                    .await
-                                    .verify_message_to_appchain(from, to, selector)
-                                    .await;
-
-                                if is_message_accepted {
-                                    debug!(
-                                        target: LOG_TARGET,
-                                        "Message from block {} accepted: from {:?} to {:?} with selector {:?}",
-                                        block_number, from, to, selector
-                                    );
-                                    l1_handler_txs.push(tx);
-
-                                    // Add to cache after processing
-                                    let mut cache = self.event_cache.write().await;
-                                    cache.insert(event_id);
-                                } else {
-                                    debug!(
-                                        target: LOG_TARGET,
-                                        "Message from block {} not accepted: from {:?} to {:?} with selector {:?}",
-                                        block_number, from, to, selector
-                                    );
-                                }
-                            }
-                        }
-                    }
-                }
-
+        let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
+
+        self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
+            .await
+            .map_err(|_| Error::SendError)
+            .unwrap()
+            .iter()
+            .for_each(|e| {
+                debug!(
+                    target: LOG_TARGET,
+                    event = ?e,
+                    "Converting event into L1HandlerTx."
+                );
+
+                if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
+                    l1_handler_txs.push(tx)
                 }
-            Err(e) => {
-                error!(target: LOG_TARGET, "Error fetching confirmed events: {:?}", e);
-                return Err(Error::SendError);
-            }
-        }
-
-        // We only clear the cache if we've moved to a new latest block to avoid reprocessing events
-        let previous_block = self.latest_block.load(Ordering::Relaxed);
-        if previous_block < chain_latest_block {
-            debug!(target: LOG_TARGET, "Moving to new latest block {} from {}, clearing cache", chain_latest_block, previous_block);
-            self.event_cache.write().await.clear();
-            self.latest_block.store(chain_latest_block, Ordering::Relaxed);
-        }
-
-        info!(
-            target: LOG_TARGET,
-            "Total messages gathered: {} (including pending)",
-            l1_handler_txs.len(),
-        );
-
+            });
+
         Ok((to_block, l1_handler_txs))
     }

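The reworked gather_messages above caps the fetched range with from_block + max_blocks + 1 < chain_latest_block because from_block itself counts as one fetched block. Below is a small standalone sketch of that range cap with made-up numbers; the to_block helper is illustrative only and not part of the commit.

// Standalone sketch of the block-range cap used by the new gather_messages.
// The numbers are made up; `from_block` itself counts as one fetched block,
// hence the `+ 1` in the comparison.
fn to_block(from_block: u64, max_blocks: u64, chain_latest_block: u64) -> u64 {
    if from_block + max_blocks + 1 < chain_latest_block {
        from_block + max_blocks
    } else {
        chain_latest_block
    }
}

fn main() {
    // Far behind the chain tip: fetch `from_block` plus at most `max_blocks` more.
    assert_eq!(to_block(100, 5, 200), 105); // fetches blocks 100..=105, i.e. 6 blocks
    // Close to the tip: just catch up to the latest block.
    assert_eq!(to_block(198, 5, 200), 200);
    println!("range cap behaves as sketched");
}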