Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code/starknet): Add mempool load generator to the Starknet test app #821

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct Config {
/// Mempool configuration options
pub mempool: MempoolConfig,

/// Mempool load configuration options
pub mempool_load: MempoolLoadConfig,

/// Sync configuration options
pub sync: SyncConfig,

Expand Down Expand Up @@ -339,6 +342,25 @@ mod gossipsub {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MempoolLoadType {
UniformLoad {
count: usize,
size: usize,
},
#[default]
NoLoad,
NonUniformLoad,
}

/// Mempool configuration options
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct MempoolLoadConfig {
/// Mempool loading type
pub load_type: MempoolLoadType,
}

/// Mempool configuration options
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct MempoolConfig {
Expand Down
9 changes: 8 additions & 1 deletion code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ use crate::host::proposal::compute_proposal_signature;
use crate::host::state::HostState;
use crate::host::{Host as _, StarknetHost};
use crate::mempool::{MempoolMsg, MempoolRef};
use crate::mempool_load::MempoolLoadRef;
use crate::proto::Protobuf;
use crate::types::*;

pub struct Host {
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
Expand All @@ -41,6 +43,7 @@ impl Host {
home_dir: PathBuf,
host: StarknetHost,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
Expand All @@ -51,7 +54,7 @@ impl Host {

let (actor_ref, _) = Actor::spawn(
None,
Self::new(mempool, network, metrics, span),
Self::new(mempool, mempool_load, network, metrics, span),
HostState::new(host, db_path, &mut StdRng::from_entropy()),
)
.await?;
Expand All @@ -61,12 +64,14 @@ impl Host {

pub fn new(
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
) -> Self {
Self {
mempool,
mempool_load,
network,
metrics,
span,
Expand All @@ -86,6 +91,7 @@ impl Actor for Host {
initial_state: Self::State,
) -> Result<Self::State, ActorProcessingErr> {
self.mempool.link(myself.get_cell());
self.mempool_load.link(myself.get_cell());

Ok(initial_state)
}
Expand Down Expand Up @@ -545,6 +551,7 @@ async fn on_received_proposal_part(
Ok(())
}

//TODO
async fn on_decided(
state: &mut HostState,
consensus: &ConsensusRef<MockContext>,
Expand Down
5 changes: 5 additions & 0 deletions code/crates/starknet/host/src/host/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtensi

use crate::host::Host;
use crate::mempool::MempoolRef;
use crate::mempool_load::MempoolLoadRef;
// use crate::mempool_load::MempoolLoadRef;
use crate::part_store::PartStore;
use crate::types::*;

Expand All @@ -33,6 +35,7 @@ pub struct StarknetParams {
pub struct StarknetHost {
pub params: StarknetParams,
pub mempool: MempoolRef,
pub mempool_load: MempoolLoadRef,
pub address: Address,
pub private_key: PrivateKey,
pub validator_set: ValidatorSet,
Expand All @@ -43,13 +46,15 @@ impl StarknetHost {
pub fn new(
params: StarknetParams,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
address: Address,
private_key: PrivateKey,
validator_set: ValidatorSet,
) -> Self {
Self {
params,
mempool,
mempool_load,
address,
private_key,
validator_set,
Expand Down
4 changes: 2 additions & 2 deletions code/crates/starknet/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ pub mod block_store;
pub mod codec;
pub mod host;
pub mod mempool;
pub mod mempool_load;
pub mod node;
pub mod spawn;
pub mod streaming;

pub mod utils;
pub use malachitebft_app::part_store;

pub mod proto {
pub use malachitebft_proto::*;
pub use malachitebft_starknet_p2p_proto::*;
Expand Down
160 changes: 160 additions & 0 deletions code/crates/starknet/host/src/mempool_load.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use crate::proto::Protobuf;
use async_trait::async_trait;
use malachitebft_config::MempoolLoadType;
use malachitebft_starknet_p2p_types::{Transaction, Transactions};
use malachitebft_test_mempool::types::MempoolTransactionBatch;
use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef};
use rand::{Rng, RngCore};
use std::time::Duration;
use tracing::debug;

use crate::{
mempool::network::{MempoolNetworkMsg, MempoolNetworkRef},
utils::ticker::ticker,
};

pub type MempoolLoadMsg = Msg;
pub type MempoolLoadRef = ActorRef<Msg>;
pub enum Msg {
GenerateTransactions { count: usize, size: usize },
}

#[derive(Debug)]
pub struct State {
ticker: JoinHandle<()>,
}

#[derive(Debug)]
pub struct Params {
pub load_type: MempoolLoadType,
}

pub struct MempoolLoad {
params: Params,
network: MempoolNetworkRef,
span: tracing::Span,
}

impl Default for Params {
fn default() -> Self {
Self {
load_type: MempoolLoadType::UniformLoad {
size: 555,
count: 127,
},
}
}
}

impl MempoolLoad {
pub fn new(params: Params, network: MempoolNetworkRef, span: tracing::Span) -> Self {
Self {
params,
network,
span,
}
}

pub async fn spawn(
params: Params,
network: MempoolNetworkRef,
span: tracing::Span,
) -> Result<MempoolLoadRef, ractor::SpawnErr> {
debug!("spawning actor mempool_load");

let actor = Self::new(params, network, span);
let (actor_ref, _) = Actor::spawn(None, actor, ()).await?;
Ok(actor_ref)
}

pub fn generate_transactions(count: usize, size: usize) -> Vec<Transaction> {
let mut transactions: Vec<Transaction> = Vec::with_capacity(count);
let mut rng = rand::thread_rng();

for _ in 0..count {
let mut tx_bytes = vec![0; size];
rng.fill_bytes(&mut tx_bytes);
let tx = Transaction::new(tx_bytes);
// debug!("transaction {:?}", tx.clone());

transactions.push(tx);
}
debug!("transactions generated {:?}", transactions.clone().len());

transactions
}
}

#[async_trait]
impl Actor for MempoolLoad {
type Msg = Msg;
type State = State;
type Arguments = ();

async fn pre_start(
&self,
myself: MempoolLoadRef,
_args: (),
) -> Result<State, ActorProcessingErr> {
debug!("starting ticker");

let ticker = match self.params.load_type {
MempoolLoadType::UniformLoad { count, size } => {
debug!("entered uniform load branch");

let interval = Duration::from_secs(1);
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
}
MempoolLoadType::NoLoad => tokio::spawn(async {}),
MempoolLoadType::NonUniformLoad => {
debug!("entered nonuniform load branch");

let mut rng = rand::thread_rng();
let interval = Duration::from_secs(rng.gen_range(1..10));
let count = rng.gen_range(500..=10000) as usize;
let size = rng.gen_range(128..=512) as usize;
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
}
};
Ok(State { ticker })
}

async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.ticker.abort();
Ok(())
}

#[tracing::instrument("host.mempool_load", parent = &self.span, skip_all)]
async fn handle(
&self,
_myself: MempoolLoadRef,
msg: Msg,
_state: &mut State,
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::GenerateTransactions { count, size } => {
debug!("entered message handler GenerateTransactions");

let transactions = Self::generate_transactions(count, size);
debug!("broadcasting transactions {:?}", transactions.len());

let tx_batch = Transactions::new(transactions).to_any().unwrap();
debug!("broadcasting batch {:?}", tx_batch.clone().value.len());

let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch);

self.network
.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?;
Ok(())
}
}
}
}
29 changes: 28 additions & 1 deletion code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use malachitebft_engine::wal::{Wal, WalRef};
use tokio::task::JoinHandle;

use malachitebft_config::{
self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol,
self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig,
TransportProtocol,
};
use malachitebft_core_consensus::ValuePayload;
use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef};
Expand All @@ -26,6 +27,7 @@ use crate::codec::ProtobufCodec;
use crate::host::{StarknetHost, StarknetParams};
use crate::mempool::network::{MempoolNetwork, MempoolNetworkRef};
use crate::mempool::{Mempool, MempoolRef};
use crate::mempool_load::{MempoolLoad, MempoolLoadRef, Params};
use crate::types::MockContext;
use crate::types::{Address, Height, PrivateKey, ValidatorSet};

Expand All @@ -50,6 +52,8 @@ pub async fn spawn_node_actor(
let mempool_network = spawn_mempool_network_actor(&cfg, &private_key, &registry, &span).await;
let mempool =
spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &cfg.test, &span).await;
let mempool_load =
spawn_mempool_load_actor(&cfg.mempool_load, mempool_network.clone(), &span).await;

// Spawn consensus gossip
let network = spawn_network_actor(&cfg, &private_key, &registry, &span).await;
Expand All @@ -62,6 +66,7 @@ pub async fn spawn_node_actor(
&private_key,
&initial_validator_set,
mempool.clone(),
mempool_load,
network.clone(),
metrics.clone(),
&span,
Expand Down Expand Up @@ -291,6 +296,25 @@ async fn spawn_mempool_actor(
.unwrap()
}

async fn spawn_mempool_load_actor(
mempool_load_config: &MempoolLoadConfig,
network: MempoolNetworkRef,
span: &tracing::Span,
) -> MempoolLoadRef {
// let params = mempool_load::Params::default();

// debug!("spawned mempool load actor with params {:?}", params);
MempoolLoad::spawn(
Params {
load_type: mempool_load_config.load_type,
},
network,
span.clone(),
)
.await
.unwrap()
}

async fn spawn_mempool_network_actor(
cfg: &NodeConfig,
private_key: &PrivateKey,
Expand Down Expand Up @@ -322,6 +346,7 @@ async fn spawn_host_actor(
private_key: &PrivateKey,
initial_validator_set: &ValidatorSet,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: &tracing::Span,
Expand All @@ -346,6 +371,7 @@ async fn spawn_host_actor(
let mock_host = StarknetHost::new(
mock_params,
mempool.clone(),
mempool_load.clone(),
*address,
*private_key,
initial_validator_set.clone(),
Expand All @@ -355,6 +381,7 @@ async fn spawn_host_actor(
home_dir.to_owned(),
mock_host,
mempool,
mempool_load,
network,
metrics,
span.clone(),
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/host/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod ticker;
Loading
Loading