Skip to content

Commit f1d40aa

Browse files
committed
feat: add mempool integration with execution sync and error isolation
1 parent 62efa15 commit f1d40aa

File tree

9 files changed

+550
-4
lines changed

9 files changed

+550
-4
lines changed

crates/execution/src/engine.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
66
use crate::{
77
database::{CipherBftDatabase, Provider},
8-
error::{ExecutionError, Result},
8+
error::{ExecutionError, Result, TxErrorCategory},
99
evm::CipherBftEvmConfig,
1010
precompiles::{GenesisValidatorData, StakingPrecompile},
1111
receipts::{
@@ -353,7 +353,26 @@ impl<P: Provider + Clone> ExecutionEngine<P> {
353353
let tx_start = Instant::now();
354354

355355
// Execute transaction
356-
let tx_result = self.evm_config.execute_transaction(&mut evm, tx_bytes)?;
356+
let tx_result = match self.evm_config.execute_transaction(&mut evm, tx_bytes) {
357+
Ok(result) => result,
358+
Err(e) => match e.category() {
359+
TxErrorCategory::Skip { reason } => {
360+
tracing::warn!(
361+
tx_index,
362+
?reason,
363+
"Skipping invalid transaction (mempool should catch this)"
364+
);
365+
continue;
366+
}
367+
TxErrorCategory::FailedReceipt => {
368+
tracing::warn!(tx_index, error = %e, "Transaction reverted, skipping");
369+
continue;
370+
}
371+
TxErrorCategory::Fatal => {
372+
return Err(e);
373+
}
374+
},
375+
};
357376

358377
// Record per-transaction metrics
359378
let tx_duration = tx_start.elapsed();
@@ -947,4 +966,42 @@ mod tests {
947966
// Verify beneficiary is set in sealed block header
948967
assert_eq!(sealed.header.beneficiary, beneficiary);
949968
}
969+
970+
#[test]
971+
fn test_error_isolation_skips_invalid_transactions() {
972+
// This test documents the expected behavior of error isolation.
973+
//
974+
// When a block contains transactions with invalid nonces (NonceTooHigh, NonceTooLow),
975+
// insufficient balance, or other validation errors, the execution engine should:
976+
//
977+
// 1. Skip the invalid transaction (no receipt generated)
978+
// 2. Continue processing remaining transactions in the block
979+
// 3. Return success with results from valid transactions only
980+
//
981+
// This prevents a single bad transaction from failing an entire block,
982+
// which would cause execution-consensus divergence.
983+
//
984+
// Implementation note: The actual error isolation is implemented in
985+
// execute_block() which handles TxErrorCategory::Skip by continuing
986+
// the transaction loop instead of returning an error.
987+
//
988+
// Testing this end-to-end requires creating signed transactions with
989+
// specific nonces, which requires test infrastructure for:
990+
// - Generating valid ECDSA signatures
991+
// - Setting up account state with specific nonces
992+
// - Creating transactions that will trigger NonceTooHigh
993+
//
994+
// For now, this test documents the expected behavior.
995+
// Full integration testing is done via devnet MassMint scenarios.
996+
997+
let engine = create_test_engine();
998+
999+
// Verify engine is created correctly
1000+
assert_eq!(engine.chain_config.chain_id, 85300);
1001+
1002+
// The actual error isolation behavior is tested by:
1003+
// 1. Running the devnet with MassMint script
1004+
// 2. Verifying blocks execute even when some transactions have wrong nonces
1005+
// 3. Checking that valid transactions are included and executed
1006+
}
9501007
}

crates/execution/src/error.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,59 @@ impl DatabaseError {
143143
}
144144

145145
impl DBErrorMarker for DatabaseError {}
146+
147+
/// Categorizes transaction execution errors for handling decisions.
148+
#[derive(Debug, Clone, PartialEq, Eq)]
149+
pub enum TxErrorCategory {
150+
/// Skip transaction, continue block execution.
151+
Skip {
152+
/// The reason for skipping this transaction.
153+
reason: SkipReason,
154+
},
155+
/// Include transaction with failed receipt (EVM revert).
156+
FailedReceipt,
157+
/// Fatal error - halt block execution.
158+
Fatal,
159+
}
160+
161+
/// Reason for skipping a transaction.
162+
#[derive(Debug, Clone, PartialEq, Eq)]
163+
pub enum SkipReason {
164+
/// Transaction nonce is higher than expected.
165+
NonceTooHigh,
166+
/// Transaction nonce is lower than expected (already executed).
167+
NonceTooLow,
168+
/// Account has insufficient balance for transaction.
169+
InsufficientBalance,
170+
/// Transaction failed basic validation.
171+
InvalidTransaction,
172+
}
173+
174+
impl ExecutionError {
175+
/// Categorize this error for handling decision.
176+
pub fn category(&self) -> TxErrorCategory {
177+
let error_str = format!("{:?}", self);
178+
179+
if error_str.contains("NonceTooHigh") {
180+
TxErrorCategory::Skip {
181+
reason: SkipReason::NonceTooHigh,
182+
}
183+
} else if error_str.contains("NonceTooLow") {
184+
TxErrorCategory::Skip {
185+
reason: SkipReason::NonceTooLow,
186+
}
187+
} else if error_str.contains("InsufficientFunds") || error_str.contains("insufficient") {
188+
TxErrorCategory::Skip {
189+
reason: SkipReason::InsufficientBalance,
190+
}
191+
} else if matches!(self, ExecutionError::InvalidTransaction(_)) {
192+
TxErrorCategory::Skip {
193+
reason: SkipReason::InvalidTransaction,
194+
}
195+
} else if matches!(self, ExecutionError::Evm(_)) {
196+
TxErrorCategory::FailedReceipt
197+
} else {
198+
TxErrorCategory::Fatal
199+
}
200+
}
201+
}

crates/node/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ cipherbft-metrics = { path = "../metrics" }
2626
alloy-primitives = { version = "1", features = ["serde"] }
2727
alloy-rlp = { workspace = true }
2828
alloy-consensus = { workspace = true }
29+
alloy-eips = { version = "1" }
2930

3031
# Reth primitives (for transaction parsing in block execution)
3132
reth-primitives = { workspace = true }
3233
reth-primitives-traits = { workspace = true }
3334

35+
# Reth transaction pool (for WorkerPoolAdapter)
36+
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.10.0" }
37+
3438
# Async runtime
3539
tokio = { workspace = true, features = ["full", "signal"] }
3640
tokio-util = { workspace = true }

crates/node/src/execution_bridge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ mod tests {
787787

788788
#[tokio::test]
789789
async fn test_set_genesis_block_hash() {
790-
let bridge = create_default_bridge().unwrap();
790+
let (bridge, _temp_dir) = create_default_bridge().unwrap();
791791

792792
// Initially should be B256::ZERO
793793
let initial_hash = bridge.last_block_hash.read().map(|guard| *guard).unwrap();

crates/node/src/execution_sync.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
//! Execution-Consensus synchronization tracking.
2+
3+
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
4+
use tracing::error;
5+
6+
/// Configuration for execution sync tracking.
7+
///
8+
/// Note: Named `ExecutionSyncConfig` to avoid collision with
9+
/// `cipherbft_consensus::ExecutionSyncConfig`.
10+
#[derive(Clone, Debug)]
11+
pub struct ExecutionSyncConfig {
12+
/// Maximum blocks execution can fall behind before halting.
13+
pub max_divergence: u64,
14+
/// Maximum consecutive failures before halting.
15+
pub max_consecutive_failures: u32,
16+
}
17+
18+
impl Default for ExecutionSyncConfig {
19+
fn default() -> Self {
20+
Self {
21+
max_divergence: 10,
22+
max_consecutive_failures: 5,
23+
}
24+
}
25+
}
26+
27+
/// Action to take after execution failure.
28+
#[derive(Debug, Clone, PartialEq, Eq)]
29+
pub enum SyncAction {
30+
Continue,
31+
Halt { reason: String },
32+
}
33+
34+
/// Tracks execution progress relative to consensus.
35+
pub struct ExecutionSyncTracker {
36+
last_executed: AtomicU64,
37+
consecutive_failures: AtomicU32,
38+
config: ExecutionSyncConfig,
39+
}
40+
41+
impl ExecutionSyncTracker {
42+
pub fn new(config: ExecutionSyncConfig) -> Self {
43+
Self {
44+
last_executed: AtomicU64::new(0),
45+
consecutive_failures: AtomicU32::new(0),
46+
config,
47+
}
48+
}
49+
50+
pub fn on_success(&self, height: u64) {
51+
self.last_executed.store(height, Ordering::SeqCst);
52+
self.consecutive_failures.store(0, Ordering::SeqCst);
53+
}
54+
55+
pub fn on_failure(&self, consensus_height: u64, error: &str) -> SyncAction {
56+
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
57+
let last_executed = self.last_executed.load(Ordering::SeqCst);
58+
let divergence = consensus_height.saturating_sub(last_executed);
59+
60+
error!(
61+
last_executed,
62+
consensus_height,
63+
divergence,
64+
consecutive_failures = failures,
65+
error,
66+
"Execution failed"
67+
);
68+
69+
if divergence > self.config.max_divergence {
70+
return SyncAction::Halt {
71+
reason: format!(
72+
"Divergence {} exceeds max {}. Last executed: {}, consensus: {}",
73+
divergence, self.config.max_divergence, last_executed, consensus_height
74+
),
75+
};
76+
}
77+
78+
if failures > self.config.max_consecutive_failures {
79+
return SyncAction::Halt {
80+
reason: format!(
81+
"Consecutive failures {} exceeds max {}",
82+
failures, self.config.max_consecutive_failures
83+
),
84+
};
85+
}
86+
87+
SyncAction::Continue
88+
}
89+
90+
pub fn last_executed(&self) -> u64 {
91+
self.last_executed.load(Ordering::SeqCst)
92+
}
93+
}
94+
95+
#[cfg(test)]
96+
mod tests {
97+
use super::*;
98+
99+
#[test]
100+
fn test_success_resets_failures() {
101+
let tracker = ExecutionSyncTracker::new(ExecutionSyncConfig::default());
102+
tracker.on_failure(5, "test");
103+
tracker.on_failure(6, "test");
104+
tracker.on_success(7);
105+
assert_eq!(tracker.consecutive_failures.load(Ordering::SeqCst), 0);
106+
assert_eq!(tracker.last_executed(), 7);
107+
}
108+
109+
#[test]
110+
fn test_divergence_triggers_halt() {
111+
let config = ExecutionSyncConfig {
112+
max_divergence: 5,
113+
max_consecutive_failures: 100,
114+
};
115+
let tracker = ExecutionSyncTracker::new(config);
116+
tracker.on_success(10);
117+
let action = tracker.on_failure(16, "test");
118+
assert!(matches!(action, SyncAction::Halt { .. }));
119+
}
120+
121+
#[test]
122+
fn test_consecutive_failures_triggers_halt() {
123+
let config = ExecutionSyncConfig {
124+
max_divergence: 100,
125+
max_consecutive_failures: 3,
126+
};
127+
let tracker = ExecutionSyncTracker::new(config);
128+
tracker.on_success(10);
129+
assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue);
130+
assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue);
131+
assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue);
132+
assert!(matches!(
133+
tracker.on_failure(11, "e"),
134+
SyncAction::Halt { .. }
135+
));
136+
}
137+
}

crates/node/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77
pub mod client_config;
88
pub mod config;
99
pub mod execution_bridge;
10+
pub mod execution_sync;
1011
pub mod genesis_bootstrap;
1112
pub mod key_cli;
13+
pub mod mempool_state;
1214
pub mod network;
1315
pub mod network_api;
1416
pub mod node;
1517
pub mod supervisor;
1618
pub mod util;
19+
pub mod worker_pool_adapter;
1720

1821
pub use client_config::ClientConfig;
1922
pub use config::{
@@ -23,11 +26,14 @@ pub use config::{
2326
DEFAULT_RPC_HTTP_PORT, DEFAULT_RPC_WS_PORT,
2427
};
2528
pub use execution_bridge::ExecutionBridge;
29+
pub use execution_sync::{ExecutionSyncConfig, ExecutionSyncTracker, SyncAction};
2630
pub use genesis_bootstrap::{
2731
GeneratedValidator, GenesisGenerationResult, GenesisGenerator, GenesisGeneratorConfig,
2832
GenesisLoader, ValidatorKeyFile,
2933
};
3034
pub use key_cli::{execute_keys_command, KeysCommand};
35+
pub use mempool_state::ExecutionStateValidator;
3136
pub use network_api::{NodeNetworkApi, TcpNetworkApi};
3237
pub use node::Node;
3338
pub use supervisor::{NodeSupervisor, ShutdownError};
39+
pub use worker_pool_adapter::WorkerPoolAdapter;

0 commit comments

Comments
 (0)