diff --git a/Cargo.lock b/Cargo.lock index 74659b4d1..2ef4414ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3774,6 +3774,26 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "dashmap 5.5.3", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "group" version = "0.13.0" @@ -5566,6 +5586,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -5576,6 +5602,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "8.2.0" @@ -9436,6 +9468,7 @@ dependencies = [ "reth-rpc-eth-types", "reth-rpc-layer", "reth-rpc-server-types", + "reth-seismic-rpc", "reth-storage-api", "reth-tasks", "reth-tracing", @@ -9949,9 +9982,11 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-signer-local", + "dashmap 6.1.0", "enr", - "eyre", "futures", + "governor", + "http", "jsonrpsee 0.26.0", "jsonrpsee-types 0.26.0", "k256", @@ -9962,7 +9997,6 @@ dependencies = [ "reth-evm", "reth-evm-ethereum", "reth-node-api", - "reth-node-builder", "reth-node-ethereum", "reth-payload-builder", "reth-primitives", @@ -9995,6 +10029,7 @@ dependencies = [ "serde", "thiserror 2.0.17", "tokio", + "tower 0.5.2", "tracing", ] @@ -11870,6 +11905,15 @@ dependencies = [ "sha1", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/bin/seismic-reth/src/main.rs b/bin/seismic-reth/src/main.rs index 0db6d7472..0c2c7886b 100644 --- a/bin/seismic-reth/src/main.rs +++ b/bin/seismic-reth/src/main.rs @@ -6,10 +6,14 @@ use clap::Parser; use jsonrpsee_http_client::HttpClientBuilder; use reth::cli::Cli; use reth_cli_commands::node::NoArgs; +use reth_node_builder::Node; use reth_node_core::node_config::NodeConfig; use reth_seismic_cli::chainspec::SeismicChainSpecParser; use reth_seismic_node::node::SeismicNode; -use reth_seismic_rpc::ext::{EthApiExt, EthApiOverrideServer, SeismicApi, SeismicApiServer}; +use reth_seismic_rpc::{ + ext::{EthApiExt, EthApiOverrideServer, SeismicApi, SeismicApiServer}, + rate_limiter::{RateLimitConfig, SeismicRateLimiter}, +}; use reth_tracing::tracing::*; use seismic_enclave::{ @@ -84,8 +88,33 @@ fn main() { // building additional endpoints seismic api let seismic_api = SeismicApi::new(purpose_keys.clone()); + // Configure rate limiting + let rate_limiter = SeismicRateLimiter::new(RateLimitConfig { + requests_per_second: 100, // 100 requests per second per IP + burst_size: 50, // Allow bursts of up to 50 requests + limited_methods: None, // Empty = limit all methods (except exempt) + exempt_methods: vec![ + "eth_chainId".to_string(), + "eth_blockNumber".to_string(), + "net_version".to_string(), + "web3_clientVersion".to_string(), + "seismic_getTeePublicKey".to_string(), + ], + exempt_ips: vec![ + // Add internal service IPs here if needed + // "127.0.0.1".parse().unwrap(), + ], + }); + + info!(target: "reth::cli", "Rate limiting configured: 100 req/s, burst 50"); + + let seismic_node = SeismicNode::default(); + let add_ons = seismic_node.add_ons().with_rpc_middleware(rate_limiter); + let node = builder - .node(SeismicNode::default()) + .with_types::() + .with_components(seismic_node.components_builder()) + .with_add_ons(add_ons) .extend_rpc_modules(move |ctx| { // replace eth_ namespace ctx.modules.replace_configured( diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index b824e76da..54d64d8b6 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -30,6 +30,7 @@ reth-transaction-pool.workspace = true reth-storage-api.workspace = true reth-chain-state.workspace = true reth-evm.workspace = true +reth-seismic-rpc.workspace = true # rpc/net jsonrpsee = { workspace = true, features = ["server"] } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 11971afd9..61d1745da 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -75,8 +75,8 @@ pub use reth_ipc::server::{ Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder, }; pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection}; +use reth_seismic_rpc::rate_limiter::ClientIpExtractorLayer; pub use tower::layer::util::{Identity, Stack}; - /// Auth server utilities. pub mod auth; @@ -1317,6 +1317,7 @@ impl RpcServerConfig { let server = ServerBuilder::new() .set_http_middleware( tower::ServiceBuilder::new() + .layer(ClientIpExtractorLayer) .option_layer(Self::maybe_cors_layer(cors)?) .option_layer(Self::maybe_jwt_layer(self.jwt_secret)) .option_layer(Self::maybe_compression_layer( @@ -1371,6 +1372,7 @@ impl RpcServerConfig { .set_config(config.ws_only().build()) .set_http_middleware( tower::ServiceBuilder::new() + .layer(ClientIpExtractorLayer) .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), ) @@ -1396,6 +1398,7 @@ impl RpcServerConfig { .set_config(config.http_only().build()) .set_http_middleware( tower::ServiceBuilder::new() + .layer(ClientIpExtractorLayer) .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) .option_layer(Self::maybe_jwt_layer(self.jwt_secret)) .option_layer(Self::maybe_compression_layer(self.http_disable_compression)), diff --git a/crates/rpc/rpc-builder/tests/it/main.rs b/crates/rpc/rpc-builder/tests/it/main.rs index f5dc3003e..e55da1250 100644 --- a/crates/rpc/rpc-builder/tests/it/main.rs +++ b/crates/rpc/rpc-builder/tests/it/main.rs @@ -3,6 +3,7 @@ mod auth; mod http; mod middleware; +mod rate_limiter; mod serde; mod startup; pub mod utils; diff --git a/crates/rpc/rpc-builder/tests/it/rate_limiter.rs b/crates/rpc/rpc-builder/tests/it/rate_limiter.rs new file mode 100644 index 000000000..d23e73ac7 --- /dev/null +++ b/crates/rpc/rpc-builder/tests/it/rate_limiter.rs @@ -0,0 +1,239 @@ +//! Integration tests for the Seismic rate limiter middleware. +//! +//! These tests verify that the rate limiter correctly limits RPC requests +//! when integrated with the full RPC server stack. + +use crate::utils::{test_address, test_rpc_builder}; +use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest}; +use reth_rpc_builder::{RpcServerConfig, TransportRpcModuleConfig}; +use reth_rpc_eth_api::EthApiClient; +use reth_rpc_server_types::RpcModuleSelection; +use reth_seismic_rpc::rate_limiter::{RateLimitConfig, SeismicRateLimiter}; + +/// Test that rate limiter middleware is properly applied to RPC server +#[tokio::test(flavor = "multi_thread")] +async fn test_rate_limiter_integration() { + let builder = test_rpc_builder(); + let eth_api = builder.eth_api_builder().enable_storage_apis(true).build(); + let modules = + builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api); + + // Create rate limiter with very restrictive settings for testing + let rate_limiter = SeismicRateLimiter::new(RateLimitConfig { + requests_per_second: 2, + burst_size: 2, + limited_methods: None, // limit all + exempt_methods: vec![], // no exemptions + exempt_ips: vec![], + }); + + let handle = RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .set_rpc_middleware(rate_limiter) + .start(&modules) + .await + .unwrap(); + + let client = handle.http_client().unwrap(); + + // First two requests should succeed (burst allows 2) + let result1 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result1.is_ok(), "First request should succeed"); + + let result2 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result2.is_ok(), "Second request should succeed"); + + // Third request should be rate limited + let result3 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result3.is_err(), "Third request should be rate limited"); + + // Check that the error is a rate limit error + let err = result3.unwrap_err(); + let err_str = err.to_string(); + assert!( + err_str.contains("Rate limited") || err_str.contains("-32029"), + "Error should indicate rate limiting: {}", + err_str + ); +} + +/// Test that exempt methods bypass rate limiting +#[tokio::test(flavor = "multi_thread")] +async fn test_rate_limiter_exempt_methods() { + let builder = test_rpc_builder(); + let eth_api = builder.eth_api_builder().enable_storage_apis(true).build(); + let modules = + builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api); + + // Create rate limiter that exempts eth_protocolVersion + let rate_limiter = SeismicRateLimiter::new(RateLimitConfig { + requests_per_second: 1, + burst_size: 1, + limited_methods: None, + exempt_methods: vec!["eth_protocolVersion".to_string()], + exempt_ips: vec![], + }); + + let handle = RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .set_rpc_middleware(rate_limiter) + .start(&modules) + .await + .unwrap(); + + let client = handle.http_client().unwrap(); + + // Make many requests to the exempt method - all should succeed + for i in 0..10 { + let result = EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result.is_ok(), "Exempt method request {} should succeed", i); + } +} + +/// Test that rate limiting works with specific limited methods +#[tokio::test(flavor = "multi_thread")] +async fn test_rate_limiter_specific_methods() { + let builder = test_rpc_builder(); + let eth_api = builder.eth_api_builder().enable_storage_apis(true).build(); + let modules = + builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api); + + // Only rate limit eth_chainId, leave others unlimited + let rate_limiter = SeismicRateLimiter::new(RateLimitConfig { + requests_per_second: 1, + burst_size: 1, + limited_methods: Some(vec!["eth_chainId".to_string()]), + exempt_methods: vec![], + exempt_ips: vec![], + }); + + let handle = RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .set_rpc_middleware(rate_limiter) + .start(&modules) + .await + .unwrap(); + + let client = handle.http_client().unwrap(); + + // eth_protocolVersion is not in limited_methods, should be unlimited + for i in 0..10 { + let result = EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result.is_ok(), "Non-limited method request {} should succeed", i); + } + + // eth_chainId is limited, first request succeeds + let chain_result1 = + EthApiClient::::chain_id(&client) + .await; + assert!(chain_result1.is_ok(), "First chainId request should succeed"); + + // Second chainId request should be rate limited + let chain_result2 = + EthApiClient::::chain_id(&client) + .await; + assert!(chain_result2.is_err(), "Second chainId request should be rate limited"); +} + +/// Test rate limiter with default configuration +#[tokio::test(flavor = "multi_thread")] +async fn test_rate_limiter_default_config() { + let builder = test_rpc_builder(); + let eth_api = builder.eth_api_builder().enable_storage_apis(true).build(); + let modules = + builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api); + + // Use default config (100 req/s, 50 burst) + let rate_limiter = SeismicRateLimiter::with_defaults(); + + let handle = RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .set_rpc_middleware(rate_limiter) + .start(&modules) + .await + .unwrap(); + + let client = handle.http_client().unwrap(); + + // With default config, we should be able to make many requests + // eth_chainId is exempt by default + for i in 0..100 { + let result = + EthApiClient::::chain_id( + &client, + ) + .await; + assert!(result.is_ok(), "Default exempt method request {} should succeed", i); + } +} + +/// Test that rate limiting resets after waiting +#[tokio::test(flavor = "multi_thread")] +async fn test_rate_limiter_token_refill() { + let builder = test_rpc_builder(); + let eth_api = builder.eth_api_builder().enable_storage_apis(true).build(); + let modules = + builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api); + + // 10 requests per second = 1 token every 100ms + let rate_limiter = SeismicRateLimiter::new(RateLimitConfig { + requests_per_second: 10, + burst_size: 1, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }); + + let handle = RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .set_rpc_middleware(rate_limiter) + .start(&modules) + .await + .unwrap(); + + let client = handle.http_client().unwrap(); + + // Exhaust the burst + let result1 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result1.is_ok(), "First request should succeed"); + + let result2 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result2.is_err(), "Second request should be rate limited"); + + // Wait for token to refill (>100ms for safety) + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + + // Should be able to make another request + let result3 = + EthApiClient::::protocol_version( + &client, + ) + .await; + assert!(result3.is_ok(), "Request after refill should succeed"); +} diff --git a/crates/seismic/node/src/node.rs b/crates/seismic/node/src/node.rs index b1ce1d1aa..974382432 100644 --- a/crates/seismic/node/src/node.rs +++ b/crates/seismic/node/src/node.rs @@ -14,7 +14,7 @@ use reth_evm::{ ConfigureEngineEvm, ConfigureEvm, EvmFactory, EvmFactoryFor, NextBlockEnvAttributes, }; use reth_network::{NetworkHandle, NetworkPrimitives}; -use reth_node_api::{AddOnsContext, FullNodeComponents, NodeAddOns, PrimitivesTy, TxTy}; +use reth_node_api::{AddOnsContext, FullNodeComponents, HeaderTy, NodeAddOns, PrimitivesTy, TxTy}; use reth_node_builder::{ components::{ BasicPayloadServiceBuilder, ComponentsBuilder, ConsensusBuilder, ExecutorBuilder, @@ -23,8 +23,9 @@ use reth_node_builder::{ node::{FullNodeTypes, NodeTypes}, rpc::{ BasicEngineApiBuilder, BasicEngineValidator, BasicEngineValidatorBuilder, EngineApiBuilder, - EngineValidatorAddOn, EngineValidatorBuilder, EthApiBuilder, PayloadValidatorBuilder, - RethRpcAddOns, RethRpcMiddleware, RpcAddOns, RpcHandle, RpcModuleContainer, + EngineValidatorAddOn, EngineValidatorBuilder, EthApiBuilder, EthApiCtx, + PayloadValidatorBuilder, RethRpcAddOns, RethRpcMiddleware, RpcAddOns, RpcHandle, + RpcModuleContainer, }, BuilderContext, DebugNode, Node, NodeAdapter, NodeComponentsBuilder, PayloadBuilderConfig, }; @@ -34,6 +35,10 @@ use reth_provider::{providers::ProviderFactoryBuilder, CanonStateSubscriptions, use reth_rpc::ValidationApi; use reth_rpc_api::BlockSubmissionValidationApiServer; use reth_rpc_builder::{config::RethRpcServerConfig, Identity}; +use reth_rpc_eth_api::{ + helpers::{pending_block::BuildPendingEnv, AddDevSigners}, + FullEthApiServer, RpcConvert, RpcConverter, RpcTypes, +}; use reth_rpc_eth_types::{ error::{api::FromEvmHalt, FromEvmError}, EthApiError, @@ -42,14 +47,17 @@ use reth_rpc_server_types::RethRpcModule; use reth_seismic_evm::SeismicEvmConfig; use reth_seismic_payload_builder::SeismicBuilderConfig; use reth_seismic_primitives::{SeismicPrimitives, SeismicReceipt, SeismicTransactionSigned}; -use reth_seismic_rpc::{SeismicEthApiBuilder, SeismicEthApiError, SeismicRethWithSignable}; +use reth_seismic_rpc::{ + SeismicEthApi, SeismicEthApiError, SeismicReceiptConverter, SeismicRethWithSignable, + SeismicRpcConvert, SeismicRpcTxConverter, SeismicSimTxConverter, +}; use reth_transaction_pool::{ blobstore::{DiskFileBlobStore, DiskFileBlobStoreConfig}, CoinbaseTipOrdering, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor, }; use revm::context::TxEnv; use seismic_alloy_consensus::SeismicTxEnvelope; -use std::{sync::Arc, time::SystemTime}; +use std::{marker::PhantomData, sync::Arc, time::SystemTime}; use crate::seismic_evm_config; @@ -228,6 +236,15 @@ where pub fn builder() -> SeismicAddOnsBuilder { SeismicAddOnsBuilder::default() } + + /// Sets the RPC middleware stack for processing RPC requests. + /// + /// This method configures a custom middleware stack that will be applied to all RPC requests + /// across HTTP, `WebSocket`, and IPC transports. + pub fn with_rpc_middleware(self, rpc_middleware: T) -> SeismicAddOns + { + SeismicAddOns { inner: self.inner.with_rpc_middleware(rpc_middleware) } + } } /// A regular seismic evm and executor builder. @@ -713,3 +730,49 @@ impl NetworkPrimitives for SeismicNetworkPrimitives { type Receipt = SeismicReceipt; type NewBlockPayload = NewBlock; } + +/// Builds [`SeismicEthApi`]. +#[derive(Debug)] +pub struct SeismicEthApiBuilder { + _nt: PhantomData, +} + +impl Default for SeismicEthApiBuilder { + fn default() -> Self { + Self { _nt: PhantomData } + } +} + +impl SeismicEthApiBuilder { + /// Creates a [`SeismicEthApiBuilder`] instance. + pub const fn new() -> Self { + Self { _nt: PhantomData } + } +} + +impl EthApiBuilder for SeismicEthApiBuilder +where + N: FullNodeComponents< + Evm: ConfigureEvm< + NextBlockEnvCtx: BuildPendingEnv> + Unpin, + >, + >, + NetworkT: RpcTypes, + SeismicRpcConvert: RpcConvert, + SeismicEthApi>: + FullEthApiServer + AddDevSigners, +{ + type EthApi = SeismicEthApi>; + + async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result { + let receipt_converter = SeismicReceiptConverter::new(); + + let rpc_converter: SeismicRpcConvert = RpcConverter::new(receipt_converter) + .with_sim_tx_converter(SeismicSimTxConverter::new()) + .with_rpc_tx_converter(SeismicRpcTxConverter::new()); + + let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner(); + + Ok(SeismicEthApi { inner: Arc::new(eth_api) }) + } +} diff --git a/crates/seismic/rpc/Cargo.toml b/crates/seismic/rpc/Cargo.toml index 73214ef6d..307f7f7f5 100644 --- a/crates/seismic/rpc/Cargo.toml +++ b/crates/seismic/rpc/Cargo.toml @@ -25,7 +25,6 @@ alloy-dyn-abi.workspace = true # reth reth-evm.workspace = true reth-node-api.workspace = true -reth-node-builder.workspace = true reth-primitives.workspace = true reth-primitives-traits.workspace = true reth-provider.workspace = true @@ -62,9 +61,14 @@ tokio.workspace = true jsonrpsee-types.workspace = true jsonrpsee.workspace = true futures.workspace = true +http = "1" + +# rate limiter +governor = "0.6" +dashmap = "6" +tower.workspace = true # misc -eyre.workspace = true serde.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/seismic/rpc/src/eth/mod.rs b/crates/seismic/rpc/src/eth/mod.rs index b0a591864..747c09128 100644 --- a/crates/seismic/rpc/src/eth/mod.rs +++ b/crates/seismic/rpc/src/eth/mod.rs @@ -7,31 +7,27 @@ pub mod transaction; pub mod utils; pub use receipt::SeismicReceiptConverter; +pub use transaction::{SeismicRpcTxConverter, SeismicSimTxConverter}; mod block; mod call; mod pending_block; -use crate::{ - eth::transaction::{SeismicRpcTxConverter, SeismicSimTxConverter}, - SeismicEthApiError, -}; +use crate::SeismicEthApiError; use alloy_consensus::TxEip4844; use alloy_primitives::U256; -use reth_evm::ConfigureEvm; -use reth_node_api::{FullNodeComponents, HeaderTy}; -use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; +use reth_node_api::FullNodeComponents; use reth_rpc::{ eth::{core::EthApiInner, DevSigner}, RpcTypes, }; use reth_rpc_eth_api::{ helpers::{ - pending_block::BuildPendingEnv, spec::SignersForApi, AddDevSigners, EthApiSpec, EthFees, - EthState, LoadFee, LoadPendingBlock, LoadState, SpawnBlocking, Trace, + spec::SignersForApi, AddDevSigners, EthApiSpec, EthFees, EthState, LoadFee, + LoadPendingBlock, LoadState, SpawnBlocking, Trace, }, - EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore, - RpcNodeCoreExt, SignableTxRequest, + EthApiTypes, FromEvmError, RpcConvert, RpcConverter, RpcNodeCore, RpcNodeCoreExt, + SignableTxRequest, }; use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasPriceOracle}; use reth_storage_api::{BlockReader, ProviderHeader, ProviderTx}; @@ -40,7 +36,7 @@ use reth_tasks::{ TaskSpawner, }; use seismic_alloy_network::SeismicReth; -use std::{fmt, marker::PhantomData, sync::Arc}; +use std::{fmt, sync::Arc}; use reth_rpc_convert::transaction::{EthTxEnvError, TryIntoTxEnv}; use revm_context::{BlockEnv, CfgEnv, TxEnv}; @@ -159,11 +155,6 @@ impl SeismicEthApi { pub fn eth_api(&self) -> &EthApiNodeBackend { &self.inner } - - /// Build a [`SeismicEthApi`] using [`SeismicEthApiBuilder`]. - pub const fn builder() -> SeismicEthApiBuilder { - SeismicEthApiBuilder::new() - } } impl EthApiTypes for SeismicEthApi @@ -352,50 +343,3 @@ pub type SeismicRpcConvert = RpcConverter< crate::eth::transaction::SeismicRpcTxConverter, >; -/// Builds [`SeismicEthApi`] for Optimism. -#[derive(Debug)] -pub struct SeismicEthApiBuilder { - _nt: PhantomData, -} - -impl Default for SeismicEthApiBuilder { - fn default() -> Self { - Self { _nt: PhantomData } - } -} - -impl SeismicEthApiBuilder { - /// Creates a [`SeismicEthApiBuilder`] instance from core components. - pub const fn new() -> Self { - Self { _nt: PhantomData } - } -} - -impl EthApiBuilder for SeismicEthApiBuilder -where - N: FullNodeComponents< - Evm: ConfigureEvm< - NextBlockEnvCtx: BuildPendingEnv> - // + From - + Unpin, - >, - >, - NetworkT: RpcTypes, - SeismicRpcConvert: RpcConvert, - SeismicEthApi>: - FullEthApiServer + AddDevSigners, -{ - type EthApi = SeismicEthApi>; - - async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result { - let receipt_converter = SeismicReceiptConverter::new(); - - let rpc_converter: SeismicRpcConvert = RpcConverter::new(receipt_converter) - .with_sim_tx_converter(SeismicSimTxConverter::new()) - .with_rpc_tx_converter(SeismicRpcTxConverter::new()); - - let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner(); - - Ok(SeismicEthApi { inner: Arc::new(eth_api) }) - } -} diff --git a/crates/seismic/rpc/src/lib.rs b/crates/seismic/rpc/src/lib.rs index 662bcc0df..d999832ee 100644 --- a/crates/seismic/rpc/src/lib.rs +++ b/crates/seismic/rpc/src/lib.rs @@ -13,3 +13,5 @@ pub use eth::*; mod error; pub use error::*; + +pub mod rate_limiter; diff --git a/crates/seismic/rpc/src/rate_limiter.rs b/crates/seismic/rpc/src/rate_limiter.rs new file mode 100644 index 000000000..2d15c6969 --- /dev/null +++ b/crates/seismic/rpc/src/rate_limiter.rs @@ -0,0 +1,975 @@ +//! Rate limiting middleware for Seismic RPC server. +//! +//! Provides per-IP rate limiting using token bucket algorithm via the `governor` crate. +//! +//! ## Architecture +//! +//! 1. `ClientIpExtractorLayer` - HTTP middleware that extracts client IP from nginx headers +//! 2. `SeismicRateLimiter` - RPC middleware that enforces per-IP rate limits +//! +//! The IP flows through request extensions: +//! HTTP layer → extensions → jsonrpsee → RPC middleware + +use dashmap::DashMap; +use governor::{ + NotUntil, Quota, RateLimiter, clock::{DefaultClock, QuantaInstant}, state::{InMemoryState, NotKeyed} +}; +use http::{Request, Response}; +use jsonrpsee::{ + server::middleware::rpc::RpcServiceT, types::Request as RpcRequest, MethodResponse, +}; +use std::{ + future::Future, + net::{IpAddr, Ipv4Addr}, + num::NonZeroU32, + sync::Arc, + task::{Context, Poll}, +}; +use tower::{Layer, Service}; + +// ============================================================================ +// ClientIp - Stored in request extensions +// ============================================================================ + +/// Client IP address extracted from HTTP headers. +/// +/// This is stored in request extensions by `ClientIpExtractorService` +/// and read by `SeismicRateLimitingService`. +#[derive(Clone, Copy, Debug)] +pub struct ClientIp(pub IpAddr); + +// ============================================================================ +// HTTP Middleware - Extract IP from nginx headers +// ============================================================================ + +/// Tower layer that extracts client IP from HTTP headers. +/// +/// This must be added to the HTTP middleware stack to make +/// the client IP available to the RPC rate limiter. +#[derive(Clone, Debug, Default)] +pub struct ClientIpExtractorLayer; + +impl Layer for ClientIpExtractorLayer { + type Service = ClientIpExtractorService; + + fn layer(&self, inner: S) -> Self::Service { + ClientIpExtractorService { inner } + } +} + +/// Service that extracts client IP from nginx headers and stores in extensions. +#[derive(Clone, Debug)] +pub struct ClientIpExtractorService { + inner: S, +} + +impl Service> for ClientIpExtractorService +where + S: Service, Response = Response>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + // Extract client IP from nginx headers + let client_ip = extract_client_ip_from_headers(&req); + + // Store in extensions for the RPC layer to read + req.extensions_mut().insert(ClientIp(client_ip)); + + self.inner.call(req) + } +} + +/// Extract client IP from HTTP headers set by nginx. +/// +/// Checks in order: +/// 1. X-Forwarded-For (first IP in chain) +/// 2. X-Real-IP +/// 3. Falls back to 0.0.0.0 +fn extract_client_ip_from_headers(req: &Request) -> IpAddr { + // Try X-Forwarded-For first (standard proxy header) + // Format: "client, proxy1, proxy2" - we want the first one + if let Some(forwarded) = req.headers().get("x-forwarded-for") { + if let Ok(value) = forwarded.to_str() { + if let Some(first_ip) = value.split(',').next() { + if let Ok(ip) = first_ip.trim().parse() { + return ip; + } + } + } + } + + // Try X-Real-IP (nginx specific) + if let Some(real_ip) = req.headers().get("x-real-ip") { + if let Ok(value) = real_ip.to_str() { + if let Ok(ip) = value.trim().parse() { + return ip; + } + } + } + + // Fallback - this shouldn't happen if nginx is configured correctly + // Using 0.0.0.0 makes it obvious something is wrong + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) +} + +// ============================================================================ +// Rate Limit Configuration +// ============================================================================ + +/// Configuration for the rate limiter. +#[derive(Debug, Clone)] +pub struct RateLimitConfig { + /// Maximum requests per second allowed per IP. + pub requests_per_second: u32, + + /// Burst size - how many requests can come in at once before limiting. + /// This allows short bursts while still enforcing the overall rate. + pub burst_size: u32, + + /// Methods to rate limit. If empty, all methods are rate limited. + /// Prefix matching is used, e.g., "eth_" matches all eth_ methods. + pub limited_methods: Option>, + + /// Methods to exempt from rate limiting. + /// These are never rate limited regardless of `limited_methods`. + pub exempt_methods: Vec, + + /// IP addresses to exempt from rate limiting. + /// Useful for internal services. + pub exempt_ips: Vec, +} + +impl Default for RateLimitConfig { + fn default() -> Self { + Self { + requests_per_second: 100, + burst_size: 50, + limited_methods: None, + exempt_methods: vec![ + "eth_chainId".to_string(), + "eth_blockNumber".to_string(), + "net_version".to_string(), + "web3_clientVersion".to_string(), + ], + exempt_ips: vec![], + } + } +} + +// ============================================================================ +// RPC Middleware - Rate Limiting Service +// ============================================================================ + +/// Type alias for the per-IP rate limiter. +type IpRateLimiter = RateLimiter; + +/// Seismic rate limiter using token bucket algorithm. +/// +/// Tracks per-IP request rates and rejects requests that exceed the limit. +#[derive(Clone)] +pub struct SeismicRateLimiter { + /// Per-IP rate limiters stored in a concurrent hashmap. + ip_limiters: Arc>>, + /// Configuration. + config: Arc, + /// Quota used for creating new rate limiters. + quota: Quota, +} + +impl std::fmt::Debug for SeismicRateLimiter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SeismicRateLimiter") + .field("config", &self.config) + .field("active_ips", &self.ip_limiters.len()) + .finish() + } +} + +impl SeismicRateLimiter { + /// Create a new rate limiter with the given configuration. + pub fn new(config: RateLimitConfig) -> Self { + let rps = NonZeroU32::new(config.requests_per_second).unwrap_or(NonZeroU32::MIN); + let burst = NonZeroU32::new(config.burst_size).unwrap_or(NonZeroU32::MIN); + + let quota = Quota::per_second(rps).allow_burst(burst); + + Self { ip_limiters: Arc::new(DashMap::new()), config: Arc::new(config), quota } + } + + /// Create a rate limiter with default configuration. + pub fn with_defaults() -> Self { + Self::new(RateLimitConfig::default()) + } + + /// Get or create a rate limiter for the given IP. + fn get_limiter(&self, ip: IpAddr) -> Arc { + self.ip_limiters + .entry(ip) + .or_insert_with(|| Arc::new(RateLimiter::direct(self.quota))) + .clone() + } + + /// Check if a method should be rate limited. + fn should_limit_method(&self, method: &str) -> bool { + // Check exempt methods first + for exempt in &self.config.exempt_methods { + if method == exempt || method.starts_with(exempt) { + return false; + } + } + + // If no limited_methods specified (None), limit all (except exempt) + let Some(limited_methods) = &self.config.limited_methods else { + return true; + }; + + // If limited_methods is empty, limit all (except exempt) + if limited_methods.is_empty() { + return true; + } + + // Check if method matches any limited prefix + for limited in limited_methods { + if method == limited || method.starts_with(limited) { + return true; + } + } + + false + } + + /// Check if an IP is exempt from rate limiting. + fn is_ip_exempt(&self, ip: IpAddr) -> bool { + self.config.exempt_ips.contains(&ip) + } + + /// Check if a request should be allowed. + /// + /// Returns `Ok(())` if allowed, `Err(())` if rate limited. + pub fn check(&self, ip: IpAddr, method: &str) -> Result<(), NotUntil> { + // Check exemptions + if self.is_ip_exempt(ip) || !self.should_limit_method(method) { + return Ok(()); + } + + // Get or create limiter for this IP + let limiter = self.get_limiter(ip); + + // Try to acquire a token + limiter.check() + } + + /// Get the number of currently tracked IPs. + /// Useful for monitoring. + pub fn active_ip_count(&self) -> usize { + self.ip_limiters.len() + } + + /// Clear old entries from the limiter map. + /// Call this periodically to prevent memory growth. + pub const fn cleanup_stale_entries(&self) { + // For now, we don't have TTL tracking on entries. + // In production, you might want to add timestamps and prune old entries. + // The governor crate handles token refill automatically. + } +} + +// Implement Tower Layer for SeismicRateLimiter +impl Layer for SeismicRateLimiter { + type Service = SeismicRateLimitingService; + + fn layer(&self, inner: S) -> Self::Service { + SeismicRateLimitingService { inner, rate_limiter: self.clone() } + } +} + +/// RPC service that applies rate limiting. +#[derive(Clone, Debug)] +pub struct SeismicRateLimitingService { + inner: S, + rate_limiter: SeismicRateLimiter, +} + +impl SeismicRateLimitingService { + /// Create a new rate limiting service. + pub const fn new(inner: S, rate_limiter: SeismicRateLimiter) -> Self { + Self { inner, rate_limiter } + } +} + +impl RpcServiceT for SeismicRateLimitingService +where + S: RpcServiceT + Send + Sync + Clone + 'static, +{ + type MethodResponse = MethodResponse; + type NotificationResponse = S::NotificationResponse; + type BatchResponse = S::BatchResponse; + + fn call<'a>( + &self, + req: RpcRequest<'a>, + ) -> impl Future + Send + 'a { + // Extract client IP from extensions (set by HTTP middleware) + let client_ip = req + .extensions() + .get::() + .map(|c| c.0) + .unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + + let method_name = req.method_name(); + + // Check rate limit + if self.rate_limiter.check(client_ip, method_name).is_err() { + // Rate limited - return error immediately + let error = jsonrpsee::types::ErrorObject::owned( + -32029, // Custom error code for rate limiting + "Rate limited".to_string(), + Some(format!("Too many requests from {}", client_ip)), + ); + + return futures::future::Either::Left( + async move { MethodResponse::error(req.id(), error) }, + ); + } + + // Not rate limited - proceed with the request + let fut = self.inner.call(req); + futures::future::Either::Right(fut) + } + + fn batch<'a>( + &self, + requests: jsonrpsee::core::middleware::Batch<'a>, + ) -> impl Future + Send + 'a { + // For batch requests, we could rate limit the whole batch or each request + // For simplicity, we pass through and let individual calls be limited + self.inner.batch(requests) + } + + fn notification<'a>( + &self, + n: jsonrpsee::core::middleware::Notification<'a>, + ) -> impl Future + Send + 'a { + // Notifications don't get responses, so no rate limiting + self.inner.notification(n) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ======================================================================== + // RateLimitConfig Tests + // ======================================================================== + + #[test] + fn test_rate_limit_config_default() { + let config = RateLimitConfig::default(); + assert_eq!(config.requests_per_second, 100); + assert_eq!(config.burst_size, 50); + assert!(config.exempt_methods.contains(&"eth_chainId".to_string())); + assert!(config.exempt_methods.contains(&"eth_blockNumber".to_string())); + assert!(config.exempt_methods.contains(&"net_version".to_string())); + assert!(config.exempt_methods.contains(&"web3_clientVersion".to_string())); + assert!(config.limited_methods.is_none()); + assert!(config.exempt_ips.is_empty()); + } + + #[test] + fn test_rate_limit_config_custom() { + let config = RateLimitConfig { + requests_per_second: 50, + burst_size: 10, + limited_methods: Some(vec!["eth_call".to_string()]), + exempt_methods: vec!["eth_chainId".to_string()], + exempt_ips: vec!["127.0.0.1".parse().unwrap()], + }; + assert_eq!(config.requests_per_second, 50); + assert_eq!(config.burst_size, 10); + assert_eq!(config.limited_methods.as_ref().unwrap().len(), 1); + assert_eq!(config.exempt_methods.len(), 1); + assert_eq!(config.exempt_ips.len(), 1); + } + + // ======================================================================== + // IP Extraction Tests + // ======================================================================== + + #[test] + fn test_extract_ip_from_x_forwarded_for_single() { + let req = Request::builder().header("x-forwarded-for", "203.0.113.50").body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "203.0.113.50".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_from_x_forwarded_for_chain() { + // When multiple proxies, first IP is the client + let req = Request::builder() + .header("x-forwarded-for", "203.0.113.50, 70.41.3.18, 150.172.238.178") + .body(()) + .unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "203.0.113.50".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_from_x_forwarded_for_with_spaces() { + let req = Request::builder() + .header("x-forwarded-for", " 203.0.113.50 , 70.41.3.18 ") + .body(()) + .unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "203.0.113.50".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_from_x_real_ip() { + let req = Request::builder().header("x-real-ip", "192.168.1.100").body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "192.168.1.100".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_prefers_x_forwarded_for_over_x_real_ip() { + let req = Request::builder() + .header("x-forwarded-for", "10.0.0.1") + .header("x-real-ip", "10.0.0.2") + .body(()) + .unwrap(); + let ip = extract_client_ip_from_headers(&req); + // X-Forwarded-For takes priority + assert_eq!(ip, "10.0.0.1".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_fallback_when_no_headers() { + let req = Request::builder().body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "0.0.0.0".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_fallback_when_invalid_header() { + let req = + Request::builder().header("x-forwarded-for", "not-an-ip-address").body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "0.0.0.0".parse::().unwrap()); + } + + #[test] + fn test_extract_ipv6_address() { + let req = Request::builder().header("x-forwarded-for", "2001:db8::1").body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "2001:db8::1".parse::().unwrap()); + } + + #[test] + fn test_extract_ip_case_insensitive_header() { + // HTTP headers are case-insensitive, but http crate normalizes them + let req = Request::builder().header("X-Forwarded-For", "10.20.30.40").body(()).unwrap(); + let ip = extract_client_ip_from_headers(&req); + assert_eq!(ip, "10.20.30.40".parse::().unwrap()); + } + + // ======================================================================== + // Method Filtering Tests + // ======================================================================== + + #[test] + fn test_should_limit_method_with_specific_methods() { + let config = RateLimitConfig { + limited_methods: Some(vec!["eth_call".to_string(), "eth_send".to_string()]), + exempt_methods: vec!["eth_chainId".to_string()], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + // Exact matches + assert!(limiter.should_limit_method("eth_call")); + + // Prefix matches + assert!(limiter.should_limit_method("eth_sendRawTransaction")); + assert!(limiter.should_limit_method("eth_sendTransaction")); + + // Exempt method + assert!(!limiter.should_limit_method("eth_chainId")); + + // Not in limited_methods list + assert!(!limiter.should_limit_method("eth_blockNumber")); + assert!(!limiter.should_limit_method("eth_getBalance")); + assert!(!limiter.should_limit_method("debug_traceTransaction")); + } + + #[test] + fn test_should_limit_method_empty_limited_methods_limits_all() { + let config = RateLimitConfig { + limited_methods: Some(vec![]), // empty vec = limit all + exempt_methods: vec!["eth_chainId".to_string()], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + // All methods should be limited except exempt ones + assert!(limiter.should_limit_method("eth_call")); + assert!(limiter.should_limit_method("eth_sendRawTransaction")); + assert!(limiter.should_limit_method("debug_traceTransaction")); + assert!(limiter.should_limit_method("any_random_method")); + + // Exempt method should not be limited + assert!(!limiter.should_limit_method("eth_chainId")); + } + + #[test] + fn test_should_limit_method_exempt_takes_priority() { + let config = RateLimitConfig { + limited_methods: Some(vec!["eth_".to_string()]), // All eth_ methods + exempt_methods: vec!["eth_chainId".to_string(), "eth_blockNumber".to_string()], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + // Limited by prefix + assert!(limiter.should_limit_method("eth_call")); + assert!(limiter.should_limit_method("eth_getBalance")); + + // Exempt takes priority over limited + assert!(!limiter.should_limit_method("eth_chainId")); + assert!(!limiter.should_limit_method("eth_blockNumber")); + } + + #[test] + fn test_should_limit_method_prefix_matching() { + let config = RateLimitConfig { + limited_methods: Some(vec!["debug_".to_string(), "trace_".to_string()]), + exempt_methods: vec![], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + assert!(limiter.should_limit_method("debug_traceTransaction")); + assert!(limiter.should_limit_method("debug_storageRangeAt")); + assert!(limiter.should_limit_method("trace_block")); + assert!(limiter.should_limit_method("trace_call")); + + // Not matching prefixes + assert!(!limiter.should_limit_method("eth_call")); + assert!(!limiter.should_limit_method("net_version")); + } + + // ======================================================================== + // IP Exemption Tests + // ======================================================================== + + #[test] + fn test_exempt_ip_single() { + let config = RateLimitConfig { + exempt_ips: vec!["192.168.1.1".parse().unwrap()], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + assert!(limiter.is_ip_exempt("192.168.1.1".parse().unwrap())); + assert!(!limiter.is_ip_exempt("192.168.1.2".parse().unwrap())); + } + + #[test] + fn test_exempt_ip_multiple() { + let config = RateLimitConfig { + exempt_ips: vec![ + "192.168.1.1".parse().unwrap(), + "10.0.0.1".parse().unwrap(), + "127.0.0.1".parse().unwrap(), + ], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + assert!(limiter.is_ip_exempt("192.168.1.1".parse().unwrap())); + assert!(limiter.is_ip_exempt("10.0.0.1".parse().unwrap())); + assert!(limiter.is_ip_exempt("127.0.0.1".parse().unwrap())); + assert!(!limiter.is_ip_exempt("8.8.8.8".parse().unwrap())); + } + + #[test] + fn test_exempt_ip_ipv6() { + let config = RateLimitConfig { + exempt_ips: vec!["::1".parse().unwrap(), "2001:db8::1".parse().unwrap()], + ..Default::default() + }; + let limiter = SeismicRateLimiter::new(config); + + assert!(limiter.is_ip_exempt("::1".parse().unwrap())); + assert!(limiter.is_ip_exempt("2001:db8::1".parse().unwrap())); + assert!(!limiter.is_ip_exempt("2001:db8::2".parse().unwrap())); + } + + // ======================================================================== + // Rate Limiting Behavior Tests + // ======================================================================== + + #[test] + fn test_rate_limiting_basic() { + let config = RateLimitConfig { + requests_per_second: 2, + burst_size: 2, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // First two requests should succeed (burst) + assert!(limiter.check(ip, "eth_call").is_ok()); + assert!(limiter.check(ip, "eth_call").is_ok()); + + // Third request should be rate limited + assert!(limiter.check(ip, "eth_call").is_err()); + } + + #[test] + fn test_rate_limiting_per_ip_isolation() { + let config = RateLimitConfig { + requests_per_second: 2, + burst_size: 2, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip1: IpAddr = "10.0.0.1".parse().unwrap(); + let ip2: IpAddr = "10.0.0.2".parse().unwrap(); + + // Exhaust ip1's quota + assert!(limiter.check(ip1, "eth_call").is_ok()); + assert!(limiter.check(ip1, "eth_call").is_ok()); + assert!(limiter.check(ip1, "eth_call").is_err()); + + // ip2 should still have its own quota + assert!(limiter.check(ip2, "eth_call").is_ok()); + assert!(limiter.check(ip2, "eth_call").is_ok()); + assert!(limiter.check(ip2, "eth_call").is_err()); + } + + #[test] + fn test_rate_limiting_exempt_ip_bypasses() { + let config = RateLimitConfig { + requests_per_second: 1, + burst_size: 1, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec!["192.168.1.1".parse().unwrap()], + }; + let limiter = SeismicRateLimiter::new(config); + let exempt_ip: IpAddr = "192.168.1.1".parse().unwrap(); + let normal_ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Exempt IP should never be rate limited + for _ in 0..100 { + assert!(limiter.check(exempt_ip, "eth_call").is_ok()); + } + + // Normal IP should be rate limited + assert!(limiter.check(normal_ip, "eth_call").is_ok()); + assert!(limiter.check(normal_ip, "eth_call").is_err()); + } + + #[test] + fn test_rate_limiting_exempt_method_bypasses() { + let config = RateLimitConfig { + requests_per_second: 1, + burst_size: 1, + limited_methods: None, + exempt_methods: vec!["eth_chainId".to_string()], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Exempt method should never be rate limited + for _ in 0..100 { + assert!(limiter.check(ip, "eth_chainId").is_ok()); + } + + // Non-exempt method should be rate limited + assert!(limiter.check(ip, "eth_call").is_ok()); + assert!(limiter.check(ip, "eth_call").is_err()); + } + + #[test] + fn test_rate_limiting_different_methods_share_quota() { + let config = RateLimitConfig { + requests_per_second: 2, + burst_size: 2, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Different methods from same IP share the same quota + assert!(limiter.check(ip, "eth_call").is_ok()); + assert!(limiter.check(ip, "eth_getBalance").is_ok()); + assert!(limiter.check(ip, "eth_sendRawTransaction").is_err()); + } + + #[test] + fn test_rate_limiting_burst_allows_quick_requests() { + let config = RateLimitConfig { + requests_per_second: 10, + burst_size: 5, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // All 5 burst requests should succeed immediately + for i in 0..5 { + assert!( + limiter.check(ip, "eth_call").is_ok(), + "Request {} should succeed within burst", + i + ); + } + + // 6th request should fail (burst exhausted) + assert!(limiter.check(ip, "eth_call").is_err()); + } + + // ======================================================================== + // Active IP Count Tests + // ======================================================================== + + #[test] + fn test_active_ip_count() { + let config = RateLimitConfig::default(); + let limiter = SeismicRateLimiter::new(config); + + assert_eq!(limiter.active_ip_count(), 0); + + // Make requests from different IPs + limiter.check("10.0.0.1".parse().unwrap(), "eth_call").ok(); + assert_eq!(limiter.active_ip_count(), 1); + + limiter.check("10.0.0.2".parse().unwrap(), "eth_call").ok(); + assert_eq!(limiter.active_ip_count(), 2); + + limiter.check("10.0.0.3".parse().unwrap(), "eth_call").ok(); + assert_eq!(limiter.active_ip_count(), 3); + + // Same IP shouldn't increase count + limiter.check("10.0.0.1".parse().unwrap(), "eth_call").ok(); + assert_eq!(limiter.active_ip_count(), 3); + } + + // ======================================================================== + // Constructor Tests + // ======================================================================== + + #[test] + fn test_with_defaults() { + let limiter = SeismicRateLimiter::with_defaults(); + // Should be able to make many requests with default config (100 rps, 50 burst) + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + for _ in 0..50 { + assert!(limiter.check(ip, "eth_call").is_ok()); + } + } + + #[test] + fn test_zero_values_use_minimum() { + // Governor doesn't allow 0 for NonZeroU32, so we clamp to MIN + let config = + RateLimitConfig { requests_per_second: 0, burst_size: 0, ..Default::default() }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Should still work with minimum values (1 req/sec, 1 burst) + assert!(limiter.check(ip, "eth_call").is_ok()); + assert!(limiter.check(ip, "eth_call").is_err()); + } + + // ======================================================================== + // ClientIp Type Tests + // ======================================================================== + + #[test] + fn test_client_ip_clone_copy() { + let ip1 = ClientIp("10.0.0.1".parse().unwrap()); + let ip2 = ip1; // Copy + let ip3 = ip1.clone(); // Clone + + assert_eq!(ip1.0, ip2.0); + assert_eq!(ip1.0, ip3.0); + } + + #[test] + fn test_client_ip_debug() { + let ip = ClientIp("192.168.1.1".parse().unwrap()); + let debug_str = format!("{:?}", ip); + assert!(debug_str.contains("192.168.1.1")); + } + + // ======================================================================== + // Layer Implementation Tests + // ======================================================================== + + #[test] + fn test_client_ip_extractor_layer_default() { + let layer = ClientIpExtractorLayer::default(); + let _layer2 = layer.clone(); + let debug_str = format!("{:?}", layer); + assert!(debug_str.contains("ClientIpExtractorLayer")); + } + + #[test] + fn test_seismic_rate_limiter_debug() { + let limiter = SeismicRateLimiter::with_defaults(); + let debug_str = format!("{:?}", limiter); + assert!(debug_str.contains("SeismicRateLimiter")); + assert!(debug_str.contains("active_ips")); + } + + #[test] + fn test_seismic_rate_limiter_clone() { + let limiter1 = SeismicRateLimiter::with_defaults(); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Use some quota + limiter1.check(ip, "eth_call").ok(); + + // Clone should share the same internal state + let limiter2 = limiter1.clone(); + assert_eq!(limiter1.active_ip_count(), limiter2.active_ip_count()); + } + + // ======================================================================== + // Token Refill Tests (Async) + // ======================================================================== + + #[tokio::test] + async fn test_rate_limiting_tokens_refill_over_time() { + let config = RateLimitConfig { + requests_per_second: 10, // 10 per second = 1 token every 100ms + burst_size: 1, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Use up the burst + assert!(limiter.check(ip, "eth_call").is_ok()); + assert!(limiter.check(ip, "eth_call").is_err()); + + // Wait for token to refill (slightly more than 100ms to be safe) + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + + // Should be able to make another request + assert!(limiter.check(ip, "eth_call").is_ok()); + } + + #[tokio::test] + async fn test_rate_limiting_sustained_rate() { + let config = RateLimitConfig { + requests_per_second: 20, // 20 per second = 1 token every 50ms + burst_size: 1, + limited_methods: None, + exempt_methods: vec![], + exempt_ips: vec![], + }; + let limiter = SeismicRateLimiter::new(config); + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Make requests at a sustainable rate + for _ in 0..5 { + assert!(limiter.check(ip, "eth_call").is_ok()); + tokio::time::sleep(tokio::time::Duration::from_millis(60)).await; + } + } + + // ======================================================================== + // Integration-like Tests + // ======================================================================== + + #[test] + fn test_full_check_flow() { + let config = RateLimitConfig { + requests_per_second: 10, + burst_size: 5, + limited_methods: Some(vec!["eth_call".to_string(), "eth_send".to_string()]), + exempt_methods: vec!["eth_chainId".to_string()], + exempt_ips: vec!["127.0.0.1".parse().unwrap()], + }; + let limiter = SeismicRateLimiter::new(config); + + let normal_ip: IpAddr = "10.0.0.1".parse().unwrap(); + let exempt_ip: IpAddr = "127.0.0.1".parse().unwrap(); + + // Exempt IP can make unlimited requests to limited methods + for _ in 0..100 { + assert!(limiter.check(exempt_ip, "eth_call").is_ok()); + } + + // Normal IP can make unlimited requests to exempt methods + for _ in 0..100 { + assert!(limiter.check(normal_ip, "eth_chainId").is_ok()); + } + + // Normal IP can make unlimited requests to non-limited methods + for _ in 0..100 { + assert!(limiter.check(normal_ip, "net_version").is_ok()); + } + + // Normal IP is rate limited on limited methods + for _ in 0..5 { + assert!(limiter.check(normal_ip, "eth_call").is_ok()); + } + assert!(limiter.check(normal_ip, "eth_call").is_err()); + } + + #[test] + fn test_concurrent_access_safety() { + use std::sync::Arc; + use std::thread; + + let config = + RateLimitConfig { requests_per_second: 1000, burst_size: 100, ..Default::default() }; + let limiter = Arc::new(SeismicRateLimiter::new(config)); + + let mut handles = vec![]; + + // Spawn multiple threads making requests + for i in 0..10 { + let limiter = Arc::clone(&limiter); + let handle = thread::spawn(move || { + let ip: IpAddr = format!("10.0.0.{}", i).parse().unwrap(); + for _ in 0..50 { + let _ = limiter.check(ip, "eth_call"); + } + }); + handles.push(handle); + } + + // Wait for all threads + for handle in handles { + handle.join().unwrap(); + } + + // Should have 10 unique IPs tracked + assert_eq!(limiter.active_ip_count(), 10); + } +}