This repository was archived by the owner on Jan 27, 2025. It is now read-only.

Commit 2e9b257

0o-de-lally, simsekgokhan, and intuitiveminds authored
[mempool] Specify number of workers for mempool threads. Add prometheus metrics for networking. (#1113)
* function to revoke vouch
* vouch revoking apis
* add test
* set rotation can only expand by a certain number of nodes on every epoch, so that no more than 25% of the next validator set has unproven nodes. This is based on the number of nodes that we know can keep consensus (case 1 and 2).
* updating tests and patch implementation
* patch reconfig case_2
* make sure jailed nodes are dropping
* increase threshold for voting
* update writeset for rescue to include recovery mode
* patch build
* remove recovery mode
* expand epoch set by 1/6th
* make setting recovery mode optional from CLI
* add debug prints
* debug prints
* debug prints and comments
* add debug prints and comments
* notes
* comment the mempool config params
* find where we could create backpressure on mempool
* prints for debugging
* WIP experimental backpressure on shared mempool consensus requests. Does not build.
* change node.yaml default params for state_sync and mempool
* Fix build
* adding prometheus counters
* patch build
* State sync debug (#1117)
* Release v5.1.1 (#1114)
* function to revoke vouch
* vouch revoking apis
* add test
* set rotation can only expand by a certain number of nodes on every epoch, so that no more than 25% of the next validator set has unproven nodes. This is based on the number of nodes that we know can keep consensus (case 1 and 2).
* updating tests and patch implementation
* patch reconfig case_2
* make sure jailed nodes are dropping
* increase threshold for voting
* update writeset for rescue to include recovery mode
* patch build
* remove recovery mode
* expand epoch set by 1/6th
* make setting recovery mode optional from CLI
* improve mock case 1 helper
* patch onboarding reconfig
* patch mock tests
* refactored tests that use mock_
* build stdlib for release
* update 0L default configs for mempool and state sync
* bump version
* changelog
* Update 5_1_1.md
* adds some more metrics
* adds more metrics
* [move] [Fast Track Proposal] Turn down the heat on Cost To Exist (#1119)
* defer for 90 days cost to inactives, and reduce the cost of burn by only implementing at steady state.
* burn should be the default if user has not set send to community explicitly
* exchanges some dbg! statements with debug! statements to be able to control log output

Co-authored-by: 0o-de-lally <[email protected]>
Co-authored-by: Gökhan Şimşek <[email protected]>
Co-authored-by: Sven Panko <[email protected]>
1 parent ad9c806 commit 2e9b257

File tree

13 files changed (+403 / -33 lines)

config/src/config/mempool_config.rs

Lines changed: 16 additions & 0 deletions
@@ -6,18 +6,34 @@ use serde::{Deserialize, Serialize};
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct MempoolConfig {
+    /// What is the total size of the mempool queue, including invalid txs.
     pub capacity: usize,
+    /// How many txs each user can have in the mempool at a given time.
     pub capacity_per_user: usize,
+    // a threshold for fullnodes to determine which peers to broadcast to.
+    // peers that go over this threshold will receive broadcasts.
     // number of failovers to broadcast to when the primary network is alive
     pub default_failovers: usize,
+    // number of times a mempool broadcast gets re-sent to a peer if the previous one was unacknowledged.
     pub max_broadcasts_per_peer: usize,
+    // how often to snapshot the mempool for analytics purposes.
     pub mempool_snapshot_interval_secs: u64,
+    // how long to wait for a peer after a broadcast was submitted before we mark it as unacknowledged.
     pub shared_mempool_ack_timeout_ms: u64,
+    // if peer_manager is in backoff mode (see mempool/src/shared_mempool/peer_manager.rs),
+    // this is the base interval for backing off.
     pub shared_mempool_backoff_interval_ms: u64,
+
+    // size of the batch from the mempool timeline to broadcast to peers.
     pub shared_mempool_batch_size: usize,
+    // Number of workers to be spawned to receive inbound shared mempool broadcasts.
     pub shared_mempool_max_concurrent_inbound_syncs: usize,
+    // the default interval to execute shared mempool broadcasts to peers.
+    // this is overridden when the peer is in backoff mode.
     pub shared_mempool_tick_interval_ms: u64,
+    /// when a transaction gets automatically garbage collected by the system. Different from user tx expiry, which has a separate GC.
     pub system_transaction_timeout_secs: u64,
+    /// tick interval for the system GC.
     pub system_transaction_gc_interval_ms: u64,
 }
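These fields are loaded through serde with #[serde(default, deny_unknown_fields)], so a node.yaml only needs to list the values an operator wants to override; everything else falls back to the struct's Default impl, and misspelled keys are rejected. A minimal sketch of that behavior, using a cut-down stand-in struct and hypothetical default values (the real defaults live elsewhere in this crate):

use serde::{Deserialize, Serialize};

// Stand-in for MempoolConfig with only two of the fields above; values are hypothetical.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
struct MempoolConfigSketch {
    shared_mempool_max_concurrent_inbound_syncs: usize,
    shared_mempool_tick_interval_ms: u64,
}

impl Default for MempoolConfigSketch {
    fn default() -> Self {
        Self {
            shared_mempool_max_concurrent_inbound_syncs: 2, // hypothetical default
            shared_mempool_tick_interval_ms: 50,            // hypothetical default
        }
    }
}

fn main() -> Result<(), serde_yaml::Error> {
    // Operator overrides only the worker count; the tick interval keeps its default.
    let cfg: MempoolConfigSketch =
        serde_yaml::from_str("shared_mempool_max_concurrent_inbound_syncs: 16")?;
    assert_eq!(cfg.shared_mempool_tick_interval_ms, 50);

    // A misspelled key fails outright, because of deny_unknown_fields.
    assert!(serde_yaml::from_str::<MempoolConfigSketch>("sharedmempool_workers: 4").is_err());
    Ok(())
}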

config/src/config/state_sync_config.rs

Lines changed: 5 additions & 0 deletions
@@ -11,12 +11,16 @@ pub struct StateSyncConfig {
     // The timeout of the state sync client to process a commit notification (in milliseconds)
     pub client_commit_timeout_ms: u64,
     // default timeout used for long polling to remote peer
+    // this is only used by fullnodes
     pub long_poll_timeout_ms: u64,
     // valid maximum chunk limit for sanity check
     pub max_chunk_limit: u64,
     // valid maximum timeout limit for sanity check
+    // This timeout applies to process_request_for_target_and_highest:
+    // if the chunk cannot be applied now, it is inserted into a subscription to apply later. The subscription expires at max_timeout_ms.
     pub max_timeout_ms: u64,
     // The timeout of the state sync coordinator to receive a commit ack from mempool (in milliseconds)
+    // Stops sending
     pub mempool_commit_timeout_ms: u64,
     // default timeout to make state sync progress by sending chunk requests to a certain number of networks
     // if no progress is made by sending chunk requests to a number of networks,
@@ -26,6 +30,7 @@ pub struct StateSyncConfig {
     // commits when processing a sync request).
     pub sync_request_timeout_ms: u64,
     // interval used for checking state synchronization progress
+    // IMPORTANT: the mempool peer ack timeout is determined by 2X this number.
     pub tick_interval_ms: u64,
 }
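Per the new comment on tick_interval_ms, the ack timeout given to a mempool peer is derived as twice the state sync tick interval. A hedged sketch of that relationship; the helper name is hypothetical and the interval value is illustrative, not the shipped default:

/// Hypothetical helper mirroring the comment above: the ack timeout granted to a
/// peer is twice the state sync tick interval.
fn effective_mempool_ack_timeout_ms(state_sync_tick_interval_ms: u64) -> u64 {
    2 * state_sync_tick_interval_ms
}

fn main() {
    // With a 100 ms tick interval, a peer has 200 ms to ack before state sync moves on.
    let tick_interval_ms = 100; // illustrative value
    assert_eq!(effective_mempool_ack_timeout_ms(tick_interval_ms), 200);
}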

execution/executor/src/lib.rs

Lines changed: 3 additions & 3 deletions
@@ -618,7 +618,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {
 
         let latency = start_time.elapsed();
         metrics_timer_vl.observe_duration();
-        dbg!("verify_chunk latency", &latency);
+        debug!("verify_chunk latency: {:?}", &latency);
 
         // 3. Execute transactions.
 
@@ -632,7 +632,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {
 
         let latency = start_time.elapsed();
         metrics_timer_el.observe_duration();
-        dbg!("execute_chunk latency", &latency);
+        debug!("execute_chunk latency: {:?}", &latency);
 
 
         // temp time the transaction execution.
@@ -657,7 +657,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {
 
         let latency = start_time.elapsed();
         metrics_timer_stxl.observe_duration();
-        dbg!("save_transactions latency", &latency);
+        debug!("save_transactions latency: {:?}", &latency);
 
         // 5. Cache maintenance.
         let output_trees = output.executed_trees().clone();
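The three hunks above swap dbg! for debug!, so chunk latencies only show up when debug-level logging is enabled instead of always printing to stderr. A small sketch of the same timing-plus-leveled-logging pattern; it uses the generic log facade and env_logger, whereas the commit uses the node's own logger macros:

use std::time::Instant;
use log::debug;

// Time a phase and report it through the logger rather than dbg!.
fn timed_phase<T>(name: &str, f: impl FnOnce() -> T) -> T {
    let start_time = Instant::now();
    let out = f();
    // Unlike dbg!, this line is silent unless debug-level logging is enabled,
    // so production log output stays controllable.
    debug!("{} latency: {:?}", name, start_time.elapsed());
    out
}

fn main() {
    env_logger::init(); // enable output with RUST_LOG=debug
    let sum: u64 = timed_phase("verify_chunk (illustrative)", || (0..1_000_000u64).sum());
    debug!("sum: {}", sum);
}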
Binary file (112 bytes) not shown.

language/diem-vm/src/diem_transaction_executor.rs

Lines changed: 3 additions & 11 deletions
@@ -512,8 +512,8 @@ impl DiemVM {
             .map(|_return_vals| ())
             .or_else(|e| {
                 println!("error here\n");
-                dbg!(&proposer);
-                dbg!(&previous_vote);
+                debug!("proposer: {:?}", &proposer);
+                debug!("previous vote: {:?}", &previous_vote);
 
                 expect_only_successful_execution(e, BLOCK_PROLOGUE.as_str(), log_context)
             })?;
@@ -777,16 +777,8 @@ impl DiemVM {
             let (vm_status, output, sender) =
                 self.execute_single_transaction(&txn, data_cache, &log_context)?;
 
-            // match &txn {
-            //     PreprocessedTransaction::UserTransaction(t) => {
-            //         dbg!(&t.sequence_number());
-            //     },
-            //     _ => {},
-            // };
-            // dbg!("tx sender", &sender);
-            // let latency = start_time.elapsed();
             metric_single_tx_lat.observe_duration();
-            // dbg!("single tx latency", &latency);
+
 
             if !output.status().is_discarded() {
                 data_cache.push_write_set(output.write_set());

mempool/src/counters.rs

Lines changed: 73 additions & 0 deletions
@@ -55,6 +55,10 @@ pub const STATE_SYNC_EVENT_LABEL: &str = "state_sync";
 pub const RECONFIG_EVENT_LABEL: &str = "reconfig";
 pub const PEER_BROADCAST_EVENT_LABEL: &str = "peer_broadcast";
 
+//////// 0L ////////
+pub const CONSENSUS_REQUEST_LABEL: &str = "consensus_request";
+
 // task spawn stage labels
 pub const SPAWN_LABEL: &str = "spawn";
 pub const START_LABEL: &str = "start";
@@ -449,3 +453,72 @@ pub static MAIN_LOOP: Lazy<DurationHistogram> = Lazy::new(|| {
         .unwrap(),
     )
 });
+
+
+//////// 0L ////////
+/// Counter for my node
+pub static SELF_REQUEST_BACKOFF: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_self_request_backoff",
+        "Number of times my node requested backoff"
+    )
+    .unwrap()
+});
+
+pub static COORDINATOR_HANDLE_CLIENT_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_coordinator_handle_client_event",
+        "Number of times a client event was handled in mempool"
+    )
+    .unwrap()
+});
+
+pub static COORDINATOR_HANDLE_CONSENSUS_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_coordinator_handle_consensus_event",
+        "Number of times a consensus event was handled in mempool"
+    )
+    .unwrap()
+});
+
+pub static COORDINATOR_HANDLE_STATE_SYNC_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_coordinator_handle_state_sync_event",
+        "Number of times a state-sync event was handled in mempool"
+    )
+    .unwrap()
+});
+
+pub static COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_coordinator_handle_mempool_reconfig_event",
+        "Number of times a mempool reconfiguration event was handled in mempool"
+    )
+    .unwrap()
+});
+
+pub static TASKS_PROCESS_TX_BROADCAST_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_tasks_process_tx_broadcast_event",
+        "Number of times a transaction broadcast event was handled in mempool"
+    )
+    .unwrap()
+});
+
+pub static TASKS_PROCESS_CONSENSUS_REQUEST_EVENT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "diem_mempool_tasks_process_consensus_request_event",
+        "Number of times a consensus request was processed in mempool"
+    )
+    .unwrap()
+});
+
+pub static PEER_MANAGER_PEER_REQUESTED_BACKOFF: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "diem_mempool_peer_requested_backoff",
+        "Number of backoff requests from peers",
+        &["network", "peer"]
+    )
+    .unwrap()
+});
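For readers unfamiliar with the prometheus macros above: each static lazily registers a counter once against the default registry, and call sites simply .inc() it. The sketch below shows the same register-then-increment pattern for a labeled counter, plus how it renders in Prometheus text format; the metric name and label values are illustrative, and the real node exports these counters through its metrics endpoint rather than printing them:

use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, Encoder, IntCounterVec, TextEncoder};

// Illustrative counter with the same shape as PEER_MANAGER_PEER_REQUESTED_BACKOFF above.
static EXAMPLE_BACKOFF: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "example_mempool_peer_requested_backoff",
        "Number of backoff requests from peers",
        &["network", "peer"]
    )
    .unwrap()
});

fn main() {
    // Increment with concrete label values, as peer_manager.rs does with
    // raw_network_id() and peer_id().
    EXAMPLE_BACKOFF.with_label_values(&["validator", "peer-42"]).inc();

    // Render the default registry in Prometheus text exposition format.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}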

mempool/src/shared_mempool/coordinator.rs

Lines changed: 37 additions & 1 deletion
@@ -73,21 +73,36 @@ pub(crate) async fn coordinator<V>(
         let _timer = counters::MAIN_LOOP.start_timer();
         ::futures::select! {
             (msg, callback) = client_events.select_next_some() => {
+                debug!("handle_client_event");
+                counters::COORDINATOR_HANDLE_CLIENT_EVENT.inc();
                 handle_client_event(&mut smp, &bounded_executor, msg, callback).await;
             },
+            // 0L TODO: execute mempool tasks in a bounded execution with capacity.
             msg = consensus_requests.select_next_some() => {
-                tasks::process_consensus_request(&smp.mempool, msg).await;
+                debug!("process_consensus_request");
+                counters::COORDINATOR_HANDLE_CONSENSUS_EVENT.inc();
+                //////// 0L ////////
+                // The goal here is to put consensus requests also in a Tokio Semaphore (diem BoundedExecutor)
+                // where we can control the amount of workers and put backpressure.
+
+                handle_consensus_request(&mut smp, &bounded_executor, msg).await;
+                // tasks::process_consensus_request(&smp.mempool, msg).await;
             }
             msg = state_sync_requests.select_next_some() => {
+                debug!("state_sync_requests");
+                counters::COORDINATOR_HANDLE_STATE_SYNC_EVENT.inc();
                 handle_state_sync_request(&mut smp, msg);
             }
             config_update = mempool_reconfig_events.select_next_some() => {
+                debug!("handle_mempool_reconfig_event");
+                counters::COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT.inc();
                 handle_mempool_reconfig_event(&mut smp, &bounded_executor, config_update).await;
             },
             (peer, backoff) = scheduled_broadcasts.select_next_some() => {
                 tasks::execute_broadcast(peer, backoff, &mut smp, &mut scheduled_broadcasts, executor.clone());
             },
             (network_id, event) = events.select_next_some() => {
+                // dbg!("handle_event", &event.);
+
                 handle_event(&executor, &bounded_executor, &mut scheduled_broadcasts, &mut smp, network_id, event).await;
             },
             complete => break,
@@ -124,6 +139,27 @@ async fn handle_client_event<V>(
         .await;
 }
 
+//////// 0L ////////
+async fn handle_consensus_request<V>(
+    smp: &mut SharedMempool<V>,
+    bounded_executor: &BoundedExecutor,
+    msg: ConsensusRequest,
+) where
+    V: TransactionValidation,
+{
+    // This timer measures how long it took for the bounded executor to *schedule* the
+    // task.
+    let _timer =
+        counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::SPAWN_LABEL);
+    // This timer measures how long it took for the task to go from scheduled to started.
+    let _task_start_timer =
+        counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::START_LABEL);
+
+    bounded_executor
+        .spawn(tasks::process_consensus_request(smp.clone(), msg))
+        .await;
+}
+
 fn handle_state_sync_request<V>(smp: &mut SharedMempool<V>, msg: CommitNotification)
 where
     V: TransactionValidation,
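The inline comment spells out the intent: route consensus requests through the BoundedExecutor (a Tokio-semaphore-backed spawner), so the coordinator can cap concurrent workers and push back when it falls behind. A minimal sketch of that backpressure pattern with a plain tokio Semaphore; the struct and method names are illustrative, and diem's BoundedExecutor differs in detail:

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

/// Illustrative stand-in for a bounded executor: `spawn` waits for a permit,
/// so at most `capacity` tasks run at once and callers feel backpressure.
#[derive(Clone)]
struct BoundedSpawner {
    permits: Arc<Semaphore>,
}

impl BoundedSpawner {
    fn new(capacity: usize) -> Self {
        Self { permits: Arc::new(Semaphore::new(capacity)) }
    }

    async fn spawn<F>(&self, fut: F) -> JoinHandle<()>
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        // Awaiting the permit is where the coordinator loop would stall when
        // too many requests are already being processed.
        let permit = self.permits.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            fut.await;
            drop(permit); // release capacity when the task finishes
        })
    }
}

#[tokio::main]
async fn main() {
    let spawner = BoundedSpawner::new(2); // small illustrative capacity
    for i in 0..5 {
        spawner
            .spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                println!("processed request {}", i);
            })
            .await;
    }
}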

mempool/src/shared_mempool/peer_manager.rs

Lines changed: 16 additions & 2 deletions
@@ -35,7 +35,7 @@ pub(crate) type PeerSyncStates = HashMap<PeerNetworkId, PeerSyncState>;
 /// State of last sync with peer:
 /// `timeline_id` is position in log of ready transactions
 /// `is_alive` - is connection healthy
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub(crate) struct PeerSyncState {
     pub timeline_id: u64,
     pub is_alive: bool,
@@ -81,7 +81,7 @@ impl Ord for BatchId {
 }
 
 /// Txn broadcast-related info for a given remote peer.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct BroadcastInfo {
     // Sent broadcasts that have not yet received an ack.
     pub sent_batches: BTreeMap<BatchId, SystemTime>,
@@ -140,6 +140,7 @@ impl PeerManager {
 
     /// Disables a peer if it can be restarted, otherwise removes it
     pub fn disable_peer(&self, peer: PeerNetworkId) {
+        error!("shared mempool disable peer {:?}", &peer);
         // Remove all state on the peer, and start over
         self.peer_states.lock().remove(&peer);
         counters::active_upstream_peers(&peer.raw_network_id()).dec();
@@ -150,6 +151,7 @@ impl PeerManager {
 
     pub fn is_backoff_mode(&self, peer: &PeerNetworkId) -> bool {
         if let Some(state) = self.peer_states.lock().get(peer) {
+            warn!("shared mempool is in backoff mode for peer: {:?} ", &peer);
            state.broadcast_info.backoff_mode
         } else {
            // If we don't have sync state, we shouldn't backoff
@@ -165,6 +167,10 @@ impl PeerManager {
     ) where
         V: TransactionValidation,
     {
+
+        // dbg!("execute broadcast");
+        // dbg!(&self.peer_states);
+
         // Start timer for tracking broadcast latency.
         let start_time = Instant::now();
 
@@ -178,6 +184,7 @@ impl PeerManager {
 
         // Only broadcast to peers that are alive.
         if !state.is_alive {
+            error!("shared mempool peer is not alive: {:?}", &state.metadata);
             return;
         }
 
@@ -242,6 +249,7 @@ impl PeerManager {
         // This helps rate-limit egress network bandwidth and not overload a remote peer or this
         // node's Diem network sender.
         if pending_broadcasts >= self.mempool_config.max_broadcasts_per_peer {
+            error!("will stop broadcasting shared mempool to peer: {:?}", &peer);
            return;
         }
     }
@@ -370,6 +378,7 @@ impl PeerManager {
         let _ = std::mem::replace(&mut *prioritized_peers, peers);
     }
 
+    /// Node receives ack from peer.
     pub fn process_broadcast_ack(
         &self,
         peer: PeerNetworkId,
@@ -433,6 +442,11 @@ impl PeerManager {
         // as a backoff broadcast.
         // This ensures backpressure request from remote peer is honored at least once.
         if backoff {
+            counters::PEER_MANAGER_PEER_REQUESTED_BACKOFF.with_label_values(&[
+                &peer.raw_network_id().to_string(),
+                &peer.peer_id().to_string(),
+            ]).inc();
+            error!("Peer requested backoff: {:?}", &peer);
             sync_state.broadcast_info.backoff_mode = true;
         }
     }
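Tying the new backoff counter back to the config comments in mempool_config.rs: when a peer asks for backoff, the next broadcast to it is scheduled on the (longer) shared_mempool_backoff_interval_ms instead of the regular tick interval. A hedged sketch of that choice; the helper and the values are illustrative, and the real scheduling lives in the coordinator and peer manager:

use std::time::Duration;

/// Illustrative only: pick the delay before the next broadcast to a peer.
/// In backoff mode the longer shared_mempool_backoff_interval_ms is used,
/// otherwise the regular shared_mempool_tick_interval_ms.
fn next_broadcast_delay(
    backoff_mode: bool,
    shared_mempool_tick_interval_ms: u64,
    shared_mempool_backoff_interval_ms: u64,
) -> Duration {
    if backoff_mode {
        Duration::from_millis(shared_mempool_backoff_interval_ms)
    } else {
        Duration::from_millis(shared_mempool_tick_interval_ms)
    }
}

fn main() {
    // Illustrative values, not the shipped defaults.
    let (tick_ms, backoff_ms) = (50, 500);
    assert_eq!(next_broadcast_delay(false, tick_ms, backoff_ms), Duration::from_millis(50));
    assert_eq!(next_broadcast_delay(true, tick_ms, backoff_ms), Duration::from_millis(500));
}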
