Skip to content

Commit 267b27d

Browse files
authored
feat: improve UpdateFcsHead to add reverted transations to tx pool (#406)
1 parent e7ba7aa commit 267b27d

File tree

2 files changed

+229
-20
lines changed

2 files changed

+229
-20
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use rollup_node_providers::L1MessageProvider;
2121
use rollup_node_sequencer::{Sequencer, SequencerEvent};
2222
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
2323
use rollup_node_watcher::L1Notification;
24-
use scroll_alloy_consensus::TxL1Message;
24+
use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
2525
use scroll_alloy_hardforks::ScrollHardforks;
2626
use scroll_alloy_network::Scroll;
2727
use scroll_alloy_provider::ScrollEngineApi;
@@ -356,13 +356,23 @@ impl<
356356
let _ = tx.send(self.network.handle().clone());
357357
}
358358
ChainOrchestratorCommand::UpdateFcsHead((head, sender)) => {
359+
// Collect transactions of reverted blocks from l2 client.
360+
let reverted_transactions = self
361+
.collect_reverted_txs_in_range(
362+
head.number.saturating_add(1),
363+
self.engine.fcs().head_block_info().number,
364+
)
365+
.await?;
359366
self.engine.update_fcs(Some(head), None, None).await?;
360367
self.database
361368
.tx_mut(move |tx| async move {
362369
tx.purge_l1_message_to_l2_block_mappings(Some(head.number + 1)).await?;
363370
tx.set_l2_head_block_number(head.number).await
364371
})
365372
.await?;
373+
374+
// Add all reverted transactions to the transaction pool.
375+
self.reinsert_txs_into_pool(reverted_transactions).await;
366376
self.notify(ChainOrchestratorEvent::FcsHeadUpdated(head));
367377
let _ = sender.send(());
368378
}
@@ -560,6 +570,43 @@ impl<
560570
Ok(Some(ChainOrchestratorEvent::NewL1Block(block_number)))
561571
}
562572

573+
/// Collects reverted L2 transactions in [from, to], excluding L1 messages.
574+
async fn collect_reverted_txs_in_range(
575+
&self,
576+
from: u64,
577+
to: u64,
578+
) -> Result<Vec<ScrollTxEnvelope>, ChainOrchestratorError> {
579+
let mut reverted_transactions: Vec<ScrollTxEnvelope> = Vec::new();
580+
for number in from..=to {
581+
let block = self
582+
.l2_client
583+
.get_block_by_number(number.into())
584+
.full()
585+
.await?
586+
.ok_or_else(|| ChainOrchestratorError::L2BlockNotFoundInL2Client(number))?;
587+
588+
let block = block.into_consensus().map_transactions(|tx| tx.inner.into_inner());
589+
reverted_transactions.extend(
590+
block.into_body().transactions.into_iter().filter(|tx| !tx.is_l1_message()),
591+
);
592+
}
593+
Ok(reverted_transactions)
594+
}
595+
596+
/// Reinserts given L2 transactions into the transaction pool.
597+
async fn reinsert_txs_into_pool(&self, txs: Vec<ScrollTxEnvelope>) {
598+
for tx in txs {
599+
let encoded_tx = tx.encoded_2718();
600+
if let Err(err) = self.l2_client.send_raw_transaction(&encoded_tx).await {
601+
tracing::warn!(
602+
target: "scroll::chain_orchestrator",
603+
?err,
604+
"failed to reinsert reverted transaction into pool"
605+
);
606+
}
607+
}
608+
}
609+
563610
/// Handles a reorganization event by deleting all indexed data which is greater than the
564611
/// provided block number.
565612
async fn handle_l1_reorg(
@@ -570,27 +617,36 @@ impl<
570617
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
571618
self.database.unwind(genesis_hash, block_number).await?;
572619

573-
let l2_head_block_info = if let Some(block_number) = l2_head_block_number {
574-
// Fetch the block hash of the new L2 head block.
575-
let block_hash = self
576-
.l2_client
577-
.get_block_by_number(block_number.into())
578-
.full()
579-
.await?
580-
.expect("L2 head block must exist")
581-
.header
582-
.hash_slow();
620+
let (l2_head_block_info, reverted_transactions) =
621+
if let Some(block_number) = l2_head_block_number {
622+
// Fetch the block hash of the new L2 head block.
623+
let block_hash = self
624+
.l2_client
625+
.get_block_by_number(block_number.into())
626+
.full()
627+
.await?
628+
.expect("L2 head block must exist")
629+
.header
630+
.hash_slow();
631+
632+
// Cancel the inflight payload building job if the head has changed.
633+
if let Some(s) = self.sequencer.as_mut() {
634+
s.cancel_payload_building_job();
635+
};
636+
637+
// Collect transactions of reverted blocks from l2 client.
638+
let reverted_transactions = self
639+
.collect_reverted_txs_in_range(
640+
block_number.saturating_add(1),
641+
self.engine.fcs().head_block_info().number,
642+
)
643+
.await?;
583644

584-
// Cancel the inflight payload building job if the head has changed.
585-
if let Some(s) = self.sequencer.as_mut() {
586-
s.cancel_payload_building_job();
645+
(Some(BlockInfo { number: block_number, hash: block_hash }), reverted_transactions)
646+
} else {
647+
(None, Vec::new())
587648
};
588649

589-
Some(BlockInfo { number: block_number, hash: block_hash })
590-
} else {
591-
None
592-
};
593-
594650
// If the L1 reorg is before the origin of the inflight payload building job, cancel it.
595651
if Some(l1_block_number) <
596652
self.sequencer
@@ -608,6 +664,9 @@ impl<
608664
self.engine.update_fcs(l2_head_block_info, l2_safe_block_info, None).await?;
609665
}
610666

667+
// Add all reverted transactions to the transaction pool.
668+
self.reinsert_txs_into_pool(reverted_transactions).await;
669+
611670
let event = ChainOrchestratorEvent::L1Reorg {
612671
l1_block_number,
613672
queue_index,

crates/node/tests/e2e.rs

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! End-to-end tests for the rollup node.
22
3-
use alloy_eips::BlockNumberOrTag;
3+
use alloy_eips::{eip2718::Encodable2718, BlockNumberOrTag};
44
use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256};
55
use alloy_rpc_types_eth::Block;
66
use alloy_signer::Signer;
@@ -1451,6 +1451,152 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> {
14511451
Ok(())
14521452
}
14531453

1454+
/// Test that when L2 block reorg happens due to an L1 reorg, the transactions that were reverted
1455+
/// are requeued.
1456+
#[tokio::test]
1457+
async fn requeues_transactions_after_l1_reorg() -> eyre::Result<()> {
1458+
reth_tracing::init_test_tracing();
1459+
1460+
let chain_spec = (*SCROLL_DEV).clone();
1461+
let mut config = default_sequencer_test_scroll_rollup_node_config();
1462+
config.sequencer_args.auto_start = false;
1463+
config.sequencer_args.block_time = 0;
1464+
1465+
let (mut nodes, _tasks, wallet) =
1466+
setup_engine(config, 1, chain_spec.clone(), false, false).await?;
1467+
let node = nodes.pop().expect("node exists");
1468+
1469+
let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
1470+
let mut events = rnm_handle.get_event_listener().await?;
1471+
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
1472+
1473+
l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?;
1474+
let _ = events.next().await;
1475+
let _ = events.next().await;
1476+
1477+
// Let the sequencer build 10 blocks.
1478+
for i in 1..=10 {
1479+
rnm_handle.build_block();
1480+
let b = wait_for_block_sequenced_5s(&mut events, i).await?;
1481+
tracing::info!(target: "scroll::test", block_number = ?b.header.number, block_hash = ?b.header.hash_slow(), "Sequenced block");
1482+
}
1483+
1484+
// Send a L1 message and wait for it to be indexed.
1485+
let l1_message_notification = L1Notification::L1Message {
1486+
message: TxL1Message {
1487+
queue_index: 0,
1488+
gas_limit: 21000,
1489+
to: Default::default(),
1490+
value: Default::default(),
1491+
sender: Default::default(),
1492+
input: Default::default(),
1493+
},
1494+
block_number: 2,
1495+
block_timestamp: 0,
1496+
};
1497+
1498+
// Build a L2 block with L1 message, so we can revert it later.
1499+
l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?;
1500+
l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(2))).await?;
1501+
wait_for_event_5s(&mut events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?;
1502+
wait_for_event_5s(&mut events, ChainOrchestratorEvent::NewL1Block(2)).await?;
1503+
rnm_handle.build_block();
1504+
wait_for_block_sequenced_5s(&mut events, 11).await?;
1505+
1506+
// Inject a user transaction and force the sequencer to include it in the next block
1507+
let wallet = Arc::new(Mutex::new(wallet));
1508+
let tx = generate_tx(wallet.clone()).await;
1509+
let injected_tx_bytes: Vec<u8> = tx.clone().into();
1510+
node.rpc.inject_tx(tx).await?;
1511+
1512+
rnm_handle.build_block();
1513+
let block_with_tx = wait_for_block_sequenced_5s(&mut events, 12).await?;
1514+
assert!(
1515+
block_contains_raw_tx(&block_with_tx, &injected_tx_bytes),
1516+
"block 11 should contain the injected transaction before the reorg"
1517+
);
1518+
1519+
// Trigger an L1 reorg that reverts the block containing the transaction
1520+
l1_watcher_tx.send(Arc::new(L1Notification::Reorg(1))).await?;
1521+
wait_for_event_predicate_5s(&mut events, |event| {
1522+
matches!(event, ChainOrchestratorEvent::L1Reorg { l1_block_number: 1, .. })
1523+
})
1524+
.await?;
1525+
1526+
// Build the next block – the reverted transaction should have been requeued
1527+
rnm_handle.build_block();
1528+
let reseq_block = wait_for_block_sequenced_5s(&mut events, 11).await?;
1529+
assert!(
1530+
block_contains_raw_tx(&reseq_block, &injected_tx_bytes),
1531+
"re-sequenced block should contain the reverted transaction"
1532+
);
1533+
1534+
Ok(())
1535+
}
1536+
1537+
/// Test that when the FCS head is reset to an earlier block via `UpdateFcsHead`,
1538+
/// the transactions from reverted blocks are requeued into the tx pool and can
1539+
/// be included again.
1540+
#[tokio::test]
1541+
async fn requeues_transactions_after_update_fcs_head() -> eyre::Result<()> {
1542+
reth_tracing::init_test_tracing();
1543+
1544+
let chain_spec = (*SCROLL_DEV).clone();
1545+
let mut config = default_sequencer_test_scroll_rollup_node_config();
1546+
config.sequencer_args.auto_start = false;
1547+
config.sequencer_args.block_time = 0;
1548+
1549+
let (mut nodes, _tasks, wallet) =
1550+
setup_engine(config, 1, chain_spec.clone(), false, false).await?;
1551+
let node = nodes.pop().expect("node exists");
1552+
1553+
let handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
1554+
let mut events = handle.get_event_listener().await?;
1555+
1556+
// Set L1 synced to allow sequencing.
1557+
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
1558+
l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?;
1559+
let _ = events.next().await;
1560+
let _ = events.next().await;
1561+
1562+
// Build a few blocks and remember block #4 as the future reset target.
1563+
let mut target_head: Option<BlockInfo> = None;
1564+
for i in 1..=4 {
1565+
handle.build_block();
1566+
let b = wait_for_block_sequenced_5s(&mut events, i).await?;
1567+
if i == 4 {
1568+
target_head = Some(BlockInfo { number: b.header.number, hash: b.header.hash_slow() });
1569+
}
1570+
}
1571+
1572+
// Inject a user transaction and include it in block 5.
1573+
let wallet = Arc::new(Mutex::new(wallet));
1574+
let tx = generate_tx(wallet.clone()).await;
1575+
let injected_tx_bytes: Vec<u8> = tx.clone().into();
1576+
node.rpc.inject_tx(tx).await?;
1577+
1578+
handle.build_block();
1579+
let block_with_tx = wait_for_block_sequenced_5s(&mut events, 5).await?;
1580+
assert!(
1581+
block_contains_raw_tx(&block_with_tx, &injected_tx_bytes),
1582+
"block 5 should contain the injected transaction before the FCS reset",
1583+
);
1584+
1585+
// Reset FCS head back to block 4; this should collect block 5's txs and requeue them.
1586+
let head = target_head.expect("target head exists");
1587+
handle.update_fcs_head(head).await.expect("update_fcs_head should succeed");
1588+
1589+
// Build the next block – the reverted transaction should have been requeued and included.
1590+
handle.build_block();
1591+
let reseq_block = wait_for_block_sequenced_5s(&mut events, 5).await?;
1592+
assert!(
1593+
block_contains_raw_tx(&reseq_block, &injected_tx_bytes),
1594+
"re-sequenced block should contain the reverted transaction after FCS reset",
1595+
);
1596+
1597+
Ok(())
1598+
}
1599+
14541600
/// Tests that a sequencer and follower node can produce blocks using a custom local genesis
14551601
/// configuration and properly propagate them between nodes.
14561602
#[tokio::test]
@@ -2176,3 +2322,7 @@ async fn assert_latest_block_on_rpc_by_hash(
21762322
)
21772323
.await;
21782324
}
2325+
2326+
fn block_contains_raw_tx(block: &ScrollBlock, raw_tx: &[u8]) -> bool {
2327+
block.body.transactions.iter().any(|tx| tx.encoded_2718().as_slice() == raw_tx)
2328+
}

0 commit comments

Comments
 (0)