Skip to content

Commit bea22e2

Browse files
authored
Reapply "str-288: parallel encoding with Rayon threadpool and plugin lifecycle management (#661)"
This reverts commit 04c54df. Fix: attach parallel encoding to all broadcast_tx.send() paths.
1 parent 30f4a51 commit bea22e2

8 files changed

Lines changed: 272 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ prost = "0.14.0"
5454
prost-types = "0.14.0"
5555
prost_011 = { package = "prost", version = "0.11.9" }
5656
protoc-bin-vendored = "3.2.0"
57+
rayon = "1.11.0"
5758
serde = "1.0.145"
5859
serde_json = "1.0.86"
5960
smallvec = "1.15.1"

yellowstone-grpc-geyser/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pin-project = { workspace = true }
3939
prometheus = { workspace = true }
4040
prost = { workspace = true }
4141
prost-types = { workspace = true }
42+
rayon = { workspace = true }
4243
serde = { workspace = true }
4344
serde_json = { workspace = true }
4445
thiserror = { workspace = true }

yellowstone-grpc-geyser/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ pub struct ConfigGrpc {
200200
deserialize_with = "deserialize_int_str"
201201
)]
202202
pub replay_stored_slots: u64,
203+
/// Number of threads for parallel encoding
204+
#[serde(
205+
default = "ConfigGrpc::encoder_threads_default",
206+
deserialize_with = "deserialize_int_str"
207+
)]
208+
pub encoder_threads: usize,
203209
#[serde(default)]
204210
pub server_http2_adaptive_window: Option<bool>,
205211
#[serde(default, with = "humantime_serde")]
@@ -255,6 +261,10 @@ impl ConfigGrpc {
255261
const fn default_replay_stored_slots() -> u64 {
256262
0
257263
}
264+
265+
/// Default worker-thread count for the parallel encoder's rayon pool,
/// used when `encoder_threads` is absent from the gRPC config section.
const fn encoder_threads_default() -> usize {
    4
}
258268
}
259269

260270
#[derive(Debug, Clone, Deserialize)]

yellowstone-grpc-geyser/src/grpc.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use {
44
metered::MeteredLayer,
55
metrics::{
66
self, incr_grpc_method_call_count, set_subscriber_queue_size, DebugClientMessage,
7+
GEYSER_BATCH_SIZE,
78
},
9+
parallel::ParallelEncoder,
810
plugin::{
911
filter::{
1012
limits::FilterLimits,
@@ -495,6 +497,7 @@ impl GrpcService {
495497
is_reload: bool,
496498
service_cancellation_token: CancellationToken,
497499
task_tracker: TaskTracker,
500+
parallel_encoder: ParallelEncoder,
498501
) -> anyhow::Result<(
499502
Option<crossbeam_channel::Sender<Box<Message>>>,
500503
mpsc::UnboundedSender<Message>,
@@ -613,6 +616,7 @@ impl GrpcService {
613616
replay_stored_slots_rx,
614617
replay_first_available_slot,
615618
config.replay_stored_slots,
619+
parallel_encoder,
616620
)
617621
.await;
618622
});
@@ -655,6 +659,7 @@ impl GrpcService {
655659
replay_stored_slots_rx: Option<mpsc::Receiver<ReplayStoredSlotsRequest>>,
656660
replay_first_available_slot: Option<Arc<AtomicU64>>,
657661
replay_stored_slots: u64,
662+
parallel_encoder: ParallelEncoder,
658663
) {
659664
const PROCESSED_MESSAGES_MAX: usize = 31;
660665
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
@@ -899,8 +904,10 @@ impl GrpcService {
899904

900905
// processed
901906
processed_messages.push(message.clone());
907+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
908+
let encoded = parallel_encoder.encode(processed_messages).await;
902909
let _ =
903-
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
910+
broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
904911
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
905912
processed_sleep
906913
.as_mut()
@@ -938,8 +945,10 @@ impl GrpcService {
938945
|| !confirmed_messages.is_empty()
939946
|| !finalized_messages.is_empty()
940947
{
948+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
949+
let encoded = parallel_encoder.encode(processed_messages).await;
941950
let _ = broadcast_tx
942-
.send((CommitmentLevel::Processed, processed_messages.into()));
951+
.send((CommitmentLevel::Processed, encoded.into()));
943952
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
944953
processed_sleep
945954
.as_mut()
@@ -960,7 +969,9 @@ impl GrpcService {
960969
}
961970
() = &mut processed_sleep => {
962971
if !processed_messages.is_empty() {
963-
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
972+
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
973+
let encoded = parallel_encoder.encode(processed_messages).await;
974+
let _ = broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
964975
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
965976
}
966977
processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);

yellowstone-grpc-geyser/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod config;
22
pub mod grpc;
33
pub mod metered;
44
pub mod metrics;
5+
pub mod parallel;
56
pub mod plugin;
67
pub mod transport;
78
pub(crate) mod util;
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
use {
2+
crate::plugin::{
3+
filter::encoder::{AccountEncoder, TransactionEncoder},
4+
message::Message,
5+
},
6+
rayon::{ThreadPool, ThreadPoolBuilder},
7+
tokio::sync::{mpsc, oneshot},
8+
};
9+
10+
/// Handle for submitting message batches to a dedicated rayon thread pool
/// for pre-encoding. It is only a channel sender, so it is cheap to move
/// into the async task that drives the broadcast loop.
pub struct ParallelEncoder {
    // Requests are forwarded to the bridge thread spawned in `new`; the
    // bridge loop exits once this sender is dropped.
    tx: mpsc::UnboundedSender<EncodeRequest>,
}
13+
14+
/// One unit of work for the encoder bridge: a batch of messages to
/// pre-encode, plus the channel on which the batch is handed back.
struct EncodeRequest {
    // Messages paired with caller-assigned ids; the ids are passed
    // through untouched by the encoder.
    batch: Vec<(u64, Message)>,
    // Fires exactly once, when the bridge has finished encoding; the
    // receiving half lives in `ParallelEncoder::encode`.
    response: oneshot::Sender<Vec<(u64, Message)>>,
}
18+
19+
impl ParallelEncoder {
20+
pub fn new(num_threads: usize) -> (Self, std::thread::JoinHandle<()>) {
21+
let pool = ThreadPoolBuilder::new()
22+
.num_threads(num_threads)
23+
.thread_name(|i| format!("geyser-encoder-{i}"))
24+
.build()
25+
.expect("failed to create rayon pool");
26+
27+
let (tx, rx) = mpsc::unbounded_channel();
28+
29+
let handle = std::thread::Builder::new()
30+
.name("geyser-encoder-bridge".into())
31+
.spawn(move || Self::bridge_loop(rx, pool))
32+
.expect("failed to spawn encoder bridge");
33+
34+
(Self { tx }, handle)
35+
}
36+
37+
fn bridge_loop(mut rx: mpsc::UnboundedReceiver<EncodeRequest>, pool: ThreadPool) {
38+
use rayon::prelude::*;
39+
40+
while let Some(req) = rx.blocking_recv() {
41+
let EncodeRequest {
42+
mut batch,
43+
response,
44+
} = req;
45+
46+
pool.install(|| {
47+
batch.par_iter_mut().for_each(|(_msgid, msg)| {
48+
Self::encode_message(msg);
49+
});
50+
});
51+
52+
let _ = response.send(batch);
53+
}
54+
55+
log::info!("exiting encoder bridge loop");
56+
}
57+
58+
fn encode_message(msg: &Message) {
59+
match msg {
60+
Message::Transaction(tx) => {
61+
if tx.transaction.pre_encoded.get().is_none() {
62+
TransactionEncoder::pre_encode(&tx.transaction);
63+
}
64+
}
65+
Message::Account(acc) => {
66+
if acc.account.pre_encoded.get().is_none() {
67+
AccountEncoder::pre_encode(&acc.account);
68+
}
69+
}
70+
_ => {}
71+
}
72+
}
73+
74+
pub async fn encode(&self, batch: Vec<(u64, Message)>) -> Vec<(u64, Message)> {
75+
if batch.len() < 4 {
76+
return Self::encode_sync(batch);
77+
}
78+
79+
let (tx, rx) = oneshot::channel();
80+
81+
// move batch, don't clone
82+
if self
83+
.tx
84+
.send(EncodeRequest {
85+
batch,
86+
response: tx,
87+
})
88+
.is_err()
89+
{
90+
// channel closed - this shouldn't happen in normal operation
91+
panic!("encoder channel closed");
92+
}
93+
94+
rx.await.expect("encoder response failed")
95+
}
96+
97+
fn encode_sync(mut batch: Vec<(u64, Message)>) -> Vec<(u64, Message)> {
98+
for (_msgid, msg) in &mut batch {
99+
Self::encode_message(msg);
100+
}
101+
batch
102+
}
103+
}
104+
105+
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::plugin::message::{
            MessageAccount, MessageAccountInfo, MessageTransaction, MessageTransactionInfo,
        },
        bytes::Bytes,
        prost_types::Timestamp,
        solana_pubkey::Pubkey,
        solana_signature::Signature,
        std::{
            sync::{Arc, OnceLock},
            time::SystemTime,
        },
    };

    /// Fixture: a non-vote transaction message whose `pre_encoded` slot is empty.
    fn create_test_transaction() -> Message {
        let info = MessageTransactionInfo {
            signature: Signature::from([1u8; 64]),
            is_vote: false,
            transaction: Default::default(),
            meta: Default::default(),
            index: 0,
            account_keys: Default::default(),
            pre_encoded: OnceLock::new(),
        };
        Message::Transaction(MessageTransaction {
            transaction: Arc::new(info),
            slot: 100,
            created_at: Timestamp::from(SystemTime::now()),
        })
    }

    /// Fixture: an account update message whose `pre_encoded` slot is empty.
    fn create_test_account() -> Message {
        let info = MessageAccountInfo {
            pubkey: Pubkey::new_unique(),
            lamports: 1000,
            owner: Pubkey::new_unique(),
            executable: false,
            rent_epoch: 0,
            data: Bytes::from(vec![1, 2, 3]),
            write_version: 1,
            txn_signature: None,
            pre_encoded: OnceLock::new(),
        };
        Message::Account(MessageAccount {
            account: Arc::new(info),
            slot: 100,
            is_startup: false,
            created_at: Timestamp::from(SystemTime::now()),
        })
    }

    #[tokio::test]
    async fn test_parallel_encoder_transactions() {
        let (encoder, _handle) = ParallelEncoder::new(2);

        let batch = (0..10)
            .map(|i| (i, create_test_transaction()))
            .collect::<Vec<(u64, Message)>>();

        let encoded = encoder.encode(batch).await;
        assert_eq!(encoded.len(), 10);

        for (_msgid, msg) in &encoded {
            if let Message::Transaction(tx) = msg {
                assert!(
                    tx.transaction.pre_encoded.get().is_some(),
                    "transaction should be encoded"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_parallel_encoder_accounts() {
        let (encoder, _handle) = ParallelEncoder::new(2);

        let batch = (0..10)
            .map(|i| (i, create_test_account()))
            .collect::<Vec<(u64, Message)>>();

        let encoded = encoder.encode(batch).await;
        assert_eq!(encoded.len(), 10);

        for (_msgid, msg) in &encoded {
            if let Message::Account(acc) = msg {
                assert!(
                    acc.account.pre_encoded.get().is_some(),
                    "account should be encoded"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_small_batch_uses_sync() {
        let (encoder, _handle) = ParallelEncoder::new(2);

        // Batches below the parallel threshold take the synchronous path.
        let batch = (0..2)
            .map(|i| (i, create_test_transaction()))
            .collect::<Vec<(u64, Message)>>();

        assert_eq!(encoder.encode(batch).await.len(), 2);
    }

    #[tokio::test]
    async fn test_mixed_batch() {
        let (encoder, _handle) = ParallelEncoder::new(2);

        // Interleave transactions and accounts with even/odd ids.
        let batch: Vec<(u64, Message)> = (0..5)
            .flat_map(|i| {
                [
                    (i * 2, create_test_transaction()),
                    (i * 2 + 1, create_test_account()),
                ]
            })
            .collect();

        assert_eq!(encoder.encode(batch).await.len(), 10);
    }
}

0 commit comments

Comments
 (0)