Skip to content

Commit e799df1

Browse files
authored
Reapply "str-288: parallel encoding with Rayon threadpool and plugin lifecycle management (#661)"
This reverts commit 04c54df. Fix: attach parallel encoding to all `broadcast_tx.send()` paths.
1 parent 995ce6f commit e799df1

9 files changed

Lines changed: 282 additions & 117 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ prost = "0.14.0"
5454
prost-types = "0.14.0"
5555
prost_011 = { package = "prost", version = "0.11.9" }
5656
protoc-bin-vendored = "3.2.0"
57+
rayon = "1.11.0"
5758
serde = "1.0.145"
5859
serde_json = "1.0.86"
5960
smallvec = "1.15.1"

yellowstone-grpc-geyser/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pin-project = { workspace = true }
3939
prometheus = { workspace = true }
4040
prost = { workspace = true }
4141
prost-types = { workspace = true }
42+
rayon = { workspace = true }
4243
serde = { workspace = true }
4344
serde_json = { workspace = true }
4445
thiserror = { workspace = true }

yellowstone-grpc-geyser/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,12 @@ pub struct ConfigGrpc {
213213
deserialize_with = "deserialize_int_str"
214214
)]
215215
pub replay_stored_slots: u64,
216+
/// Number of threads for parallel encoding
217+
#[serde(
218+
default = "ConfigGrpc::encoder_threads_default",
219+
deserialize_with = "deserialize_int_str"
220+
)]
221+
pub encoder_threads: usize,
216222
#[serde(default)]
217223
pub server_http2_adaptive_window: Option<bool>,
218224
#[serde(default, with = "humantime_serde")]
@@ -272,6 +278,10 @@ impl ConfigGrpc {
272278
const fn default_replay_stored_slots() -> u64 {
273279
0
274280
}
281+
282+
const fn encoder_threads_default() -> usize {
283+
4
284+
}
275285
}
276286

277287
#[derive(Debug, Clone, Deserialize)]

yellowstone-grpc-geyser/src/grpc.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use {
44
metered::MeteredLayer,
55
metrics::{
66
self, incr_grpc_method_call_count, set_subscriber_queue_size,
7-
subscription_limit_exceeded_inc, DebugClientMessage,
7+
subscription_limit_exceeded_inc, DebugClientMessage, GEYSER_BATCH_SIZE,
88
},
9+
parallel::ParallelEncoder,
910
plugin::{
1011
filter::{
1112
limits::FilterLimits,
@@ -516,6 +517,7 @@ impl GrpcService {
516517
is_reload: bool,
517518
service_cancellation_token: CancellationToken,
518519
task_tracker: TaskTracker,
520+
parallel_encoder: ParallelEncoder,
519521
) -> anyhow::Result<(
520522
Option<crossbeam_channel::Sender<Box<Message>>>,
521523
mpsc::UnboundedSender<Message>,
@@ -637,6 +639,7 @@ impl GrpcService {
637639
replay_stored_slots_rx,
638640
replay_first_available_slot,
639641
config.replay_stored_slots,
642+
parallel_encoder,
640643
)
641644
.await;
642645
});
@@ -679,6 +682,7 @@ impl GrpcService {
679682
replay_stored_slots_rx: Option<mpsc::Receiver<ReplayStoredSlotsRequest>>,
680683
replay_first_available_slot: Option<Arc<AtomicU64>>,
681684
replay_stored_slots: u64,
685+
parallel_encoder: ParallelEncoder,
682686
) {
683687
const PROCESSED_MESSAGES_MAX: usize = 31;
684688
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
@@ -923,8 +927,10 @@ impl GrpcService {
923927

924928
// processed
925929
processed_messages.push(message.clone());
930+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
931+
let encoded = parallel_encoder.encode(processed_messages).await;
926932
let _ =
927-
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
933+
broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
928934
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
929935
processed_sleep
930936
.as_mut()
@@ -962,8 +968,10 @@ impl GrpcService {
962968
|| !confirmed_messages.is_empty()
963969
|| !finalized_messages.is_empty()
964970
{
971+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
972+
let encoded = parallel_encoder.encode(processed_messages).await;
965973
let _ = broadcast_tx
966-
.send((CommitmentLevel::Processed, processed_messages.into()));
974+
.send((CommitmentLevel::Processed, encoded.into()));
967975
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
968976
processed_sleep
969977
.as_mut()
@@ -984,7 +992,9 @@ impl GrpcService {
984992
}
985993
() = &mut processed_sleep => {
986994
if !processed_messages.is_empty() {
987-
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
995+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
996+
let encoded = parallel_encoder.encode(processed_messages).await;
997+
let _ = broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
988998
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
989999
}
9901000
processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);

yellowstone-grpc-geyser/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod config;
22
pub mod grpc;
33
pub mod metered;
44
pub mod metrics;
5+
pub mod parallel;
56
pub mod plugin;
67
pub mod transport;
78
pub(crate) mod util;

yellowstone-grpc-geyser/src/metrics.rs

Lines changed: 10 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,6 @@ lazy_static::lazy_static! {
104104
&["subscriber_id"]
105105
).unwrap();
106106

107-
static ref GRPC_SUBCRIBER_RX_LOAD: IntGaugeVec = IntGaugeVec::new(
108-
Opts::new(
109-
"grpc_subscriber_recv_bandwidth_load",
110-
"Current Receiver rate of subscriber channel (in bytes per second)"
111-
),
112-
&["subscriber_id"]
113-
).unwrap();
114-
115107
static ref GRPC_CLIENT_DISCONNECTS: IntCounterVec = IntCounterVec::new(
116108
Opts::new(
117109
"grpc_client_disconnects_total",
@@ -146,56 +138,12 @@ lazy_static::lazy_static! {
146138
&["type"]
147139
).unwrap();
148140

149-
static ref PRE_ENCODED_CACHE_SKIP: IntCounterVec = IntCounterVec::new(
150-
Opts::new("yellowstone_grpc_pre_encoded_cache_skip", "Pre-encoded cache skips due to unempty data slices"),
151-
&["type"]
152-
).unwrap();
153-
154-
static ref PRE_ENCODE_POPULATED_ON_ARRIVAL: IntCounterVec = IntCounterVec::new(
155-
Opts::new("yellowstone_grpc_pre_encode_populated_on_arrival", "OnceLock already populated when subscriber encodes"),
156-
&["type"]
157-
).unwrap();
158-
159-
static ref PRE_ENCODE_EMPTY_ON_ARRIVAL: IntCounterVec = IntCounterVec::new(
160-
Opts::new("yellowstone_grpc_pre_encode_empty_on_arrival", "OnceLock empty when subscriber encodes"),
161-
&["type"]
162-
).unwrap();
163-
164-
static ref PARALLEL_ENCODER_LATENCY_US: Histogram = Histogram::with_opts(
165-
HistogramOpts::new(
166-
"yellowstone_grpc_parallel_encoder_latency_us",
167-
"Time spent in parallel encoder encode() in microseconds"
168-
)
169-
.buckets(vec![10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0])
170-
).unwrap();
171-
172-
static ref PARALLEL_ENCODER_QUEUE_DEPTH: IntGauge = IntGauge::new(
173-
"yellowstone_grpc_parallel_encoder_queue_depth",
174-
"Number of batches waiting to be encoded"
175-
).unwrap();
176-
177-
static ref FAST_PATH_TIME_US: Histogram = Histogram::with_opts(
178-
HistogramOpts::new(
179-
"yellowstone_grpc_fast_path_time_us",
180-
"Time spent in pre-encoded cache hit path in microseconds"
181-
)
182-
.buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 50.0])
183-
).unwrap();
184-
185-
static ref FALLBACK_ENCODE_TIME_US: Histogram = Histogram::with_opts(
186-
HistogramOpts::new(
187-
"yellowstone_grpc_fallback_encode_time_us",
188-
"Time spent in fallback encode path in microseconds"
189-
)
190-
.buckets(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0])
191-
).unwrap();
192-
193-
static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new(
194-
Opts::new(
195-
"grpc_concurrent_subscribe_per_tcp_connection",
196-
"Current concurrent subscriptions per remote TCP peer socket address"
197-
),
198-
&["remote_peer_sk_addr"]
141+
static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new(
142+
Opts::new(
143+
"grpc_concurrent_subscribe_per_tcp_connection",
144+
"Current concurrent subscriptions per remote TCP peer socket address"
145+
),
146+
&["remote_peer_sk_addr"]
199147
).unwrap();
200148

201149
static ref TOTAL_TRAFFIC_SENT: IntCounter = IntCounter::new(
@@ -374,19 +322,11 @@ impl PrometheusService {
374322
register!(GRPC_BYTES_SENT);
375323
register!(GEYSER_ACCOUNT_UPDATE_RECEIVED);
376324
register!(GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD);
377-
register!(GRPC_SUBCRIBER_RX_LOAD);
378325
register!(GRPC_SUBSCRIBER_QUEUE_SIZE);
379326
register!(GEYSER_BATCH_SIZE);
380327
register!(GRPC_CLIENT_DISCONNECTS);
381328
register!(PRE_ENCODED_CACHE_HIT);
382329
register!(PRE_ENCODED_CACHE_MISS);
383-
register!(PRE_ENCODED_CACHE_SKIP);
384-
register!(PRE_ENCODE_POPULATED_ON_ARRIVAL);
385-
register!(PRE_ENCODE_EMPTY_ON_ARRIVAL);
386-
register!(PARALLEL_ENCODER_LATENCY_US);
387-
register!(PARALLEL_ENCODER_QUEUE_DEPTH);
388-
register!(FAST_PATH_TIME_US);
389-
register!(FALLBACK_ENCODE_TIME_US);
390330
register!(GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION);
391331
register!(TOTAL_TRAFFIC_SENT);
392332
register!(TRAFFIC_SENT_PER_REMOTE_IP);
@@ -599,12 +539,6 @@ pub fn set_subscriber_send_bandwidth_load<S: AsRef<str>>(subscriber_id: S, load:
599539
.set(load);
600540
}
601541

602-
pub fn set_subscriber_recv_bandwidth_load<S: AsRef<str>>(subscriber_id: S, load: i64) {
603-
GRPC_SUBCRIBER_RX_LOAD
604-
.with_label_values(&[subscriber_id.as_ref()])
605-
.set(load);
606-
}
607-
608542
pub fn set_subscriber_queue_size<S: AsRef<str>>(subscriber_id: S, size: u64) {
609543
GRPC_SUBSCRIBER_QUEUE_SIZE
610544
.with_label_values(&[subscriber_id.as_ref()])
@@ -625,42 +559,6 @@ pub fn pre_encoded_cache_miss(msg_type: &str) {
625559
PRE_ENCODED_CACHE_MISS.with_label_values(&[msg_type]).inc();
626560
}
627561

628-
pub fn pre_encoded_cache_skip(msg_type: &str) {
629-
PRE_ENCODED_CACHE_SKIP.with_label_values(&[msg_type]).inc();
630-
}
631-
632-
pub fn pre_encode_populated_on_arrival(msg_type: &str) {
633-
PRE_ENCODE_POPULATED_ON_ARRIVAL
634-
.with_label_values(&[msg_type])
635-
.inc();
636-
}
637-
638-
pub fn pre_encode_empty_on_arrival(msg_type: &str) {
639-
PRE_ENCODE_EMPTY_ON_ARRIVAL
640-
.with_label_values(&[msg_type])
641-
.inc();
642-
}
643-
644-
pub fn observe_parallel_encoder_latency_us(latency: f64) {
645-
PARALLEL_ENCODER_LATENCY_US.observe(latency);
646-
}
647-
648-
pub fn encoder_queue_depth_inc() {
649-
PARALLEL_ENCODER_QUEUE_DEPTH.inc();
650-
}
651-
652-
pub fn encoder_queue_depth_dec() {
653-
PARALLEL_ENCODER_QUEUE_DEPTH.dec();
654-
}
655-
656-
pub fn observe_fast_path_time_us(latency: f64) {
657-
FAST_PATH_TIME_US.observe(latency);
658-
}
659-
660-
pub fn observe_fallback_encode_time_us(latency: f64) {
661-
FALLBACK_ENCODE_TIME_US.observe(latency);
662-
}
663-
664562
pub fn incr_grpc_method_call_count<S: AsRef<str>>(method: S) {
665563
GRPC_METHOD_CALL_COUNT
666564
.with_label_values(&[method.as_ref()])
@@ -726,7 +624,6 @@ pub fn reset_metrics() {
726624
INVALID_FULL_BLOCKS.reset();
727625
GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD.reset();
728626
GRPC_SUBSCRIBER_QUEUE_SIZE.reset();
729-
GRPC_SUBCRIBER_RX_LOAD.reset();
730627

731628
// Reset counter vectors (clears all label combinations)
732629
MISSED_STATUS_MESSAGE.reset();
@@ -740,6 +637,10 @@ pub fn reset_metrics() {
740637
GRPC_SUBSCRIPTION_LIMIT_EXCEEDED.reset();
741638
GRPC_METHOD_CALL_COUNT.reset();
742639

640+
// Pre-encoding
641+
PRE_ENCODED_CACHE_HIT.reset();
642+
PRE_ENCODED_CACHE_MISS.reset();
643+
743644
// Note: VERSION and GEYSER_ACCOUNT_UPDATE_RECEIVED are intentionally not reset
744645
// - VERSION contains build info set once on startup
745646
// - GEYSER_ACCOUNT_UPDATE_RECEIVED is a Histogram which doesn't support reset()

0 commit comments

Comments
 (0)