From 334669fc72590a15561c5fcdbf4aec6443e9b20a Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 15:25:17 +0100 Subject: [PATCH 01/21] work in progress, implementing mempool load actor --- code/crates/config/src/lib.rs | 21 ++- code/crates/starknet/host/src/actor.rs | 9 +- .../crates/starknet/host/src/host/starknet.rs | 5 + code/crates/starknet/host/src/lib.rs | 5 +- code/crates/starknet/host/src/mempool_load.rs | 165 ++++++++++++++++++ code/crates/starknet/host/src/spawn.rs | 26 ++- code/crates/starknet/host/src/utils/mod.rs | 1 + code/crates/starknet/host/src/utils/ticker.rs | 19 ++ code/crates/starknet/test/src/lib.rs | 4 +- .../test/cli/src/cmd/distributed_testnet.rs | 1 + code/crates/test/cli/src/new.rs | 1 + 11 files changed, 250 insertions(+), 7 deletions(-) create mode 100644 code/crates/starknet/host/src/mempool_load.rs create mode 100644 code/crates/starknet/host/src/utils/mod.rs create mode 100644 code/crates/starknet/host/src/utils/ticker.rs diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 42d126d13..161345ed6 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -24,7 +24,10 @@ pub struct Config { /// Mempool configuration options pub mempool: MempoolConfig, - + + /// Mempool load configuration options + pub mempool_load: MempoolLoadConfig, + /// Sync configuration options pub sync: SyncConfig, @@ -339,6 +342,22 @@ mod gossipsub { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum MempoolLoadType { + UniformLoad {count: usize, size: usize}, + #[default] + NoLoad, + NonUniformLoad, +} + +/// Mempool configuration options +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct MempoolLoadConfig { + /// Mempool loading type + pub load_type: MempoolLoadType, +} + /// Mempool configuration options #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct MempoolConfig { diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index e47756fbe..d975ed7fd 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -23,11 +23,13 @@ use crate::host::proposal::compute_proposal_signature; use crate::host::state::HostState; use crate::host::{Host as _, StarknetHost}; use crate::mempool::{MempoolMsg, MempoolRef}; +use crate::mempool_load::MempoolLoadRef; use crate::proto::Protobuf; use crate::types::*; pub struct Host { mempool: MempoolRef, + mempool_load: MempoolLoadRef, network: NetworkRef, metrics: Metrics, span: tracing::Span, @@ -41,6 +43,7 @@ impl Host { home_dir: PathBuf, host: StarknetHost, mempool: MempoolRef, + mempool_load: MempoolLoadRef, network: NetworkRef, metrics: Metrics, span: tracing::Span, @@ -51,7 +54,7 @@ impl Host { let (actor_ref, _) = Actor::spawn( None, - Self::new(mempool, network, metrics, span), + Self::new(mempool, mempool_load, network, metrics, span), HostState::new(host, db_path, &mut StdRng::from_entropy()), ) .await?; @@ -61,12 +64,14 @@ impl Host { pub fn new( mempool: MempoolRef, + mempool_load: MempoolLoadRef, network: NetworkRef, metrics: Metrics, span: tracing::Span, ) -> Self { Self { mempool, + mempool_load, network, metrics, span, @@ -86,6 +91,7 @@ impl Actor for Host { initial_state: Self::State, ) -> Result { self.mempool.link(myself.get_cell()); + self.mempool_load.link(myself.get_cell()); Ok(initial_state) } @@ -545,6 +551,7 @@ async fn on_received_proposal_part( Ok(()) } +//TODO async fn on_decided( state: &mut HostState, consensus: &ConsensusRef, diff --git a/code/crates/starknet/host/src/host/starknet.rs b/code/crates/starknet/host/src/host/starknet.rs index 5e27bd993..5a8319599 100644 --- a/code/crates/starknet/host/src/host/starknet.rs +++ b/code/crates/starknet/host/src/host/starknet.rs @@ -13,6 +13,8 @@ use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtensi use crate::host::Host; use crate::mempool::MempoolRef; +use crate::mempool_load::MempoolLoadRef; +// use crate::mempool_load::MempoolLoadRef; use crate::part_store::PartStore; use crate::types::*; @@ -33,6 +35,7 @@ pub struct StarknetParams { pub struct StarknetHost { pub params: StarknetParams, pub mempool: MempoolRef, + pub mempool_load: MempoolLoadRef, pub address: Address, pub private_key: PrivateKey, pub validator_set: ValidatorSet, @@ -43,6 +46,7 @@ impl StarknetHost { pub fn new( params: StarknetParams, mempool: MempoolRef, + mempool_load: MempoolLoadRef, address: Address, private_key: PrivateKey, validator_set: ValidatorSet, @@ -50,6 +54,7 @@ impl StarknetHost { Self { params, mempool, + mempool_load, address, private_key, validator_set, diff --git a/code/crates/starknet/host/src/lib.rs b/code/crates/starknet/host/src/lib.rs index d3b8ee680..42ecfb67f 100644 --- a/code/crates/starknet/host/src/lib.rs +++ b/code/crates/starknet/host/src/lib.rs @@ -3,12 +3,12 @@ pub mod block_store; pub mod codec; pub mod host; pub mod mempool; +pub mod mempool_load; pub mod node; pub mod spawn; pub mod streaming; - +pub mod utils; pub use malachitebft_app::part_store; - pub mod proto { pub use malachitebft_proto::*; pub use malachitebft_starknet_p2p_proto::*; @@ -17,3 +17,4 @@ pub mod proto { pub mod types { pub use malachitebft_starknet_p2p_types::*; } + diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs new file mode 100644 index 000000000..2f9dcc471 --- /dev/null +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -0,0 +1,165 @@ +use crate::proto::Protobuf; +use async_trait::async_trait; +use malachitebft_config::MempoolLoadType; +use malachitebft_starknet_p2p_types::{Transaction, Transactions}; +use malachitebft_test_mempool::types::MempoolTransactionBatch; +use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; +use rand::{Rng, RngCore}; +use tracing::debug; +use std::time::Duration; + +use crate::{ + mempool::network::{MempoolNetworkMsg, MempoolNetworkRef}, + utils::ticker::ticker, +}; + +pub type MempoolLoadMsg = Msg; +pub type MempoolLoadRef = ActorRef; +pub enum Msg { + GenerateTransactions { count: usize, size: usize }, +} + +#[derive(Debug)] +pub struct State { + ticker: JoinHandle<()>, +} + + + +#[derive(Debug)] +pub struct Params { + pub load_type: MempoolLoadType, +} + +pub struct MempoolLoad { + params: Params, + network: MempoolNetworkRef, + span: tracing::Span, +} + +impl Default for Params { + fn default() -> Self { + Self { + load_type: MempoolLoadType::UniformLoad { + size: 555, + count: 127, + }, + } + } +} + +impl MempoolLoad { + pub fn new(params: Params, network: MempoolNetworkRef, span: tracing::Span) -> Self { + Self { + params, + network, + span, + } + } + + pub async fn spawn( + params: Params, + network: MempoolNetworkRef, + span: tracing::Span, + ) -> Result { + debug!("spawning actor mempool_load"); + + let actor = Self::new(params, network, span); + let (actor_ref, _) = Actor::spawn(None, actor, ()).await?; + Ok(actor_ref) + } + + pub fn generate_transactions(count: usize, size: usize) -> Vec { + let mut transactions: Vec = Vec::with_capacity(count); + let mut rng = rand::thread_rng(); + + for _ in 0..count { + let mut tx_bytes = vec![0; size]; + rng.fill_bytes(&mut tx_bytes); + let tx = Transaction::new(tx_bytes); + // debug!("transaction {:?}", tx.clone()); + + transactions.push(tx); + } + debug!("transactions generated {:?}", transactions.clone().len()); + + transactions + } +} + +#[async_trait] +impl Actor for MempoolLoad { + type Msg = Msg; + type State = State; + type Arguments = (); + + async fn pre_start( + &self, + myself: MempoolLoadRef, + _args: (), + ) -> Result { + debug!("starting ticker"); + + let ticker = match self.params.load_type { + MempoolLoadType::UniformLoad { count, size } => { + debug!("entered uniform load branch"); + + let interval = Duration::from_secs(1); + tokio::spawn(ticker(interval, myself.clone(), move || { + Msg::GenerateTransactions { count, size } + })) + } + MempoolLoadType::NoLoad => tokio::spawn(async {}), + MempoolLoadType::NonUniformLoad => { + debug!("entered nonuniform load branch"); + + let mut rng = rand::thread_rng(); + let interval = Duration::from_secs(rng.gen_range(1..10)); + let count = rng.gen_range(500..=10000) as usize; + let size = rng.gen_range(128..=512) as usize; + tokio::spawn(ticker(interval, myself.clone(), move || { + Msg::GenerateTransactions { count, size } + })) + } + }; + Ok(State { ticker }) + } + + async fn post_stop( + &self, + _myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + state.ticker.abort(); + Ok(()) + } + + #[tracing::instrument("host.mempool_load", parent = &self.span, skip_all)] + async fn handle( + &self, + _myself: MempoolLoadRef, + msg: Msg, + _state: &mut State, + ) -> Result<(), ActorProcessingErr> { + match msg { + Msg::GenerateTransactions { count, size } => { + debug!("entered message handler GenerateTransactions"); + + let mut tx_batch = Transactions::default(); + let transactions = Self::generate_transactions(count, size); + debug!("broadcasting transactions {:?}", transactions.len()); + + for tx in transactions { + tx_batch.push(tx); + } + let tx_batch1 = std::mem::take(&mut tx_batch).to_any().unwrap(); + let mempool_batch = MempoolTransactionBatch::new(tx_batch1); + debug!("broadcasting batch {:?}", tx_batch.len()); + + self.network + .cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; + Ok(()) + } + } + } +} diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index e612bf8d3..6be2c0e3a 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -7,7 +7,7 @@ use malachitebft_engine::wal::{Wal, WalRef}; use tokio::task::JoinHandle; use malachitebft_config::{ - self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol, + self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig, TransportProtocol }; use malachitebft_core_consensus::ValuePayload; use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef}; @@ -26,6 +26,7 @@ use crate::codec::ProtobufCodec; use crate::host::{StarknetHost, StarknetParams}; use crate::mempool::network::{MempoolNetwork, MempoolNetworkRef}; use crate::mempool::{Mempool, MempoolRef}; +use crate::mempool_load::{MempoolLoad, MempoolLoadRef, Params}; use crate::types::MockContext; use crate::types::{Address, Height, PrivateKey, ValidatorSet}; @@ -50,6 +51,10 @@ pub async fn spawn_node_actor( let mempool_network = spawn_mempool_network_actor(&cfg, &private_key, ®istry, &span).await; let mempool = spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &cfg.test, &span).await; + let mempool_load = + spawn_mempool_load_actor( + &cfg.mempool_load, + mempool_network.clone(), &span).await; // Spawn consensus gossip let network = spawn_network_actor(&cfg, &private_key, ®istry, &span).await; @@ -62,6 +67,7 @@ pub async fn spawn_node_actor( &private_key, &initial_validator_set, mempool.clone(), + mempool_load, network.clone(), metrics.clone(), &span, @@ -291,6 +297,21 @@ async fn spawn_mempool_actor( .unwrap() } +async fn spawn_mempool_load_actor( + mempool_load_config: &MempoolLoadConfig, + network: MempoolNetworkRef, + span: &tracing::Span, +) -> MempoolLoadRef { + // let params = mempool_load::Params::default(); + + // debug!("spawned mempool load actor with params {:?}", params); + MempoolLoad::spawn( Params{ + load_type:mempool_load_config.load_type + }, network, span.clone()) + .await + .unwrap() +} + async fn spawn_mempool_network_actor( cfg: &NodeConfig, private_key: &PrivateKey, @@ -322,6 +343,7 @@ async fn spawn_host_actor( private_key: &PrivateKey, initial_validator_set: &ValidatorSet, mempool: MempoolRef, + mempool_load: MempoolLoadRef, network: NetworkRef, metrics: Metrics, span: &tracing::Span, @@ -346,6 +368,7 @@ async fn spawn_host_actor( let mock_host = StarknetHost::new( mock_params, mempool.clone(), + mempool_load.clone(), *address, *private_key, initial_validator_set.clone(), @@ -355,6 +378,7 @@ async fn spawn_host_actor( home_dir.to_owned(), mock_host, mempool, + mempool_load, network, metrics, span.clone(), diff --git a/code/crates/starknet/host/src/utils/mod.rs b/code/crates/starknet/host/src/utils/mod.rs new file mode 100644 index 000000000..2e6961ef8 --- /dev/null +++ b/code/crates/starknet/host/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod ticker; diff --git a/code/crates/starknet/host/src/utils/ticker.rs b/code/crates/starknet/host/src/utils/ticker.rs new file mode 100644 index 000000000..171a15051 --- /dev/null +++ b/code/crates/starknet/host/src/utils/ticker.rs @@ -0,0 +1,19 @@ +use ractor::message::Message; +use ractor::ActorRef; +use tracing::debug; +use std::time::Duration; + +pub async fn ticker(interval: Duration, target: ActorRef, msg: impl Fn() -> Msg) +where + Msg: Message, +{ + loop { + tokio::time::sleep(interval).await; + debug!("sending message generatetransactions"); + + if let Err(er) = target.cast(msg()) { + tracing::error!(?er, ?target, "Failed to send tick message"); + break; + } + } +} diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 12a2969ac..52d196053 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -14,8 +14,7 @@ use tokio::time::{sleep, Duration}; use tracing::{debug, error, error_span, info, Instrument, Span}; use malachitebft_config::{ - Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, PubSubProtocol, SyncConfig, - TestConfig, TransportProtocol, + Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, MempoolLoadConfig, MempoolLoadType, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol }; use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{SignedVote, VotingPower}; @@ -736,6 +735,7 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { max_tx_count: 10000, gossip_batch_size: 100, }, + mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, sync: SyncConfig { enabled: true, status_update_interval: Duration::from_secs(2), diff --git a/code/crates/test/cli/src/cmd/distributed_testnet.rs b/code/crates/test/cli/src/cmd/distributed_testnet.rs index a334a93cb..409cab1fe 100644 --- a/code/crates/test/cli/src/cmd/distributed_testnet.rs +++ b/code/crates/test/cli/src/cmd/distributed_testnet.rs @@ -293,6 +293,7 @@ fn generate_distributed_config( max_tx_count: 10000, gossip_batch_size: 0, }, + mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NoLoad }, sync: SyncConfig { enabled: false, status_update_interval: Duration::from_secs(0), diff --git a/code/crates/test/cli/src/new.rs b/code/crates/test/cli/src/new.rs index 46b812ae4..8708dc3f2 100644 --- a/code/crates/test/cli/src/new.rs +++ b/code/crates/test/cli/src/new.rs @@ -149,6 +149,7 @@ pub fn generate_config( max_tx_count: 10000, gossip_batch_size: 0, }, + mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NoLoad }, sync: Default::default(), metrics: MetricsConfig { enabled: true, From 55d37519660c984f7de58cb45c7d5725c791ada6 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 15:54:14 +0100 Subject: [PATCH 02/21] set nonuniformload to all starknet configs --- code/crates/test/cli/src/cmd/distributed_testnet.rs | 2 +- code/crates/test/cli/src/new.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/code/crates/test/cli/src/cmd/distributed_testnet.rs b/code/crates/test/cli/src/cmd/distributed_testnet.rs index 409cab1fe..a314612d8 100644 --- a/code/crates/test/cli/src/cmd/distributed_testnet.rs +++ b/code/crates/test/cli/src/cmd/distributed_testnet.rs @@ -293,7 +293,7 @@ fn generate_distributed_config( max_tx_count: 10000, gossip_batch_size: 0, }, - mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NoLoad }, + mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, sync: SyncConfig { enabled: false, status_update_interval: Duration::from_secs(0), diff --git a/code/crates/test/cli/src/new.rs b/code/crates/test/cli/src/new.rs index 8708dc3f2..dcd0c8af6 100644 --- a/code/crates/test/cli/src/new.rs +++ b/code/crates/test/cli/src/new.rs @@ -149,7 +149,7 @@ pub fn generate_config( max_tx_count: 10000, gossip_batch_size: 0, }, - mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NoLoad }, + mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, sync: Default::default(), metrics: MetricsConfig { enabled: true, From 90560d200aaa5ef21d9a2b09a9daa15c5853bc89 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 16:00:49 +0100 Subject: [PATCH 03/21] code formatting --- code/crates/config/src/lib.rs | 9 ++++--- code/crates/starknet/host/src/lib.rs | 1 - code/crates/starknet/host/src/mempool_load.rs | 10 +++----- code/crates/starknet/host/src/spawn.rs | 25 +++++++++++-------- code/crates/starknet/host/src/utils/ticker.rs | 2 +- code/crates/starknet/test/src/lib.rs | 7 ++++-- .../test/cli/src/cmd/distributed_testnet.rs | 4 ++- code/crates/test/cli/src/new.rs | 4 ++- 8 files changed, 36 insertions(+), 26 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 161345ed6..3453a7fd8 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -24,10 +24,10 @@ pub struct Config { /// Mempool configuration options pub mempool: MempoolConfig, - + /// Mempool load configuration options pub mempool_load: MempoolLoadConfig, - + /// Sync configuration options pub sync: SyncConfig, @@ -345,7 +345,10 @@ mod gossipsub { #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum MempoolLoadType { - UniformLoad {count: usize, size: usize}, + UniformLoad { + count: usize, + size: usize, + }, #[default] NoLoad, NonUniformLoad, diff --git a/code/crates/starknet/host/src/lib.rs b/code/crates/starknet/host/src/lib.rs index 42ecfb67f..6e594728a 100644 --- a/code/crates/starknet/host/src/lib.rs +++ b/code/crates/starknet/host/src/lib.rs @@ -17,4 +17,3 @@ pub mod proto { pub mod types { pub use malachitebft_starknet_p2p_types::*; } - diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 2f9dcc471..8d2cf7be9 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -5,8 +5,8 @@ use malachitebft_starknet_p2p_types::{Transaction, Transactions}; use malachitebft_test_mempool::types::MempoolTransactionBatch; use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; use rand::{Rng, RngCore}; -use tracing::debug; use std::time::Duration; +use tracing::debug; use crate::{ mempool::network::{MempoolNetworkMsg, MempoolNetworkRef}, @@ -24,8 +24,6 @@ pub struct State { ticker: JoinHandle<()>, } - - #[derive(Debug)] pub struct Params { pub load_type: MempoolLoadType, @@ -98,7 +96,7 @@ impl Actor for MempoolLoad { myself: MempoolLoadRef, _args: (), ) -> Result { - debug!("starting ticker"); + debug!("starting ticker"); let ticker = match self.params.load_type { MempoolLoadType::UniformLoad { count, size } => { @@ -111,7 +109,7 @@ impl Actor for MempoolLoad { } MempoolLoadType::NoLoad => tokio::spawn(async {}), MempoolLoadType::NonUniformLoad => { - debug!("entered nonuniform load branch"); + debug!("entered nonuniform load branch"); let mut rng = rand::thread_rng(); let interval = Duration::from_secs(rng.gen_range(1..10)); @@ -143,7 +141,7 @@ impl Actor for MempoolLoad { ) -> Result<(), ActorProcessingErr> { match msg { Msg::GenerateTransactions { count, size } => { - debug!("entered message handler GenerateTransactions"); + debug!("entered message handler GenerateTransactions"); let mut tx_batch = Transactions::default(); let transactions = Self::generate_transactions(count, size); diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 6be2c0e3a..d3476fdc4 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -7,7 +7,8 @@ use malachitebft_engine::wal::{Wal, WalRef}; use tokio::task::JoinHandle; use malachitebft_config::{ - self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig, TransportProtocol + self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig, + TransportProtocol, }; use malachitebft_core_consensus::ValuePayload; use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef}; @@ -51,10 +52,8 @@ pub async fn spawn_node_actor( let mempool_network = spawn_mempool_network_actor(&cfg, &private_key, ®istry, &span).await; let mempool = spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &cfg.test, &span).await; - let mempool_load = - spawn_mempool_load_actor( - &cfg.mempool_load, - mempool_network.clone(), &span).await; + let mempool_load = + spawn_mempool_load_actor(&cfg.mempool_load, mempool_network.clone(), &span).await; // Spawn consensus gossip let network = spawn_network_actor(&cfg, &private_key, ®istry, &span).await; @@ -303,13 +302,17 @@ async fn spawn_mempool_load_actor( span: &tracing::Span, ) -> MempoolLoadRef { // let params = mempool_load::Params::default(); - + // debug!("spawned mempool load actor with params {:?}", params); - MempoolLoad::spawn( Params{ - load_type:mempool_load_config.load_type - }, network, span.clone()) - .await - .unwrap() + MempoolLoad::spawn( + Params { + load_type: mempool_load_config.load_type, + }, + network, + span.clone(), + ) + .await + .unwrap() } async fn spawn_mempool_network_actor( diff --git a/code/crates/starknet/host/src/utils/ticker.rs b/code/crates/starknet/host/src/utils/ticker.rs index 171a15051..4d4e69d42 100644 --- a/code/crates/starknet/host/src/utils/ticker.rs +++ b/code/crates/starknet/host/src/utils/ticker.rs @@ -1,7 +1,7 @@ use ractor::message::Message; use ractor::ActorRef; -use tracing::debug; use std::time::Duration; +use tracing::debug; pub async fn ticker(interval: Duration, target: ActorRef, msg: impl Fn() -> Msg) where diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 52d196053..03147ea85 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -14,7 +14,8 @@ use tokio::time::{sleep, Duration}; use tracing::{debug, error, error_span, info, Instrument, Span}; use malachitebft_config::{ - Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, MempoolLoadConfig, MempoolLoadType, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol + Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, MempoolLoadConfig, + MempoolLoadType, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol, }; use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{SignedVote, VotingPower}; @@ -735,7 +736,9 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { max_tx_count: 10000, gossip_batch_size: 100, }, - mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, + mempool_load: MempoolLoadConfig { + load_type: MempoolLoadType::NonUniformLoad, + }, sync: SyncConfig { enabled: true, status_update_interval: Duration::from_secs(2), diff --git a/code/crates/test/cli/src/cmd/distributed_testnet.rs b/code/crates/test/cli/src/cmd/distributed_testnet.rs index a314612d8..0ac9939b5 100644 --- a/code/crates/test/cli/src/cmd/distributed_testnet.rs +++ b/code/crates/test/cli/src/cmd/distributed_testnet.rs @@ -293,7 +293,9 @@ fn generate_distributed_config( max_tx_count: 10000, gossip_batch_size: 0, }, - mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, + mempool_load: MempoolLoadConfig { + load_type: MempoolLoadType::NonUniformLoad, + }, sync: SyncConfig { enabled: false, status_update_interval: Duration::from_secs(0), diff --git a/code/crates/test/cli/src/new.rs b/code/crates/test/cli/src/new.rs index dcd0c8af6..b590dcc87 100644 --- a/code/crates/test/cli/src/new.rs +++ b/code/crates/test/cli/src/new.rs @@ -149,7 +149,9 @@ pub fn generate_config( max_tx_count: 10000, gossip_batch_size: 0, }, - mempool_load: MempoolLoadConfig { load_type: MempoolLoadType::NonUniformLoad }, + mempool_load: MempoolLoadConfig { + load_type: MempoolLoadType::NonUniformLoad, + }, sync: Default::default(), metrics: MetricsConfig { enabled: true, From 9f3bb0617bdea3f955954090cd1b54f3110240ed Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 17:28:32 +0100 Subject: [PATCH 04/21] added mempool_load to config --- code/crates/config/src/lib.rs | 2 +- code/examples/channel/config.toml | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 3453a7fd8..9dd4ad686 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -343,7 +343,7 @@ mod gossipsub { } #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] +#[serde(rename_all = "snake_case")] pub enum MempoolLoadType { UniformLoad { count: usize, diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index 7a3b07d86..a6b780644 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -177,6 +177,12 @@ rpc_max_size = "10 MiB" # Broadcast is an experimental protocol with no additional configuration options. type = "gossipsub" +####################################################### +### Mempool Load Configuration Options ### +####################################################### +[mempool_load] +# Sets the type of generation for mempool transactions +load_type = "uniform_load" ####################################################### ### Sync Configuration Options ### From 34c0be0a8b39785b61f1fc4e6f86588bbc96340a Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 17:32:21 +0100 Subject: [PATCH 05/21] fixed broadcasting in mempool_load --- code/crates/starknet/host/src/mempool_load.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 8d2cf7be9..dacc6e746 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -143,16 +143,13 @@ impl Actor for MempoolLoad { Msg::GenerateTransactions { count, size } => { debug!("entered message handler GenerateTransactions"); - let mut tx_batch = Transactions::default(); let transactions = Self::generate_transactions(count, size); debug!("broadcasting transactions {:?}", transactions.len()); - for tx in transactions { - tx_batch.push(tx); - } - let tx_batch1 = std::mem::take(&mut tx_batch).to_any().unwrap(); - let mempool_batch = MempoolTransactionBatch::new(tx_batch1); - debug!("broadcasting batch {:?}", tx_batch.len()); + let tx_batch = Transactions::new(transactions).to_any().unwrap(); + debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); + + let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch); self.network .cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; From 3241d9d8a6fe10f6d2dd45beee2a9f783161e466 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Thu, 30 Jan 2025 17:53:57 +0100 Subject: [PATCH 06/21] fixed config --- code/crates/config/src/lib.rs | 2 +- code/examples/channel/config.toml | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 9dd4ad686..f22531ba0 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -436,7 +436,7 @@ pub struct TimeoutConfig { #[serde(with = "humantime_serde")] pub timeout_propose_delta: Duration, - /// How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) + /// How long we wait after receiving +2/3 prevotes for "anything" (ie. not a single block or nil) #[serde(with = "humantime_serde")] pub timeout_prevote: Duration, diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index a6b780644..de0a5bdc4 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -178,11 +178,13 @@ rpc_max_size = "10 MiB" type = "gossipsub" ####################################################### -### Mempool Load Configuration Options ### +### Mempool Load Configuration Options ### ####################################################### + [mempool_load] # Sets the type of generation for mempool transactions -load_type = "uniform_load" +# Valid options are "no_load", "uniform_load", "non_uniform_load" +load_type = "no_load" ####################################################### ### Sync Configuration Options ### From 7748c362c87c3b993318f80bdc77b70dac93ebb1 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Fri, 31 Jan 2025 09:43:52 +0100 Subject: [PATCH 07/21] addressed minor pr comments --- .../crates/starknet/host/src/host/starknet.rs | 1 - code/crates/starknet/host/src/mempool_load.rs | 19 ++++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/code/crates/starknet/host/src/host/starknet.rs b/code/crates/starknet/host/src/host/starknet.rs index 5a8319599..7f618141c 100644 --- a/code/crates/starknet/host/src/host/starknet.rs +++ b/code/crates/starknet/host/src/host/starknet.rs @@ -14,7 +14,6 @@ use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtensi use crate::host::Host; use crate::mempool::MempoolRef; use crate::mempool_load::MempoolLoadRef; -// use crate::mempool_load::MempoolLoadRef; use crate::part_store::PartStore; use crate::types::*; diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index dacc6e746..05020593f 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -1,12 +1,15 @@ -use crate::proto::Protobuf; +use std::time::Duration; + use async_trait::async_trait; +use rand::{Rng, RngCore}; +use tracing::debug; +use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; + use malachitebft_config::MempoolLoadType; use malachitebft_starknet_p2p_types::{Transaction, Transactions}; use malachitebft_test_mempool::types::MempoolTransactionBatch; -use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; -use rand::{Rng, RngCore}; -use std::time::Duration; -use tracing::debug; + +use crate::proto::Protobuf; use crate::{ mempool::network::{MempoolNetworkMsg, MempoolNetworkRef}, @@ -15,6 +18,7 @@ use crate::{ pub type MempoolLoadMsg = Msg; pub type MempoolLoadRef = ActorRef; + pub enum Msg { GenerateTransactions { count: usize, size: usize }, } @@ -38,10 +42,7 @@ pub struct MempoolLoad { impl Default for Params { fn default() -> Self { Self { - load_type: MempoolLoadType::UniformLoad { - size: 555, - count: 127, - }, + load_type: MempoolLoadType::NoLoad } } } From 9ecce10a49abdb4f4b7a1050301f0aec6f8f5ac5 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Fri, 31 Jan 2025 09:50:53 +0100 Subject: [PATCH 08/21] formatting --- code/crates/starknet/host/src/mempool_load.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 05020593f..cd9c7565b 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -1,9 +1,9 @@ use std::time::Duration; use async_trait::async_trait; +use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; use rand::{Rng, RngCore}; use tracing::debug; -use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; use malachitebft_config::MempoolLoadType; use malachitebft_starknet_p2p_types::{Transaction, Transactions}; @@ -42,7 +42,7 @@ pub struct MempoolLoad { impl Default for Params { fn default() -> Self { Self { - load_type: MempoolLoadType::NoLoad + load_type: MempoolLoadType::NoLoad, } } } From 8efd1710902bb799ca9548bb0209db15ac5d7d78 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Fri, 31 Jan 2025 15:40:50 +0100 Subject: [PATCH 09/21] reworked configuration --- code/crates/config/src/lib.rs | 73 +++++++++++++++++-- code/crates/starknet/host/src/mempool.rs | 50 +++++++------ code/crates/starknet/host/src/mempool_load.rs | 29 ++++---- code/crates/starknet/host/src/spawn.rs | 3 - code/crates/starknet/test/src/lib.rs | 2 +- .../test/cli/src/cmd/distributed_testnet.rs | 2 +- code/crates/test/cli/src/new.rs | 2 +- code/examples/channel/config.toml | 13 +++- 8 files changed, 122 insertions(+), 52 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index f22531ba0..e5d95d08b 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -342,18 +342,75 @@ mod gossipsub { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "load_type", rename_all = "snake_case")] pub enum MempoolLoadType { - UniformLoad { - count: usize, - size: usize, - }, - #[default] + UniformLoad(UniformLoadConfig), + // #[default] NoLoad, NonUniformLoad, } +impl Default for MempoolLoadType { + fn default() -> Self { + Self::UniformLoad(UniformLoadConfig::default()) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(from = "uniformload::RawConfig", default)] +pub struct UniformLoadConfig { + #[serde(with = "humantime_serde")] + interval: Duration, + count: usize, + size: usize, +} +impl UniformLoadConfig { + fn new(interval: Duration, count: usize, size: usize) -> Self { + Self { + interval: interval, + count: count, + size: size, + } + } + pub fn interval(&self) -> Duration { + self.interval + } + + pub fn count(&self) -> usize { + self.count + } + + pub fn size(&self) -> usize { + self.size + } +} +impl Default for UniformLoadConfig { + fn default() -> Self { + Self::new(Duration::from_secs(10), 1000, 256) + } +} + +mod uniformload { + use std::time::Duration; + + #[derive(serde::Deserialize)] + pub struct RawConfig { + #[serde(default)] + #[serde(with = "humantime_serde")] + interval: Duration, + #[serde(default)] + count: usize, + #[serde(default)] + size: usize, + } + + impl From for super::UniformLoadConfig { + fn from(raw: RawConfig) -> Self { + super::UniformLoadConfig::new(raw.interval, raw.count, raw.size) + } + } +} /// Mempool configuration options #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct MempoolLoadConfig { @@ -436,7 +493,7 @@ pub struct TimeoutConfig { #[serde(with = "humantime_serde")] pub timeout_propose_delta: Duration, - /// How long we wait after receiving +2/3 prevotes for "anything" (ie. not a single block or nil) + /// How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) #[serde(with = "humantime_serde")] pub timeout_prevote: Duration, diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index 6f10b057a..e5543efd5 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use async_trait::async_trait; use bytesize::ByteSize; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; -use rand::RngCore; use tracing::{debug, info, trace}; use malachitebft_test_mempool::types::MempoolTransactionBatch; @@ -216,14 +215,15 @@ impl Actor for Mempool { reply.send(txes)?; } - Msg::Update { .. } => { + Msg::Update { tx_hashes } => { // Clear all transactions from the mempool, given that we consume // the full mempool when proposing a block. // // This reduces the mempool protocol overhead and allow us to // observe issues strictly related to consensus. // It also bumps performance, as we reduce the mempool's background traffic. - state.transactions.clear(); + // state.transactions.clear(); + tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); } } @@ -254,35 +254,39 @@ fn generate_and_broadcast_txes( let gossip_enabled = gossip_batch_size > 0; // Start with transactions already in the mempool - let mut transactions = std::mem::take(&mut state.transactions) + let transactions = std::mem::take(&mut state.transactions) .into_values() .take(count) .collect::>(); - let initial_count = transactions.len(); + // let initial_count = transactions.len(); - let mut tx_batch = Transactions::default(); - let mut rng = rand::thread_rng(); + let mut tx_batch = Transactions::new(transactions.clone()); + // let mut rng = rand::thread_rng(); - for _ in initial_count..count { - // Generate transaction - let mut tx_bytes = vec![0; size]; - rng.fill_bytes(&mut tx_bytes); - let tx = Transaction::new(tx_bytes); + if gossip_enabled && tx_batch.len() >= batch_size { + let tx_batch = std::mem::take(&mut tx_batch).to_any().unwrap(); + let mempool_batch = MempoolTransactionBatch::new(tx_batch); + mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; + } + debug!("reaped transactions: {:?}", transactions.len()); + // for _ in initial_count..count { + // // Generate transaction + // let mut tx_bytes = vec![0; size]; + // rng.fill_bytes(&mut tx_bytes); + // let tx = Transaction::new(tx_bytes); - if gossip_enabled { - tx_batch.push(tx.clone()); - } + // if gossip_enabled { + // tx_batch.push(tx.clone()); + // } - transactions.push(tx); + // transactions.push(tx); - // Gossip tx-es to peers in batches - if gossip_enabled && tx_batch.len() >= batch_size { - let tx_batch = std::mem::take(&mut tx_batch).to_any().unwrap(); - let mempool_batch = MempoolTransactionBatch::new(tx_batch); - mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; - } - } + // // Gossip tx-es to peers in batches + // if gossip_enabled && tx_batch.len() >= batch_size { + + // } + // } Ok(transactions) } diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index cd9c7565b..16df16d59 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -42,7 +42,7 @@ pub struct MempoolLoad { impl Default for Params { fn default() -> Self { Self { - load_type: MempoolLoadType::NoLoad, + load_type: MempoolLoadType::default(), } } } @@ -80,7 +80,7 @@ impl MempoolLoad { transactions.push(tx); } - debug!("transactions generated {:?}", transactions.clone().len()); + // debug!("transactions generated {:?}", transactions.clone().len()); transactions } @@ -100,17 +100,20 @@ impl Actor for MempoolLoad { debug!("starting ticker"); let ticker = match self.params.load_type { - MempoolLoadType::UniformLoad { count, size } => { - debug!("entered uniform load branch"); - - let interval = Duration::from_secs(1); - tokio::spawn(ticker(interval, myself.clone(), move || { - Msg::GenerateTransactions { count, size } - })) + MempoolLoadType::UniformLoad(uniform_load_config) => { + // debug!("entered uniform load branch"); + tokio::spawn(ticker( + uniform_load_config.interval(), + myself.clone(), + move || Msg::GenerateTransactions { + count: uniform_load_config.count(), + size: uniform_load_config.size(), + }, + )) } MempoolLoadType::NoLoad => tokio::spawn(async {}), MempoolLoadType::NonUniformLoad => { - debug!("entered nonuniform load branch"); + // debug!("entered nonuniform load branch"); let mut rng = rand::thread_rng(); let interval = Duration::from_secs(rng.gen_range(1..10)); @@ -142,13 +145,13 @@ impl Actor for MempoolLoad { ) -> Result<(), ActorProcessingErr> { match msg { Msg::GenerateTransactions { count, size } => { - debug!("entered message handler GenerateTransactions"); + // debug!("entered message handler GenerateTransactions"); let transactions = Self::generate_transactions(count, size); - debug!("broadcasting transactions {:?}", transactions.len()); + // debug!("broadcasting transactions {:?}", transactions.len()); let tx_batch = Transactions::new(transactions).to_any().unwrap(); - debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); + // debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch); diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index d3476fdc4..49390d378 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -301,9 +301,6 @@ async fn spawn_mempool_load_actor( network: MempoolNetworkRef, span: &tracing::Span, ) -> MempoolLoadRef { - // let params = mempool_load::Params::default(); - - // debug!("spawned mempool load actor with params {:?}", params); MempoolLoad::spawn( Params { load_type: mempool_load_config.load_type, diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 03147ea85..50c58d3b6 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -737,7 +737,7 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { gossip_batch_size: 100, }, mempool_load: MempoolLoadConfig { - load_type: MempoolLoadType::NonUniformLoad, + load_type: MempoolLoadType::default(), }, sync: SyncConfig { enabled: true, diff --git a/code/crates/test/cli/src/cmd/distributed_testnet.rs b/code/crates/test/cli/src/cmd/distributed_testnet.rs index 0ac9939b5..af8db7742 100644 --- a/code/crates/test/cli/src/cmd/distributed_testnet.rs +++ b/code/crates/test/cli/src/cmd/distributed_testnet.rs @@ -294,7 +294,7 @@ fn generate_distributed_config( gossip_batch_size: 0, }, mempool_load: MempoolLoadConfig { - load_type: MempoolLoadType::NonUniformLoad, + load_type: MempoolLoadType::default(), }, sync: SyncConfig { enabled: false, diff --git a/code/crates/test/cli/src/new.rs b/code/crates/test/cli/src/new.rs index b590dcc87..de45064f6 100644 --- a/code/crates/test/cli/src/new.rs +++ b/code/crates/test/cli/src/new.rs @@ -150,7 +150,7 @@ pub fn generate_config( gossip_batch_size: 0, }, mempool_load: MempoolLoadConfig { - load_type: MempoolLoadType::NonUniformLoad, + load_type: MempoolLoadType::default(), }, sync: Default::default(), metrics: MetricsConfig { diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index de0a5bdc4..0c0ee9a54 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -181,10 +181,19 @@ type = "gossipsub" ### Mempool Load Configuration Options ### ####################################################### -[mempool_load] +[mempool_load.load_type] # Sets the type of generation for mempool transactions # Valid options are "no_load", "uniform_load", "non_uniform_load" -load_type = "no_load" +load_type = "uniform_load" + +# Only available when "uniform_load", mempool load interval +interval = "5s" + +# Only available when "uniform_load", mempool load size +size = 256 + +# Only available when "uniform_load", mempool load count +count = 1000 ####################################################### ### Sync Configuration Options ### From 0aa4d130437252a44b16b1a1fa99cb81b22ccf2f Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Fri, 31 Jan 2025 18:35:07 +0100 Subject: [PATCH 10/21] extended config with non_uniform_load types --- code/crates/config/src/lib.rs | 131 +++++++++++++++++- code/crates/starknet/host/src/mempool_load.rs | 4 +- code/crates/starknet/host/src/spawn.rs | 2 +- code/examples/channel/config.toml | 41 +++++- 4 files changed, 168 insertions(+), 10 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index e5d95d08b..b1792bb6e 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -342,13 +342,13 @@ mod gossipsub { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(tag = "load_type", rename_all = "snake_case")] pub enum MempoolLoadType { UniformLoad(UniformLoadConfig), // #[default] NoLoad, - NonUniformLoad, + NonUniformLoad(NonUniformLoadConfig), } impl Default for MempoolLoadType { @@ -356,7 +356,134 @@ impl Default for MempoolLoadType { Self::UniformLoad(UniformLoadConfig::default()) } } +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(from = "nonuniformload::RawConfig", default)] +pub struct NonUniformLoadConfig { + /// Base transaction count + base_count: usize, + + /// Base transaction size + base_size: usize, + + /// How much the transaction count can vary + count_variation: std::ops::Range, + + size_variation: std::ops::Range, + + /// Chance of generating a spike. + /// e.g. 0.1 = 10% chance of spike + spike_probability: f64, + + /// Multiplier for spike transactions + /// e.g. 10 = 10x more transactions during spike + spike_multiplier: usize, + /// Range of intervals between generating load, in milliseconds + sleep_interval: std::ops::Range, +} +impl NonUniformLoadConfig { + pub fn new( + base_count: usize, + base_size: usize, + count_variation: std::ops::Range, + size_variation: std::ops::Range, + spike_probability: f64, + spike_multiplier: usize, + sleep_interval: std::ops::Range, + ) -> Self { + Self { + base_count, + base_size, + count_variation, + size_variation, + spike_probability, + spike_multiplier, + sleep_interval, + } + } + pub fn base_count(&self) -> usize { + self.base_count + } + pub fn base_size(&self) -> usize { + self.base_size + } + pub fn count_variation(&self) -> std::ops::Range { + self.count_variation.clone() + } + pub fn size_variation(&self) -> std::ops::Range { + self.size_variation.clone() + } + pub fn spike_probability(&self) -> f64 { + self.spike_probability + } + pub fn spike_multiplier(&self) -> usize { + self.spike_multiplier + } + pub fn sleep_interval(&self) -> std::ops::Range { + self.sleep_interval.clone() + } +} + +impl Default for NonUniformLoadConfig { + fn default() -> Self { + Self { + base_count: Default::default(), + base_size: Default::default(), + count_variation: Default::default(), + size_variation: Default::default(), + spike_probability: Default::default(), + spike_multiplier: Default::default(), + sleep_interval: Default::default(), + } + } +} +mod nonuniformload { + #[derive(serde::Deserialize)] + pub struct RawConfig { + /// Base transaction count + #[serde(default)] + base_count: usize, + + /// Base transaction size + #[serde(default)] + base_size: usize, + + /// How much the transaction count can vary + #[serde(default)] + count_variation: std::ops::Range, + + #[serde(default)] + size_variation: std::ops::Range, + + /// Chance of generating a spike. + /// e.g. 0.1 = 10% chance of spike + #[serde(default)] + spike_probability: f64, + + /// Multiplier for spike transactions + /// e.g. 10 = 10x more transactions during spike + #[serde(default)] + spike_multiplier: usize, + + /// Range of intervals between generating load, in milliseconds + #[serde(default)] + sleep_interval: std::ops::Range, + } + + impl From for super::NonUniformLoadConfig { + fn from(raw: RawConfig) -> Self { + super::NonUniformLoadConfig::new( + raw.base_count, + raw.base_size, + raw.count_variation, + raw.size_variation, + raw.spike_probability, + raw.spike_multiplier, + raw.sleep_interval, + ) + } + } +} #[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(from = "uniformload::RawConfig", default)] pub struct UniformLoadConfig { diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 16df16d59..cdb665d1a 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -99,7 +99,7 @@ impl Actor for MempoolLoad { ) -> Result { debug!("starting ticker"); - let ticker = match self.params.load_type { + let ticker = match self.params.load_type.clone() { MempoolLoadType::UniformLoad(uniform_load_config) => { // debug!("entered uniform load branch"); tokio::spawn(ticker( @@ -112,7 +112,7 @@ impl Actor for MempoolLoad { )) } MempoolLoadType::NoLoad => tokio::spawn(async {}), - MempoolLoadType::NonUniformLoad => { + MempoolLoadType::NonUniformLoad(non_uniform_load) => { // debug!("entered nonuniform load branch"); let mut rng = rand::thread_rng(); diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 49390d378..48ec26ba8 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -303,7 +303,7 @@ async fn spawn_mempool_load_actor( ) -> MempoolLoadRef { MempoolLoad::spawn( Params { - load_type: mempool_load_config.load_type, + load_type: mempool_load_config.load_type.clone(), }, network, span.clone(), diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index 0c0ee9a54..dba20f55b 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -48,14 +48,14 @@ timeout_propose = "3s" # How much timeout_propose increases with each round timeout_propose_delta = "500ms" -# How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) +# How long we wait after receiving +2/3 prevotes for "anything" (ie. not a single block or nil) # Override with MALACHITE__CONSENSUS__TIMEOUT_PREVOTE env variable timeout_prevote = "1s" # How much the timeout_prevote increases with each round timeout_prevote_delta = "500ms" -# How long we wait after receiving +2/3 precommits for “anything” (ie. not a single block or nil) +# How long we wait after receiving +2/3 precommits for "anything" (ie. not a single block or nil) # Override with MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT env variable timeout_precommit = "1s" @@ -186,15 +186,46 @@ type = "gossipsub" # Valid options are "no_load", "uniform_load", "non_uniform_load" load_type = "uniform_load" -# Only available when "uniform_load", mempool load interval +# Only available when "uniform_load" +# Sets interval for transaction generation interval = "5s" -# Only available when "uniform_load", mempool load size +# Only available when "uniform_load" +# Sets size of the transactions size = 256 -# Only available when "uniform_load", mempool load count +# Only available when "uniform_load" +# Sets size of the transactions count = 1000 +# Only available when "non_uniform_load" +# Base number of transactions +base_count = 1000 + +# Only available when "non_uniform_load" +# Base size in bytes +base_size = 256 + +# Only available when "non_uniform_load" +# Variation ranges: Transactions will vary by ±500 from base_count +count_variation = { start = -500, end = 500 } + +# Only available when "non_uniform_load" +# Variation ranges: Size will vary by -64 to +128 bytes from base_size +size_variation = { start = -64, end = 128 } + +# Only available when "non_uniform_load" +# Spike configuration: 10% chance of a spike occurring +spike_probability = 0.1 + +# Only available when "non_uniform_load" +# Spike configuration: During spike, multiply transaction count by 5 +spike_multiplier = 5 + +# Only available when "non_uniform_load" +# Sleep interval in milliseconds +sleep_interval = { start = 1000, end = 5000 } # Random delay between 1-5 seconds + ####################################################### ### Sync Configuration Options ### ####################################################### From d6c56f45ed70fce02c4c4e11b75812afcebe7f9e Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Fri, 31 Jan 2025 20:05:43 +0100 Subject: [PATCH 11/21] wip, nonuniform load --- code/crates/config/src/lib.rs | 21 +----- code/crates/starknet/host/src/mempool_load.rs | 74 +++++++++++-------- code/crates/starknet/test/src/lib.rs | 13 +++- 3 files changed, 59 insertions(+), 49 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index b1792bb6e..21684d1d5 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -356,7 +356,7 @@ impl Default for MempoolLoadType { Self::UniformLoad(UniformLoadConfig::default()) } } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] #[serde(from = "nonuniformload::RawConfig", default)] pub struct NonUniformLoadConfig { /// Base transaction count @@ -424,19 +424,6 @@ impl NonUniformLoadConfig { } } -impl Default for NonUniformLoadConfig { - fn default() -> Self { - Self { - base_count: Default::default(), - base_size: Default::default(), - count_variation: Default::default(), - size_variation: Default::default(), - spike_probability: Default::default(), - spike_multiplier: Default::default(), - sleep_interval: Default::default(), - } - } -} mod nonuniformload { #[derive(serde::Deserialize)] pub struct RawConfig { @@ -495,9 +482,9 @@ pub struct UniformLoadConfig { impl UniformLoadConfig { fn new(interval: Duration, count: usize, size: usize) -> Self { Self { - interval: interval, - count: count, - size: size, + interval, + count, + size, } } pub fn interval(&self) -> Duration { diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index cdb665d1a..d20191852 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -1,7 +1,9 @@ +use std::thread::sleep; use std::time::Duration; use async_trait::async_trait; use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; +use rand::seq::IteratorRandom; use rand::{Rng, RngCore}; use tracing::debug; @@ -28,7 +30,7 @@ pub struct State { ticker: JoinHandle<()>, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Params { pub load_type: MempoolLoadType, } @@ -39,14 +41,6 @@ pub struct MempoolLoad { span: tracing::Span, } -impl Default for Params { - fn default() -> Self { - Self { - load_type: MempoolLoadType::default(), - } - } -} - impl MempoolLoad { pub fn new(params: Params, network: MempoolNetworkRef, span: tracing::Span) -> Self { Self { @@ -100,29 +94,49 @@ impl Actor for MempoolLoad { debug!("starting ticker"); let ticker = match self.params.load_type.clone() { - MempoolLoadType::UniformLoad(uniform_load_config) => { - // debug!("entered uniform load branch"); - tokio::spawn(ticker( - uniform_load_config.interval(), - myself.clone(), - move || Msg::GenerateTransactions { - count: uniform_load_config.count(), - size: uniform_load_config.size(), - }, - )) - } + MempoolLoadType::UniformLoad(uniform_load_config) => tokio::spawn(ticker( + uniform_load_config.interval(), + myself.clone(), + move || Msg::GenerateTransactions { + count: uniform_load_config.count(), + size: uniform_load_config.size(), + }, + )), MempoolLoadType::NoLoad => tokio::spawn(async {}), - MempoolLoadType::NonUniformLoad(non_uniform_load) => { - // debug!("entered nonuniform load branch"); - + MempoolLoadType::NonUniformLoad(params) => tokio::spawn(async move { + // loop { let mut rng = rand::thread_rng(); - let interval = Duration::from_secs(rng.gen_range(1..10)); - let count = rng.gen_range(500..=10000) as usize; - let size = rng.gen_range(128..=512) as usize; - tokio::spawn(ticker(interval, myself.clone(), move || { - Msg::GenerateTransactions { count, size } - })) - } + // Determine if this iteration should generate a spike + let is_spike = rng.gen_bool(params.spike_probability()); + + // Vary transaction count and size + let count_variation = rng.gen_range(params.count_variation()); + let size_variation = rng.gen_range(params.size_variation()); + + let count = if is_spike { + (params.base_count() + count_variation as usize) * params.spike_multiplier() + } else { + params.base_count() + count_variation as usize + }; + + let size = params.base_size() + size_variation as usize; + + // Create and send the message + let msg = Msg::GenerateTransactions { + count: count.max(1), // Ensure count is at least 1 + size: size.max(1), // Ensure size is at least 1 + }; + + if let Err(er) = myself.cast(msg) { + tracing::error!(?er, ?myself, "Failed to send tick message"); + // break; + } + // Random sleep between 100ms and 1s + let sleep_duration = + Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap()); + sleep(sleep_duration); + // } + }), }; Ok(State { ticker }) } diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 50c58d3b6..2c719b1c7 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -15,7 +15,8 @@ use tracing::{debug, error, error_span, info, Instrument, Span}; use malachitebft_config::{ Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, MempoolLoadConfig, - MempoolLoadType, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol, + MempoolLoadType, NonUniformLoadConfig, PubSubProtocol, SyncConfig, TestConfig, + TransportProtocol, }; use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{SignedVote, VotingPower}; @@ -737,7 +738,15 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { gossip_batch_size: 100, }, mempool_load: MempoolLoadConfig { - load_type: MempoolLoadType::default(), + load_type: MempoolLoadType::NonUniformLoad(NonUniformLoadConfig::new( + 100, // Base number of transactions + 256, // Random variation in transaction count + 1024..2048, // Base transaction size in bytes + 0..512, // Random variation in transaction size + 0.1, // 10% chance of spike + 5, // 5x multiplier during spikes + 100..500, // Sleep between 100ms and 500ms + )), }, sync: SyncConfig { enabled: true, From 58a7f479a3714e8645bae0012a4707b2d75cae1f Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Mon, 3 Feb 2025 18:47:33 +0100 Subject: [PATCH 12/21] wip nonuniformload, added debugs, added default implementation for nonuniformload --- code/crates/config/src/lib.rs | 20 +++++++++++++++---- .../crates/starknet/host/src/host/proposal.rs | 3 ++- code/crates/starknet/host/src/mempool.rs | 7 +++++-- .../starknet/host/src/mempool/network.rs | 3 +++ code/crates/starknet/host/src/mempool_load.rs | 8 ++++---- code/crates/starknet/host/src/utils/ticker.rs | 1 + 6 files changed, 31 insertions(+), 11 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 21684d1d5..4be48193c 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -346,17 +346,16 @@ mod gossipsub { #[serde(tag = "load_type", rename_all = "snake_case")] pub enum MempoolLoadType { UniformLoad(UniformLoadConfig), - // #[default] NoLoad, NonUniformLoad(NonUniformLoadConfig), } impl Default for MempoolLoadType { fn default() -> Self { - Self::UniformLoad(UniformLoadConfig::default()) + Self::NonUniformLoad(NonUniformLoadConfig::default()) } } -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(from = "nonuniformload::RawConfig", default)] pub struct NonUniformLoadConfig { /// Base transaction count @@ -381,6 +380,19 @@ pub struct NonUniformLoadConfig { /// Range of intervals between generating load, in milliseconds sleep_interval: std::ops::Range, } +impl Default for NonUniformLoadConfig { + fn default() -> Self { + Self::new( + 1000, + 256, + -500..500, + -64..128, + 0.10, + 5, + 1000..5000) + } +} + impl NonUniformLoadConfig { pub fn new( base_count: usize, @@ -501,7 +513,7 @@ impl UniformLoadConfig { } impl Default for UniformLoadConfig { fn default() -> Self { - Self::new(Duration::from_secs(10), 1000, 256) + Self::new(Duration::from_secs(10), 10000, 256) } } diff --git a/code/crates/starknet/host/src/host/proposal.rs b/code/crates/starknet/host/src/host/proposal.rs index 6bf6190bb..f273c6503 100644 --- a/code/crates/starknet/host/src/host/proposal.rs +++ b/code/crates/starknet/host/src/host/proposal.rs @@ -10,7 +10,7 @@ use rand::{RngCore, SeedableRng}; use sha3::Digest; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; use malachitebft_core_types::Round; @@ -82,6 +82,7 @@ async fn run_build_proposal_task( }; loop { + debug!("reaping transactions"); trace!(%height, %round, %sequence, "Building local value"); let reaped_txes = mempool diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index e5543efd5..1c6da52bd 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -204,6 +204,7 @@ impl Actor for Mempool { Msg::Reap { reply, num_txes, .. } => { + debug!("reached reap endpoint"); let txes = generate_and_broadcast_txes( num_txes, self.test_tx_size.as_u64() as usize, @@ -211,7 +212,7 @@ impl Actor for Mempool { state, &self.network, )?; - + debug!("reaped transactions: {:?}", txes); reply.send(txes)?; } @@ -223,6 +224,7 @@ impl Actor for Mempool { // observe issues strictly related to consensus. // It also bumps performance, as we reduce the mempool's background traffic. // state.transactions.clear(); + debug!("tx hashes to remove: {:?}", tx_hashes); tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); } } @@ -260,6 +262,7 @@ fn generate_and_broadcast_txes( .collect::>(); // let initial_count = transactions.len(); + debug!("reaped transactions: {:?}", transactions.len()); let mut tx_batch = Transactions::new(transactions.clone()); // let mut rng = rand::thread_rng(); @@ -269,7 +272,7 @@ fn generate_and_broadcast_txes( let mempool_batch = MempoolTransactionBatch::new(tx_batch); mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; } - debug!("reaped transactions: {:?}", transactions.len()); + debug!("reaped transactions after batch sent: {:?}", transactions.len()); // for _ in initial_count..count { // // Generate transaction // let mut tx_bytes = vec![0; size]; diff --git a/code/crates/starknet/host/src/mempool/network.rs b/code/crates/starknet/host/src/mempool/network.rs index 467cd220a..8408abc27 100644 --- a/code/crates/starknet/host/src/mempool/network.rs +++ b/code/crates/starknet/host/src/mempool/network.rs @@ -9,6 +9,7 @@ use ractor::ActorRef; use ractor::OutputPort; use ractor::{Actor, RpcReplyPort}; use tokio::task::JoinHandle; +use tracing::debug; use tracing::error; use malachitebft_metrics::SharedRegistry; @@ -134,6 +135,8 @@ impl Actor for MempoolNetwork { Msg::Subscribe(subscriber) => subscriber.subscribe_to_port(output_port), Msg::BroadcastMsg(batch) => { + debug!("entered broadcast batch"); + match NetworkMsg::TransactionBatch(batch).to_network_bytes() { Ok(bytes) => { ctrl_handle.broadcast(Mempool, bytes).await?; diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index d20191852..b0fad6ac5 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -74,7 +74,7 @@ impl MempoolLoad { transactions.push(tx); } - // debug!("transactions generated {:?}", transactions.clone().len()); + debug!("MEMPOOL LOAD TX GENERATED {:?}", transactions.clone().len()); transactions } @@ -159,13 +159,13 @@ impl Actor for MempoolLoad { ) -> Result<(), ActorProcessingErr> { match msg { Msg::GenerateTransactions { count, size } => { - // debug!("entered message handler GenerateTransactions"); + debug!("entered message handler GenerateTransactions"); let transactions = Self::generate_transactions(count, size); - // debug!("broadcasting transactions {:?}", transactions.len()); + debug!("broadcasting transactions {:?}", transactions.len()); let tx_batch = Transactions::new(transactions).to_any().unwrap(); - // debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); + debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch); diff --git a/code/crates/starknet/host/src/utils/ticker.rs b/code/crates/starknet/host/src/utils/ticker.rs index 4d4e69d42..d6e4b7643 100644 --- a/code/crates/starknet/host/src/utils/ticker.rs +++ b/code/crates/starknet/host/src/utils/ticker.rs @@ -8,6 +8,7 @@ where Msg: Message, { loop { + debug!("slept for interval {:?}", interval); tokio::time::sleep(interval).await; debug!("sending message generatetransactions"); From a60a0844fb8bac01395f290b567eed583de4e9f9 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Tue, 4 Feb 2025 13:07:22 +0100 Subject: [PATCH 13/21] fix nonuniformload --- code/Cargo.toml | 2 +- code/crates/config/src/lib.rs | 25 +++---- code/crates/starknet/host/src/mempool.rs | 10 ++- code/crates/starknet/host/src/mempool_load.rs | 68 +++++++++---------- 4 files changed, 51 insertions(+), 54 deletions(-) diff --git a/code/Cargo.toml b/code/Cargo.toml index 7ba6c9cb8..d0f034818 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -140,7 +140,7 @@ prost = "0.13" prost-build = "0.13" prost-types = "0.13" ractor = { version = "0.14.6", default-features = false, features = ["async-trait", "tokio_runtime"] } -rand = { version = "0.8.5", features = ["std_rng"] } +rand = { version = "0.8.5", features = ["std_rng", "small_rng"] } rand_chacha = "0.3.1" redb = "2.4.0" seahash = "4.1" diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 4be48193c..233a61e70 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -359,10 +359,10 @@ impl Default for MempoolLoadType { #[serde(from = "nonuniformload::RawConfig", default)] pub struct NonUniformLoadConfig { /// Base transaction count - base_count: usize, + base_count: i32, /// Base transaction size - base_size: usize, + base_size: i32, /// How much the transaction count can vary count_variation: std::ops::Range, @@ -382,21 +382,14 @@ pub struct NonUniformLoadConfig { } impl Default for NonUniformLoadConfig { fn default() -> Self { - Self::new( - 1000, - 256, - -500..500, - -64..128, - 0.10, - 5, - 1000..5000) + Self::new(1000, 256, -500..500, -64..128, 0.10, 2, 100..1000) } } impl NonUniformLoadConfig { pub fn new( - base_count: usize, - base_size: usize, + base_count: i32, + base_size: i32, count_variation: std::ops::Range, size_variation: std::ops::Range, spike_probability: f64, @@ -413,10 +406,10 @@ impl NonUniformLoadConfig { sleep_interval, } } - pub fn base_count(&self) -> usize { + pub fn base_count(&self) -> i32 { self.base_count } - pub fn base_size(&self) -> usize { + pub fn base_size(&self) -> i32 { self.base_size } pub fn count_variation(&self) -> std::ops::Range { @@ -441,11 +434,11 @@ mod nonuniformload { pub struct RawConfig { /// Base transaction count #[serde(default)] - base_count: usize, + base_count: i32, /// Base transaction size #[serde(default)] - base_size: usize, + base_size: i32, /// How much the transaction count can vary #[serde(default)] diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index 1c6da52bd..c6941f75e 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -205,7 +205,8 @@ impl Actor for Mempool { reply, num_txes, .. } => { debug!("reached reap endpoint"); - let txes = generate_and_broadcast_txes( + debug!("current state of mempool: {:?}", state.transactions); + let txes = reap_and_broadcast_txes( num_txes, self.test_tx_size.as_u64() as usize, self.gossip_batch_size, @@ -243,7 +244,7 @@ impl Actor for Mempool { } } -fn generate_and_broadcast_txes( +fn reap_and_broadcast_txes( count: usize, size: usize, gossip_batch_size: usize, @@ -272,7 +273,10 @@ fn generate_and_broadcast_txes( let mempool_batch = MempoolTransactionBatch::new(tx_batch); mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; } - debug!("reaped transactions after batch sent: {:?}", transactions.len()); + debug!( + "reaped transactions after batch sent: {:?}", + transactions.len() + ); // for _ in initial_count..count { // // Generate transaction // let mut tx_bytes = vec![0; size]; diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index b0fad6ac5..29457e5dc 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -1,10 +1,10 @@ -use std::thread::sleep; use std::time::Duration; use async_trait::async_trait; use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; +use rand::rngs::SmallRng; use rand::seq::IteratorRandom; -use rand::{Rng, RngCore}; +use rand::{Rng, RngCore, SeedableRng}; use tracing::debug; use malachitebft_config::MempoolLoadType; @@ -64,7 +64,7 @@ impl MempoolLoad { pub fn generate_transactions(count: usize, size: usize) -> Vec { let mut transactions: Vec = Vec::with_capacity(count); - let mut rng = rand::thread_rng(); + let mut rng = SmallRng::from_entropy(); for _ in 0..count { let mut tx_bytes = vec![0; size]; @@ -104,38 +104,38 @@ impl Actor for MempoolLoad { )), MempoolLoadType::NoLoad => tokio::spawn(async {}), MempoolLoadType::NonUniformLoad(params) => tokio::spawn(async move { - // loop { - let mut rng = rand::thread_rng(); - // Determine if this iteration should generate a spike - let is_spike = rng.gen_bool(params.spike_probability()); - - // Vary transaction count and size - let count_variation = rng.gen_range(params.count_variation()); - let size_variation = rng.gen_range(params.size_variation()); - - let count = if is_spike { - (params.base_count() + count_variation as usize) * params.spike_multiplier() - } else { - params.base_count() + count_variation as usize - }; - - let size = params.base_size() + size_variation as usize; - - // Create and send the message - let msg = Msg::GenerateTransactions { - count: count.max(1), // Ensure count is at least 1 - size: size.max(1), // Ensure size is at least 1 - }; - - if let Err(er) = myself.cast(msg) { - tracing::error!(?er, ?myself, "Failed to send tick message"); - // break; + loop { + let mut rng = SmallRng::from_entropy(); + // Determine if this iteration should generate a spike + let is_spike = rng.gen_bool(params.spike_probability()); + + // Vary transaction count and size + let count_variation = rng.gen_range(params.count_variation()); + let size_variation = rng.gen_range(params.size_variation()); + + let count = if is_spike { + (params.base_count() - count_variation) as usize * params.spike_multiplier() + } else { + (params.base_count() + count_variation) as usize + }; + let size = (params.base_size() + size_variation) as usize; + + // Create and send the message + let msg = Msg::GenerateTransactions { + count: count.max(1), // Ensure count is at least 1 + size: size.max(1), // Ensure size is at least 1 + }; + + if let Err(er) = myself.cast(msg) { + tracing::error!(?er, ?myself, "Failed to send tick message"); + break; + } + // Random sleep between 100ms and 1s + let sleep_duration = + Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap()); + debug!("sleeping thread for duration {:?}", sleep_duration); + tokio::time::sleep(sleep_duration).await; } - // Random sleep between 100ms and 1s - let sleep_duration = - Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap()); - sleep(sleep_duration); - // } }), }; Ok(State { ticker }) From 98e54c242f89b6cef214bfc0263d4b15e9e89892 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Tue, 4 Feb 2025 13:14:36 +0100 Subject: [PATCH 14/21] cleanup --- .../crates/starknet/host/src/host/proposal.rs | 3 +- code/crates/starknet/host/src/mempool.rs | 46 +------------------ .../starknet/host/src/mempool/network.rs | 3 -- code/crates/starknet/host/src/mempool_load.rs | 22 ++------- code/crates/starknet/host/src/spawn.rs | 7 +-- code/crates/starknet/host/src/utils/ticker.rs | 3 -- 6 files changed, 10 insertions(+), 74 deletions(-) diff --git a/code/crates/starknet/host/src/host/proposal.rs b/code/crates/starknet/host/src/host/proposal.rs index f273c6503..6bf6190bb 100644 --- a/code/crates/starknet/host/src/host/proposal.rs +++ b/code/crates/starknet/host/src/host/proposal.rs @@ -10,7 +10,7 @@ use rand::{RngCore, SeedableRng}; use sha3::Digest; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; -use tracing::{debug, error, trace}; +use tracing::{error, trace}; use malachitebft_core_types::Round; @@ -82,7 +82,6 @@ async fn run_build_proposal_task( }; loop { - debug!("reaping transactions"); trace!(%height, %round, %sequence, "Building local value"); let reaped_txes = mempool diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index c6941f75e..e30b84c04 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -2,7 +2,6 @@ use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; -use bytesize::ByteSize; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use tracing::{debug, info, trace}; @@ -22,7 +21,6 @@ pub struct Mempool { network: MempoolNetworkRef, gossip_batch_size: usize, max_tx_count: usize, - test_tx_size: ByteSize, span: tracing::Span, } @@ -77,14 +75,12 @@ impl Mempool { mempool_network: MempoolNetworkRef, gossip_batch_size: usize, max_tx_count: usize, - test_tx_size: ByteSize, span: tracing::Span, ) -> Self { Self { network: mempool_network, gossip_batch_size, max_tx_count, - test_tx_size, span, } } @@ -93,16 +89,9 @@ impl Mempool { mempool_network: MempoolNetworkRef, gossip_batch_size: usize, max_tx_count: usize, - test_tx_size: ByteSize, span: tracing::Span, ) -> Result { - let node = Self::new( - mempool_network, - gossip_batch_size, - max_tx_count, - test_tx_size, - span, - ); + let node = Self::new(mempool_network, gossip_batch_size, max_tx_count, span); let (actor_ref, _) = Actor::spawn(None, node, ()).await?; Ok(actor_ref) @@ -204,16 +193,12 @@ impl Actor for Mempool { Msg::Reap { reply, num_txes, .. } => { - debug!("reached reap endpoint"); - debug!("current state of mempool: {:?}", state.transactions); let txes = reap_and_broadcast_txes( num_txes, - self.test_tx_size.as_u64() as usize, self.gossip_batch_size, state, &self.network, )?; - debug!("reaped transactions: {:?}", txes); reply.send(txes)?; } @@ -225,7 +210,6 @@ impl Actor for Mempool { // observe issues strictly related to consensus. // It also bumps performance, as we reduce the mempool's background traffic. // state.transactions.clear(); - debug!("tx hashes to remove: {:?}", tx_hashes); tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); } } @@ -246,12 +230,11 @@ impl Actor for Mempool { fn reap_and_broadcast_txes( count: usize, - size: usize, gossip_batch_size: usize, state: &mut State, mempool_network: &MempoolNetworkRef, ) -> Result, ActorProcessingErr> { - debug!(%count, %size, "Generating transactions"); + debug!(%count, "Reaping transactions"); let batch_size = std::cmp::min(gossip_batch_size, count); let gossip_enabled = gossip_batch_size > 0; @@ -262,38 +245,13 @@ fn reap_and_broadcast_txes( .take(count) .collect::>(); - // let initial_count = transactions.len(); - debug!("reaped transactions: {:?}", transactions.len()); - let mut tx_batch = Transactions::new(transactions.clone()); - // let mut rng = rand::thread_rng(); if gossip_enabled && tx_batch.len() >= batch_size { let tx_batch = std::mem::take(&mut tx_batch).to_any().unwrap(); let mempool_batch = MempoolTransactionBatch::new(tx_batch); mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; } - debug!( - "reaped transactions after batch sent: {:?}", - transactions.len() - ); - // for _ in initial_count..count { - // // Generate transaction - // let mut tx_bytes = vec![0; size]; - // rng.fill_bytes(&mut tx_bytes); - // let tx = Transaction::new(tx_bytes); - - // if gossip_enabled { - // tx_batch.push(tx.clone()); - // } - - // transactions.push(tx); - - // // Gossip tx-es to peers in batches - // if gossip_enabled && tx_batch.len() >= batch_size { - - // } - // } Ok(transactions) } diff --git a/code/crates/starknet/host/src/mempool/network.rs b/code/crates/starknet/host/src/mempool/network.rs index 8408abc27..467cd220a 100644 --- a/code/crates/starknet/host/src/mempool/network.rs +++ b/code/crates/starknet/host/src/mempool/network.rs @@ -9,7 +9,6 @@ use ractor::ActorRef; use ractor::OutputPort; use ractor::{Actor, RpcReplyPort}; use tokio::task::JoinHandle; -use tracing::debug; use tracing::error; use malachitebft_metrics::SharedRegistry; @@ -135,8 +134,6 @@ impl Actor for MempoolNetwork { Msg::Subscribe(subscriber) => subscriber.subscribe_to_port(output_port), Msg::BroadcastMsg(batch) => { - debug!("entered broadcast batch"); - match NetworkMsg::TransactionBatch(batch).to_network_bytes() { Ok(bytes) => { ctrl_handle.broadcast(Mempool, bytes).await?; diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 29457e5dc..3e874ea38 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -5,7 +5,7 @@ use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef}; use rand::rngs::SmallRng; use rand::seq::IteratorRandom; use rand::{Rng, RngCore, SeedableRng}; -use tracing::debug; +use tracing::info; use malachitebft_config::MempoolLoadType; use malachitebft_starknet_p2p_types::{Transaction, Transactions}; @@ -55,8 +55,6 @@ impl MempoolLoad { network: MempoolNetworkRef, span: tracing::Span, ) -> Result { - debug!("spawning actor mempool_load"); - let actor = Self::new(params, network, span); let (actor_ref, _) = Actor::spawn(None, actor, ()).await?; Ok(actor_ref) @@ -70,12 +68,8 @@ impl MempoolLoad { let mut tx_bytes = vec![0; size]; rng.fill_bytes(&mut tx_bytes); let tx = Transaction::new(tx_bytes); - // debug!("transaction {:?}", tx.clone()); - transactions.push(tx); } - debug!("MEMPOOL LOAD TX GENERATED {:?}", transactions.clone().len()); - transactions } } @@ -91,8 +85,6 @@ impl Actor for MempoolLoad { myself: MempoolLoadRef, _args: (), ) -> Result { - debug!("starting ticker"); - let ticker = match self.params.load_type.clone() { MempoolLoadType::UniformLoad(uniform_load_config) => tokio::spawn(ticker( uniform_load_config.interval(), @@ -130,10 +122,10 @@ impl Actor for MempoolLoad { tracing::error!(?er, ?myself, "Failed to send tick message"); break; } - // Random sleep between 100ms and 1s + // Random sleep let sleep_duration = Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap()); - debug!("sleeping thread for duration {:?}", sleep_duration); + tokio::time::sleep(sleep_duration).await; } }), @@ -146,6 +138,7 @@ impl Actor for MempoolLoad { _myself: ActorRef, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { + info!("Stopping..."); state.ticker.abort(); Ok(()) } @@ -159,18 +152,13 @@ impl Actor for MempoolLoad { ) -> Result<(), ActorProcessingErr> { match msg { Msg::GenerateTransactions { count, size } => { - debug!("entered message handler GenerateTransactions"); - let transactions = Self::generate_transactions(count, size); - debug!("broadcasting transactions {:?}", transactions.len()); - let tx_batch = Transactions::new(transactions).to_any().unwrap(); - debug!("broadcasting batch {:?}", tx_batch.clone().value.len()); let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch); - self.network .cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; + Ok(()) } } diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 48ec26ba8..0f25f2225 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -7,7 +7,7 @@ use malachitebft_engine::wal::{Wal, WalRef}; use tokio::task::JoinHandle; use malachitebft_config::{ - self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig, + self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TransportProtocol, }; use malachitebft_core_consensus::ValuePayload; @@ -50,8 +50,7 @@ pub async fn spawn_node_actor( // Spawn mempool and its gossip layer let mempool_network = spawn_mempool_network_actor(&cfg, &private_key, ®istry, &span).await; - let mempool = - spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &cfg.test, &span).await; + let mempool = spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &span).await; let mempool_load = spawn_mempool_load_actor(&cfg.mempool_load, mempool_network.clone(), &span).await; @@ -282,14 +281,12 @@ fn make_keypair(private_key: &PrivateKey) -> Keypair { async fn spawn_mempool_actor( mempool_network: MempoolNetworkRef, mempool_config: &MempoolConfig, - test_config: &TestConfig, span: &tracing::Span, ) -> MempoolRef { Mempool::spawn( mempool_network, mempool_config.gossip_batch_size, mempool_config.max_tx_count, - test_config.tx_size, span.clone(), ) .await diff --git a/code/crates/starknet/host/src/utils/ticker.rs b/code/crates/starknet/host/src/utils/ticker.rs index d6e4b7643..75463d2cf 100644 --- a/code/crates/starknet/host/src/utils/ticker.rs +++ b/code/crates/starknet/host/src/utils/ticker.rs @@ -1,16 +1,13 @@ use ractor::message::Message; use ractor::ActorRef; use std::time::Duration; -use tracing::debug; pub async fn ticker(interval: Duration, target: ActorRef, msg: impl Fn() -> Msg) where Msg: Message, { loop { - debug!("slept for interval {:?}", interval); tokio::time::sleep(interval).await; - debug!("sending message generatetransactions"); if let Err(er) = target.cast(msg()) { tracing::error!(?er, ?target, "Failed to send tick message"); From cdedc723f43b222a2d68217574fb1a20ccaae470 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Tue, 4 Feb 2025 13:48:48 +0100 Subject: [PATCH 15/21] addressing comments regarding config --- code/crates/config/src/lib.rs | 127 ++---------------- code/crates/starknet/host/src/mempool_load.rs | 23 ++-- 2 files changed, 25 insertions(+), 125 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 233a61e70..c04082c47 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -352,33 +352,32 @@ pub enum MempoolLoadType { impl Default for MempoolLoadType { fn default() -> Self { - Self::NonUniformLoad(NonUniformLoadConfig::default()) + Self::UniformLoad(UniformLoadConfig::default()) } } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(from = "nonuniformload::RawConfig", default)] +#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)] pub struct NonUniformLoadConfig { /// Base transaction count - base_count: i32, + pub base_count: i32, /// Base transaction size - base_size: i32, + pub base_size: i32, /// How much the transaction count can vary - count_variation: std::ops::Range, + pub count_variation: std::ops::Range, - size_variation: std::ops::Range, + pub size_variation: std::ops::Range, /// Chance of generating a spike. /// e.g. 0.1 = 10% chance of spike - spike_probability: f64, + pub spike_probability: f64, /// Multiplier for spike transactions /// e.g. 10 = 10x more transactions during spike - spike_multiplier: usize, + pub spike_multiplier: usize, /// Range of intervals between generating load, in milliseconds - sleep_interval: std::ops::Range, + pub sleep_interval: std::ops::Range, } impl Default for NonUniformLoadConfig { fn default() -> Self { @@ -406,83 +405,14 @@ impl NonUniformLoadConfig { sleep_interval, } } - pub fn base_count(&self) -> i32 { - self.base_count - } - pub fn base_size(&self) -> i32 { - self.base_size - } - pub fn count_variation(&self) -> std::ops::Range { - self.count_variation.clone() - } - pub fn size_variation(&self) -> std::ops::Range { - self.size_variation.clone() - } - pub fn spike_probability(&self) -> f64 { - self.spike_probability - } - pub fn spike_multiplier(&self) -> usize { - self.spike_multiplier - } - pub fn sleep_interval(&self) -> std::ops::Range { - self.sleep_interval.clone() - } } -mod nonuniformload { - #[derive(serde::Deserialize)] - pub struct RawConfig { - /// Base transaction count - #[serde(default)] - base_count: i32, - - /// Base transaction size - #[serde(default)] - base_size: i32, - - /// How much the transaction count can vary - #[serde(default)] - count_variation: std::ops::Range, - - #[serde(default)] - size_variation: std::ops::Range, - - /// Chance of generating a spike. - /// e.g. 0.1 = 10% chance of spike - #[serde(default)] - spike_probability: f64, - - /// Multiplier for spike transactions - /// e.g. 10 = 10x more transactions during spike - #[serde(default)] - spike_multiplier: usize, - - /// Range of intervals between generating load, in milliseconds - #[serde(default)] - sleep_interval: std::ops::Range, - } - - impl From for super::NonUniformLoadConfig { - fn from(raw: RawConfig) -> Self { - super::NonUniformLoadConfig::new( - raw.base_count, - raw.base_size, - raw.count_variation, - raw.size_variation, - raw.spike_probability, - raw.spike_multiplier, - raw.sleep_interval, - ) - } - } -} -#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(from = "uniformload::RawConfig", default)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, serde::Deserialize)] pub struct UniformLoadConfig { #[serde(with = "humantime_serde")] - interval: Duration, - count: usize, - size: usize, + pub interval: Duration, + pub count: usize, + pub size: usize, } impl UniformLoadConfig { fn new(interval: Duration, count: usize, size: usize) -> Self { @@ -492,17 +422,6 @@ impl UniformLoadConfig { size, } } - pub fn interval(&self) -> Duration { - self.interval - } - - pub fn count(&self) -> usize { - self.count - } - - pub fn size(&self) -> usize { - self.size - } } impl Default for UniformLoadConfig { fn default() -> Self { @@ -510,26 +429,6 @@ impl Default for UniformLoadConfig { } } -mod uniformload { - use std::time::Duration; - - #[derive(serde::Deserialize)] - pub struct RawConfig { - #[serde(default)] - #[serde(with = "humantime_serde")] - interval: Duration, - #[serde(default)] - count: usize, - #[serde(default)] - size: usize, - } - - impl From for super::UniformLoadConfig { - fn from(raw: RawConfig) -> Self { - super::UniformLoadConfig::new(raw.interval, raw.count, raw.size) - } - } -} /// Mempool configuration options #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct MempoolLoadConfig { diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 3e874ea38..7bc47d3e7 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -87,11 +87,11 @@ impl Actor for MempoolLoad { ) -> Result { let ticker = match self.params.load_type.clone() { MempoolLoadType::UniformLoad(uniform_load_config) => tokio::spawn(ticker( - uniform_load_config.interval(), + uniform_load_config.interval, myself.clone(), move || Msg::GenerateTransactions { - count: uniform_load_config.count(), - size: uniform_load_config.size(), + count: uniform_load_config.count, + size: uniform_load_config.size, }, )), MempoolLoadType::NoLoad => tokio::spawn(async {}), @@ -99,18 +99,18 @@ impl Actor for MempoolLoad { loop { let mut rng = SmallRng::from_entropy(); // Determine if this iteration should generate a spike - let is_spike = rng.gen_bool(params.spike_probability()); + let is_spike = rng.gen_bool(params.spike_probability); // Vary transaction count and size - let count_variation = rng.gen_range(params.count_variation()); - let size_variation = rng.gen_range(params.size_variation()); + let count_variation = rng.gen_range(params.count_variation.clone()); + let size_variation = rng.gen_range(params.size_variation.clone()); let count = if is_spike { - (params.base_count() - count_variation) as usize * params.spike_multiplier() + (params.base_count - count_variation) as usize * params.spike_multiplier } else { - (params.base_count() + count_variation) as usize + (params.base_count + count_variation) as usize }; - let size = (params.base_size() + size_variation) as usize; + let size = (params.base_size + size_variation) as usize; // Create and send the message let msg = Msg::GenerateTransactions { @@ -123,8 +123,9 @@ impl Actor for MempoolLoad { break; } // Random sleep - let sleep_duration = - Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap()); + let sleep_duration = Duration::from_millis( + params.sleep_interval.clone().choose(&mut rng).unwrap(), + ); tokio::time::sleep(sleep_duration).await; } From 4079660576c96433e2b80edc4ef657205299cf4f Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Tue, 4 Feb 2025 15:12:00 +0100 Subject: [PATCH 16/21] refactored tickers --- code/crates/starknet/host/src/lib.rs | 1 - code/crates/starknet/host/src/mempool_load.rs | 113 ++++++++++-------- code/crates/starknet/host/src/utils/mod.rs | 1 - code/crates/starknet/host/src/utils/ticker.rs | 17 --- 4 files changed, 65 insertions(+), 67 deletions(-) delete mode 100644 code/crates/starknet/host/src/utils/mod.rs delete mode 100644 code/crates/starknet/host/src/utils/ticker.rs diff --git a/code/crates/starknet/host/src/lib.rs b/code/crates/starknet/host/src/lib.rs index 6e594728a..41219be84 100644 --- a/code/crates/starknet/host/src/lib.rs +++ b/code/crates/starknet/host/src/lib.rs @@ -7,7 +7,6 @@ pub mod mempool_load; pub mod node; pub mod spawn; pub mod streaming; -pub mod utils; pub use malachitebft_app::part_store; pub mod proto { pub use malachitebft_proto::*; diff --git a/code/crates/starknet/host/src/mempool_load.rs b/code/crates/starknet/host/src/mempool_load.rs index 7bc47d3e7..cd058d167 100644 --- a/code/crates/starknet/host/src/mempool_load.rs +++ b/code/crates/starknet/host/src/mempool_load.rs @@ -7,16 +7,13 @@ use rand::seq::IteratorRandom; use rand::{Rng, RngCore, SeedableRng}; use tracing::info; -use malachitebft_config::MempoolLoadType; +use malachitebft_config::{MempoolLoadType, NonUniformLoadConfig, UniformLoadConfig}; use malachitebft_starknet_p2p_types::{Transaction, Transactions}; use malachitebft_test_mempool::types::MempoolTransactionBatch; use crate::proto::Protobuf; -use crate::{ - mempool::network::{MempoolNetworkMsg, MempoolNetworkRef}, - utils::ticker::ticker, -}; +use crate::mempool::network::{MempoolNetworkMsg, MempoolNetworkRef}; pub type MempoolLoadMsg = Msg; pub type MempoolLoadRef = ActorRef; @@ -72,6 +69,63 @@ impl MempoolLoad { } transactions } + + fn generate_non_uniform_load_params(params: &NonUniformLoadConfig) -> (usize, usize, Duration) { + let mut rng = SmallRng::from_entropy(); + + // Determine if this iteration should generate a spike + let is_spike = rng.gen_bool(params.spike_probability); + + // Vary transaction count and size + let count_variation = rng.gen_range(params.count_variation.clone()); + let size_variation = rng.gen_range(params.size_variation.clone()); + + let count = if is_spike { + (params.base_count + count_variation) as usize * params.spike_multiplier + } else { + (params.base_count + count_variation) as usize + }; + let size = (params.base_size + size_variation) as usize; + + // Get sleep duration + let sleep_duration = + Duration::from_millis(params.sleep_interval.clone().choose(&mut rng).unwrap()); + + (count.max(1), size.max(1), sleep_duration) + } + + async fn run_uniform_load(params: UniformLoadConfig, myself: MempoolLoadRef) { + loop { + // Create and send the message + let msg = Msg::GenerateTransactions { + count: params.count, + size: params.size, + }; + + if let Err(er) = myself.cast(msg) { + tracing::error!(?er, ?myself, "Channel closed, stopping load generator"); + break; + } + + tokio::time::sleep(params.interval).await; + } + } + + async fn run_non_uniform_load(params: NonUniformLoadConfig, myself: MempoolLoadRef) { + loop { + let (count, size, sleep_duration) = Self::generate_non_uniform_load_params(¶ms); + + // Create and send the message + let msg = Msg::GenerateTransactions { count, size }; + + if let Err(er) = myself.cast(msg) { + tracing::error!(?er, ?myself, "Channel closed, stopping load generator"); + break; + } + + tokio::time::sleep(sleep_duration).await; + } + } } #[async_trait] @@ -86,50 +140,13 @@ impl Actor for MempoolLoad { _args: (), ) -> Result { let ticker = match self.params.load_type.clone() { - MempoolLoadType::UniformLoad(uniform_load_config) => tokio::spawn(ticker( - uniform_load_config.interval, - myself.clone(), - move || Msg::GenerateTransactions { - count: uniform_load_config.count, - size: uniform_load_config.size, - }, - )), + MempoolLoadType::UniformLoad(uniform_load_config) => { + tokio::spawn(Self::run_uniform_load(uniform_load_config, myself.clone())) + } MempoolLoadType::NoLoad => tokio::spawn(async {}), - MempoolLoadType::NonUniformLoad(params) => tokio::spawn(async move { - loop { - let mut rng = SmallRng::from_entropy(); - // Determine if this iteration should generate a spike - let is_spike = rng.gen_bool(params.spike_probability); - - // Vary transaction count and size - let count_variation = rng.gen_range(params.count_variation.clone()); - let size_variation = rng.gen_range(params.size_variation.clone()); - - let count = if is_spike { - (params.base_count - count_variation) as usize * params.spike_multiplier - } else { - (params.base_count + count_variation) as usize - }; - let size = (params.base_size + size_variation) as usize; - - // Create and send the message - let msg = Msg::GenerateTransactions { - count: count.max(1), // Ensure count is at least 1 - size: size.max(1), // Ensure size is at least 1 - }; - - if let Err(er) = myself.cast(msg) { - tracing::error!(?er, ?myself, "Failed to send tick message"); - break; - } - // Random sleep - let sleep_duration = Duration::from_millis( - params.sleep_interval.clone().choose(&mut rng).unwrap(), - ); - - tokio::time::sleep(sleep_duration).await; - } - }), + MempoolLoadType::NonUniformLoad(non_uniform_load_config) => tokio::spawn( + Self::run_non_uniform_load(non_uniform_load_config, myself.clone()), + ), }; Ok(State { ticker }) } diff --git a/code/crates/starknet/host/src/utils/mod.rs b/code/crates/starknet/host/src/utils/mod.rs deleted file mode 100644 index 2e6961ef8..000000000 --- a/code/crates/starknet/host/src/utils/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod ticker; diff --git a/code/crates/starknet/host/src/utils/ticker.rs b/code/crates/starknet/host/src/utils/ticker.rs deleted file mode 100644 index 75463d2cf..000000000 --- a/code/crates/starknet/host/src/utils/ticker.rs +++ /dev/null @@ -1,17 +0,0 @@ -use ractor::message::Message; -use ractor::ActorRef; -use std::time::Duration; - -pub async fn ticker(interval: Duration, target: ActorRef, msg: impl Fn() -> Msg) -where - Msg: Message, -{ - loop { - tokio::time::sleep(interval).await; - - if let Err(er) = target.cast(msg()) { - tracing::error!(?er, ?target, "Failed to send tick message"); - break; - } - } -} From 1480eee65c4f469b5649d2fc3e52a9b69889d937 Mon Sep 17 00:00:00 2001 From: OakenKnight Date: Tue, 4 Feb 2025 15:19:14 +0100 Subject: [PATCH 17/21] shortened default interval for UniformLoadConfig --- code/crates/config/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index c04082c47..22c0247da 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -425,7 +425,7 @@ impl UniformLoadConfig { } impl Default for UniformLoadConfig { fn default() -> Self { - Self::new(Duration::from_secs(10), 10000, 256) + Self::new(Duration::from_secs(3), 10000, 256) } } From d6a9b568975319db742427b3969ec403307ba878 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 12 Feb 2025 10:09:10 +0100 Subject: [PATCH 18/21] No load by default --- code/crates/config/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 3f30a4d88..c966e5aec 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -349,7 +349,7 @@ pub enum MempoolLoadType { impl Default for MempoolLoadType { fn default() -> Self { - Self::UniformLoad(mempool_load::UniformLoadConfig::default()) + Self::NoLoad } } From f83f37e9f15b1f463e3692b078615a916dad06f4 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 12 Feb 2025 10:18:47 +0100 Subject: [PATCH 19/21] Cleanup --- code/crates/starknet/host/src/mempool.rs | 129 ++++++++++++----------- 1 file changed, 66 insertions(+), 63 deletions(-) diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index a45df60cb..47864c155 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -2,8 +2,9 @@ use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; +use itertools::Itertools; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; -use tracing::{debug, info, trace}; +use tracing::{debug, error, info, trace}; use malachitebft_test_mempool::types::MempoolTransactionBatch; use malachitebft_test_mempool::{Event as NetworkEvent, NetworkMsg, PeerId}; @@ -43,30 +44,22 @@ impl From> for Msg { } } -#[allow(dead_code)] +#[derive(Default)] pub struct State { - pub transactions: BTreeMap, + transactions: BTreeMap, } impl State { pub fn new() -> Self { - Self { - transactions: BTreeMap::new(), - } + Self::default() } - pub fn add_tx(&mut self, tx: &Transaction) { - self.transactions.entry(tx.hash()).or_insert(tx.clone()); + pub fn add_tx(&mut self, tx: Transaction) { + self.transactions.entry(tx.hash()).or_insert(tx); } pub fn remove_tx(&mut self, hash: &Hash) { - self.transactions.remove_entry(hash); - } -} - -impl Default for State { - fn default() -> Self { - Self::new() + self.transactions.remove(hash); } } @@ -92,7 +85,6 @@ impl Mempool { span: tracing::Span, ) -> Result { let node = Self::new(mempool_network, gossip_batch_size, max_tx_count, span); - let (actor_ref, _) = Actor::spawn(None, node, ()).await?; Ok(actor_ref) } @@ -116,9 +108,7 @@ impl Mempool { NetworkEvent::Message(_channel, from, _msg_id, msg) => { trace!(%from, size = msg.size_bytes(), "Received message"); - trace!(%from, "Received message"); - self.handle_network_msg(from, msg.clone(), myself, state) // FIXME: Clone - .await?; + self.handle_network_msg(from, msg, myself, state).await?; } } @@ -128,15 +118,18 @@ impl Mempool { pub async fn handle_network_msg( &self, from: &PeerId, - msg: NetworkMsg, + msg: &NetworkMsg, myself: MempoolRef, _state: &mut State, ) -> Result<(), ractor::ActorProcessingErr> { match msg { NetworkMsg::TransactionBatch(batch) => { - let Ok(batch) = TransactionBatch::from_any(&batch.transaction_batch) else { - // TODO: Log error - return Ok(()); + let batch = match TransactionBatch::from_any(&batch.transaction_batch) { + Ok(batch) => batch, + Err(e) => { + error!("Failed to decode transaction batch: {e}"); + return Ok(()); + } }; trace!(%from, "Received batch with {} transactions", batch.len()); @@ -149,29 +142,8 @@ impl Mempool { Ok(()) } -} - -#[async_trait] -impl Actor for Mempool { - type Msg = Msg; - type State = State; - type Arguments = (); - - async fn pre_start( - &self, - myself: MempoolRef, - _args: (), - ) -> Result { - self.network.link(myself.get_cell()); - - self.network - .cast(MempoolNetworkMsg::Subscribe(Box::new(myself.clone())))?; - - Ok(State::new()) - } - #[tracing::instrument("host.mempool", parent = &self.span, skip_all)] - async fn handle( + async fn handle_msg( &self, myself: MempoolRef, msg: Msg, @@ -184,7 +156,7 @@ impl Actor for Mempool { Msg::Input(tx) => { if state.transactions.len() < self.max_tx_count { - state.add_tx(&tx); + state.add_tx(tx); } else { trace!("Mempool is full, dropping transaction"); } @@ -204,23 +176,52 @@ impl Actor for Mempool { } Msg::Update { tx_hashes } => { - // Clear all transactions from the mempool, given that we consume - // the full mempool when proposing a block. - // - // This reduces the mempool protocol overhead and allow us to - // observe issues strictly related to consensus. - // It also bumps performance, as we reduce the mempool's background traffic. - // state.transactions.clear(); tx_hashes.iter().for_each(|hash| state.remove_tx(hash)); } } Ok(()) } +} +#[async_trait] +impl Actor for Mempool { + type Msg = Msg; + type State = State; + type Arguments = (); + + #[tracing::instrument("host.mempool", parent = &self.span, skip_all)] + async fn pre_start( + &self, + myself: MempoolRef, + _args: (), + ) -> Result { + self.network.link(myself.get_cell()); + + self.network + .cast(MempoolNetworkMsg::Subscribe(Box::new(myself.clone())))?; + + Ok(State::new()) + } + + #[tracing::instrument("host.mempool", parent = &self.span, skip_all)] + async fn handle( + &self, + myself: MempoolRef, + msg: MempoolMsg, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + if let Err(e) = self.handle_msg(myself, msg, state).await { + error!("Error processing message: {e:?}"); + } + + Ok(()) + } + + #[tracing::instrument("host.mempool", parent = &self.span, skip_all)] async fn post_stop( &self, - _myself: ActorRef, + _myself: MempoolRef, _state: &mut State, ) -> Result<(), ActorProcessingErr> { info!("Stopping..."); @@ -237,22 +238,24 @@ fn reap_and_broadcast_txes( ) -> Result, ActorProcessingErr> { debug!(%count, "Reaping transactions"); - let batch_size = std::cmp::min(gossip_batch_size, count); let gossip_enabled = gossip_batch_size > 0; - // Start with transactions already in the mempool + // Reap transactions from the mempool let transactions = std::mem::take(&mut state.transactions) .into_values() .take(count) .collect::>(); - let tx_batch = TransactionBatch::new(transactions); - - if gossip_enabled && tx_batch.len() >= batch_size { - let tx_batch = tx_batch.to_any().unwrap(); - let mempool_batch = MempoolTransactionBatch::new(tx_batch); - mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; + // If mempool gossip is enabled, broadcast the transactions to the network + if gossip_enabled { + // Chunk the transactions in batch of max `gossip_batch_size` + for batch in &transactions.iter().chunks(gossip_batch_size) { + let tx_batch = TransactionBatch::new(batch.cloned().collect()); + let tx_batch = tx_batch.to_any().unwrap(); + let mempool_batch = MempoolTransactionBatch::new(tx_batch); + mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?; + } } - Ok(tx_batch.into_vec()) + Ok(transactions) } From 084261fefe616125b2974cc37fbf34b5997d8f09 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 12 Feb 2025 10:30:16 +0100 Subject: [PATCH 20/21] Fix config parsing --- code/crates/config/src/lib.rs | 1 + code/examples/channel/config.toml | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index c966e5aec..454a75331 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -424,6 +424,7 @@ pub mod mempool_load { #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct MempoolLoadConfig { /// Mempool loading type + #[serde(flatten)] pub load_type: MempoolLoadType, } diff --git a/code/examples/channel/config.toml b/code/examples/channel/config.toml index 9f754de95..7dc9b89a6 100644 --- a/code/examples/channel/config.toml +++ b/code/examples/channel/config.toml @@ -177,7 +177,7 @@ type = "gossipsub" ### Mempool Load Configuration Options ### ####################################################### -[mempool_load.load_type] +[mempool.load] # Sets the type of generation for mempool transactions # Valid options are "no_load", "uniform_load", "non_uniform_load" load_type = "uniform_load" @@ -199,16 +199,16 @@ count = 1000 base_count = 1000 # Only available when "non_uniform_load" -# Base size in bytes +# Base size in bytes base_size = 256 # Only available when "non_uniform_load" # Variation ranges: Transactions will vary by ±500 from base_count -count_variation = { start = -500, end = 500 } +count_variation = { start = -500, end = 500 } # Only available when "non_uniform_load" # Variation ranges: Size will vary by -64 to +128 bytes from base_size -size_variation = { start = -64, end = 128 } +size_variation = { start = -64, end = 128 } # Only available when "non_uniform_load" # Spike configuration: 10% chance of a spike occurring From 59994d377ebbf6ec02ecd781003c0ced09d3636d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 12 Feb 2025 10:35:54 +0100 Subject: [PATCH 21/21] Less aggressive parameters --- code/crates/config/src/lib.rs | 8 ++++---- code/crates/starknet/test/src/lib.rs | 4 +--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 454a75331..3e2175dbd 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -385,13 +385,13 @@ pub mod mempool_load { impl Default for NonUniformLoadConfig { fn default() -> Self { Self { - base_count: 1000, + base_count: 100, base_size: 256, - count_variation: -500..500, + count_variation: -100..200, size_variation: -64..128, spike_probability: 0.10, spike_multiplier: 2, - sleep_interval: 100..1000, + sleep_interval: 1000..5000, } } } @@ -413,7 +413,7 @@ pub mod mempool_load { fn default() -> Self { Self { interval: Duration::from_secs(3), - count: 10000, + count: 100, size: 256, } } diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index a4f42a7d0..7d3ae0132 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -746,9 +746,7 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { max_tx_count: 10000, gossip_batch_size: 100, load: MempoolLoadConfig { - load_type: MempoolLoadType::NonUniformLoad( - mempool_load::NonUniformLoadConfig::default(), - ), + load_type: MempoolLoadType::UniformLoad(mempool_load::UniformLoadConfig::default()), }, }, sync: SyncConfig {