diff --git a/CHANGELOG.md b/CHANGELOG.md index c99847804..fc79fdc9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The minor version will be incremented upon a breaking change and the patch versi - yellowstone-grpc-proto-12.1.0 ### Fixes +- geyser: replace Arc::get_mut with OnceLock for pre-encoding to support shared ownership, added metrics for pre-encode hit/miss ([#683](https://github.com/rpcpool/yellowstone-grpc/pull/683)) - geyser: replace `OnDrop` with `ClientSession` RAII guard in `client_loop`, fixing missed cleanup on early-exit paths ([#687](https://github.com/rpcpool/yellowstone-grpc/pull/687)), ([#690](https://github.com/rpcpool/yellowstone-grpc/pull/690)) - geyser: fix ping and traffic metric ([#698](https://github.com/rpcpool/yellowstone-grpc/pull/698)) diff --git a/Cargo.lock b/Cargo.lock index 37e3f6f08..8c9422ce1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5366,6 +5366,7 @@ dependencies = [ "prost 0.14.3", "prost-types 0.14.3", "protoc-bin-vendored", + "rayon", "serde", "serde_json", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index 35f7b651b..d4cdc3345 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ prost = "0.14.0" prost-types = "0.14.0" prost_011 = { package = "prost", version = "0.11.9" } protoc-bin-vendored = "3.2.0" +rayon = "1.11.0" serde = "1.0.145" serde_json = "1.0.86" smallvec = "1.15.1" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index adc343bbc..f83ad4e30 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -39,6 +39,7 @@ pin-project = { workspace = true } prometheus = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index bd2e8ae2a..e8beddc6a 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -213,6 +213,12 @@ pub struct ConfigGrpc { deserialize_with = "deserialize_int_str" )] pub replay_stored_slots: u64, + /// Number of threads for parallel encoding + #[serde( + default = "ConfigGrpc::encoder_threads_default", + deserialize_with = "deserialize_int_str" + )] + pub encoder_threads: usize, #[serde(default)] pub server_http2_adaptive_window: Option, #[serde(default, with = "humantime_serde")] @@ -272,6 +278,10 @@ impl ConfigGrpc { const fn default_replay_stored_slots() -> u64 { 0 } + + const fn encoder_threads_default() -> usize { + 4 + } } #[derive(Debug, Clone, Deserialize)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 532b894eb..f1474ffd6 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -4,8 +4,9 @@ use { metered::MeteredLayer, metrics::{ self, incr_grpc_method_call_count, set_subscriber_queue_size, - subscription_limit_exceeded_inc, DebugClientMessage, + subscription_limit_exceeded_inc, DebugClientMessage, GEYSER_BATCH_SIZE, }, + parallel::ParallelEncoder, plugin::{ filter::{ limits::FilterLimits, @@ -516,6 +517,7 @@ impl GrpcService { is_reload: bool, service_cancellation_token: CancellationToken, task_tracker: TaskTracker, + parallel_encoder: ParallelEncoder, ) -> anyhow::Result<( Option>>, mpsc::UnboundedSender, @@ -637,6 +639,7 @@ impl GrpcService { replay_stored_slots_rx, replay_first_available_slot, config.replay_stored_slots, + parallel_encoder, ) .await; }); @@ -679,6 +682,7 @@ impl GrpcService { replay_stored_slots_rx: Option>, replay_first_available_slot: Option>, replay_stored_slots: u64, + parallel_encoder: ParallelEncoder, ) { const PROCESSED_MESSAGES_MAX: usize = 31; const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10); @@ -923,8 +927,10 @@ impl GrpcService { // processed processed_messages.push(message.clone()); + GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64); + let encoded = parallel_encoder.encode(processed_messages).await; let _ = - broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); + broadcast_tx.send((CommitmentLevel::Processed, encoded.into())); processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); processed_sleep .as_mut() @@ -962,8 +968,10 @@ impl GrpcService { || !confirmed_messages.is_empty() || !finalized_messages.is_empty() { + GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64); + let encoded = parallel_encoder.encode(processed_messages).await; let _ = broadcast_tx - .send((CommitmentLevel::Processed, processed_messages.into())); + .send((CommitmentLevel::Processed, encoded.into())); processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); processed_sleep .as_mut() @@ -984,7 +992,9 @@ impl GrpcService { } () = &mut processed_sleep => { if !processed_messages.is_empty() { - let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); + GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64); + let encoded = parallel_encoder.encode(processed_messages).await; + let _ = broadcast_tx.send((CommitmentLevel::Processed, encoded.into())); processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); } processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP); diff --git a/yellowstone-grpc-geyser/src/lib.rs b/yellowstone-grpc-geyser/src/lib.rs index b364a206a..20f192cd5 100644 --- a/yellowstone-grpc-geyser/src/lib.rs +++ b/yellowstone-grpc-geyser/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod grpc; pub mod metered; pub mod metrics; +pub mod parallel; pub mod plugin; pub mod transport; pub(crate) mod util; diff --git a/yellowstone-grpc-geyser/src/metrics.rs b/yellowstone-grpc-geyser/src/metrics.rs index dfeb2eaba..62db13026 100644 --- a/yellowstone-grpc-geyser/src/metrics.rs +++ b/yellowstone-grpc-geyser/src/metrics.rs @@ -88,6 +88,13 @@ lazy_static::lazy_static! { &["subscriber_id"] ).unwrap(); + static ref GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD: IntGaugeVec = IntGaugeVec::new( + Opts::new( + "grpc_subscriber_send_bandwidth_load", + "Current Send load we send to subscriber channel (in bytes per second)" + ), + &["subscriber_id"] + ).unwrap(); static ref GRPC_SUBSCRIBER_QUEUE_SIZE: IntGaugeVec = IntGaugeVec::new( Opts::new( @@ -105,14 +112,6 @@ lazy_static::lazy_static! { &["subscriber_id", "reason"] ).unwrap(); - static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new( - Opts::new( - "grpc_concurrent_subscribe_per_tcp_connection", - "Current concurrent subscriptions per remote TCP peer socket address" - ), - &["remote_peer_sk_addr"] - ).unwrap(); - static ref GEYSER_ACCOUNT_UPDATE_RECEIVED: Histogram = Histogram::with_opts( HistogramOpts::new( "geyser_account_update_data_size_kib", @@ -121,25 +120,44 @@ lazy_static::lazy_static! { .buckets(vec![5.0, 10.0, 20.0, 30.0, 50.0, 100.0, 200.0, 300.0, 500.0, 1000.0, 2000.0, 3000.0, 5000.0, 10000.0]) ).unwrap(); + pub static ref GEYSER_BATCH_SIZE: Histogram = Histogram::with_opts( + HistogramOpts::new( + "yellowstone_geyser_batch_size", + "Size of processed message batches" + ) + .buckets(vec![1.0, 4.0, 8.0, 16.0, 24.0, 31.0]) + ).unwrap(); + + static ref PRE_ENCODED_CACHE_HIT: IntCounterVec = IntCounterVec::new( + Opts::new("yellowstone_grpc_pre_encoded_cache_hit", "Pre-encoded cache hits by message type"), + &["type"] + ).unwrap(); + + static ref PRE_ENCODED_CACHE_MISS: IntCounterVec = IntCounterVec::new( + Opts::new("yellowstone_grpc_pre_encoded_cache_miss", "Pre-encoded cache misses by message type"), + &["type"] + ).unwrap(); + + static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new( + Opts::new( + "grpc_concurrent_subscribe_per_tcp_connection", + "Current concurrent subscriptions per remote TCP peer socket address" + ), + &["remote_peer_sk_addr"] + ).unwrap(); + static ref TOTAL_TRAFFIC_SENT: IntCounter = IntCounter::new( "total_traffic_sent_bytes", "Total traffic sent to subscriber by type (account_update, block_meta, etc)" ).unwrap(); static ref TRAFFIC_SENT_PER_REMOTE_IP: IntCounterVec = IntCounterVec::new( - Opts::new( - "traffic_sent_per_remote_ip_bytes", - "Total traffic sent to subscriber by remote IP" - ), + Opts::new("traffic_sent_per_remote_ip_bytes", "Total traffic sent to subscriber by remote IP"), &["remote_ip"] ).unwrap(); - static ref GRPC_METHOD_CALL_COUNT: IntCounterVec = IntCounterVec::new( - Opts::new( - "yellowstone_grpc_method_call_count", - "Total number of calls to GetVersion gRPC method" - ), + Opts::new("yellowstone_grpc_method_call_count", "Total number of calls to GetVersion gRPC method"), &["method"] ).unwrap(); @@ -152,50 +170,9 @@ lazy_static::lazy_static! { ).unwrap(); static ref GRPC_SERVICE_OUTBOUND_BYTES: IntGaugeVec = IntGaugeVec::new( - Opts::new( - "yellowstone_grpc_service_outbound_bytes", - "Current emitted bytes by tonic service response bodies per active subscriber stream" - ), + Opts::new("yellowstone_grpc_service_outbound_bytes", "Current emitted bytes by tonic service response bodies per active subscriber stream"), &["subscriber_id"] ).unwrap(); - -} - -pub fn incr_grpc_method_call_count>(method: S) { - GRPC_METHOD_CALL_COUNT - .with_label_values(&[method.as_ref()]) - .inc(); -} - -pub fn add_grpc_service_outbound_bytes>(subscriber_id: S, bytes: u64) { - GRPC_SERVICE_OUTBOUND_BYTES - .with_label_values(&[subscriber_id.as_ref()]) - .add(bytes as i64); -} - -pub fn reset_grpc_service_outbound_bytes>(subscriber_id: S) { - GRPC_SERVICE_OUTBOUND_BYTES - .with_label_values(&[subscriber_id.as_ref()]) - .set(0); -} - -pub fn add_traffic_sent_per_remote_ip>(remote_ip: S, bytes: u64) { - TRAFFIC_SENT_PER_REMOTE_IP - .with_label_values(&[remote_ip.as_ref()]) - .inc_by(bytes); -} - -pub fn reset_traffic_sent_per_remote_ip>(remote_ip: S) { - TRAFFIC_SENT_PER_REMOTE_IP - .with_label_values(&[remote_ip.as_ref()]) - .reset(); - TRAFFIC_SENT_PER_REMOTE_IP - .remove_label_values(&[remote_ip.as_ref()]) - .expect("remove_label_values") -} - -pub fn add_total_traffic_sent(bytes: u64) { - TOTAL_TRAFFIC_SENT.inc_by(bytes); } #[derive(Debug)] @@ -344,8 +321,12 @@ impl PrometheusService { register!(GRPC_MESSAGE_SENT); register!(GRPC_BYTES_SENT); register!(GEYSER_ACCOUNT_UPDATE_RECEIVED); + register!(GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD); register!(GRPC_SUBSCRIBER_QUEUE_SIZE); + register!(GEYSER_BATCH_SIZE); register!(GRPC_CLIENT_DISCONNECTS); + register!(PRE_ENCODED_CACHE_HIT); + register!(PRE_ENCODED_CACHE_MISS); register!(GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION); register!(TOTAL_TRAFFIC_SENT); register!(TRAFFIC_SENT_PER_REMOTE_IP); @@ -552,6 +533,12 @@ pub fn observe_geyser_account_update_received(data_bytesize: usize) { GEYSER_ACCOUNT_UPDATE_RECEIVED.observe(data_bytesize as f64 / 1024.0); } +pub fn set_subscriber_send_bandwidth_load>(subscriber_id: S, load: i64) { + GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD + .with_label_values(&[subscriber_id.as_ref()]) + .set(load); +} + pub fn set_subscriber_queue_size>(subscriber_id: S, size: u64) { GRPC_SUBSCRIBER_QUEUE_SIZE .with_label_values(&[subscriber_id.as_ref()]) @@ -564,6 +551,51 @@ pub fn incr_client_disconnect>(subscriber_id: S, reason: &str) { .inc(); } +pub fn pre_encoded_cache_hit(msg_type: &str) { + PRE_ENCODED_CACHE_HIT.with_label_values(&[msg_type]).inc(); +} + +pub fn pre_encoded_cache_miss(msg_type: &str) { + PRE_ENCODED_CACHE_MISS.with_label_values(&[msg_type]).inc(); +} + +pub fn incr_grpc_method_call_count>(method: S) { + GRPC_METHOD_CALL_COUNT + .with_label_values(&[method.as_ref()]) + .inc(); +} + +pub fn add_grpc_service_outbound_bytes>(subscriber_id: S, bytes: u64) { + GRPC_SERVICE_OUTBOUND_BYTES + .with_label_values(&[subscriber_id.as_ref()]) + .add(bytes as i64); +} + +pub fn reset_grpc_service_outbound_bytes>(subscriber_id: S) { + GRPC_SERVICE_OUTBOUND_BYTES + .with_label_values(&[subscriber_id.as_ref()]) + .set(0); +} + +pub fn add_total_traffic_sent(bytes: u64) { + TOTAL_TRAFFIC_SENT.inc_by(bytes); +} + +pub fn add_traffic_sent_per_remote_ip>(remote_ip: S, bytes: u64) { + TRAFFIC_SENT_PER_REMOTE_IP + .with_label_values(&[remote_ip.as_ref()]) + .inc_by(bytes); +} + +pub fn reset_traffic_sent_per_remote_ip>(remote_ip: S) { + TRAFFIC_SENT_PER_REMOTE_IP + .with_label_values(&[remote_ip.as_ref()]) + .reset(); + TRAFFIC_SENT_PER_REMOTE_IP + .remove_label_values(&[remote_ip.as_ref()]) + .expect("remove_label_values"); +} + pub fn set_grpc_concurrent_subscribe_per_tcp_connection>( remote_peer_sk_addr: S, size: u64, @@ -590,20 +622,25 @@ pub fn reset_metrics() { SLOT_STATUS.reset(); SLOT_STATUS_PLUGIN.reset(); INVALID_FULL_BLOCKS.reset(); + GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD.reset(); GRPC_SUBSCRIBER_QUEUE_SIZE.reset(); - GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION.reset(); // Reset counter vectors (clears all label combinations) MISSED_STATUS_MESSAGE.reset(); GRPC_MESSAGE_SENT.reset(); GRPC_BYTES_SENT.reset(); GRPC_CLIENT_DISCONNECTS.reset(); + GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION.reset(); TOTAL_TRAFFIC_SENT.reset(); TRAFFIC_SENT_PER_REMOTE_IP.reset(); GRPC_SERVICE_OUTBOUND_BYTES.reset(); GRPC_SUBSCRIPTION_LIMIT_EXCEEDED.reset(); GRPC_METHOD_CALL_COUNT.reset(); + // Pre-encoding + PRE_ENCODED_CACHE_HIT.reset(); + PRE_ENCODED_CACHE_MISS.reset(); + // Note: VERSION and GEYSER_ACCOUNT_UPDATE_RECEIVED are intentionally not reset // - VERSION contains build info set once on startup // - GEYSER_ACCOUNT_UPDATE_RECEIVED is a Histogram which doesn't support reset() diff --git a/yellowstone-grpc-geyser/src/parallel.rs b/yellowstone-grpc-geyser/src/parallel.rs new file mode 100644 index 000000000..8f46cd055 --- /dev/null +++ b/yellowstone-grpc-geyser/src/parallel.rs @@ -0,0 +1,223 @@ +use { + crate::plugin::{ + filter::encoder::{AccountEncoder, TransactionEncoder}, + message::Message, + }, + rayon::{ThreadPool, ThreadPoolBuilder}, + tokio::sync::{mpsc, oneshot}, +}; + +pub struct ParallelEncoder { + tx: mpsc::UnboundedSender, +} + +struct EncodeRequest { + batch: Vec<(u64, Message)>, + response: oneshot::Sender>, +} + +impl ParallelEncoder { + pub fn new(num_threads: usize) -> (Self, std::thread::JoinHandle<()>) { + let pool = ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("geyser-encoder-{i}")) + .build() + .expect("failed to create rayon pool"); + + let (tx, rx) = mpsc::unbounded_channel(); + + let handle = std::thread::Builder::new() + .name("geyser-encoder-bridge".into()) + .spawn(move || Self::bridge_loop(rx, pool)) + .expect("failed to spawn encoder bridge"); + + (Self { tx }, handle) + } + + fn bridge_loop(mut rx: mpsc::UnboundedReceiver, pool: ThreadPool) { + use rayon::prelude::*; + + while let Some(req) = rx.blocking_recv() { + let EncodeRequest { + mut batch, + response, + } = req; + + pool.install(|| { + batch.par_iter_mut().for_each(|(_msgid, msg)| { + Self::encode_message(msg); + }); + }); + + let _ = response.send(batch); + } + + log::info!("exiting encoder bridge loop"); + } + + fn encode_message(msg: &Message) { + match msg { + Message::Transaction(tx) => { + if tx.transaction.pre_encoded.get().is_none() { + TransactionEncoder::pre_encode(&tx.transaction); + } + } + Message::Account(acc) => { + if acc.account.pre_encoded.get().is_none() { + AccountEncoder::pre_encode(&acc.account); + } + } + _ => {} + } + } + + pub async fn encode(&self, batch: Vec<(u64, Message)>) -> Vec<(u64, Message)> { + if batch.len() < 4 { + return Self::encode_sync(batch); + } + + let (tx, rx) = oneshot::channel(); + + // move batch, don't clone + if self + .tx + .send(EncodeRequest { + batch, + response: tx, + }) + .is_err() + { + // channel closed - this shouldn't happen in normal operation + panic!("encoder channel closed"); + } + + rx.await.expect("encoder response failed") + } + + fn encode_sync(mut batch: Vec<(u64, Message)>) -> Vec<(u64, Message)> { + for (_msgid, msg) in &mut batch { + Self::encode_message(msg); + } + batch + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::plugin::message::{ + MessageAccount, MessageAccountInfo, MessageTransaction, MessageTransactionInfo, + }, + bytes::Bytes, + prost_types::Timestamp, + solana_pubkey::Pubkey, + solana_signature::Signature, + std::{ + sync::{Arc, OnceLock}, + time::SystemTime, + }, + }; + + fn create_test_transaction() -> Message { + let tx_info = MessageTransactionInfo { + signature: Signature::from([1u8; 64]), + is_vote: false, + transaction: Default::default(), + meta: Default::default(), + index: 0, + account_keys: Default::default(), + pre_encoded: OnceLock::new(), + }; + Message::Transaction(MessageTransaction { + transaction: Arc::new(tx_info), + slot: 100, + created_at: Timestamp::from(SystemTime::now()), + }) + } + + fn create_test_account() -> Message { + let acc_info = MessageAccountInfo { + pubkey: Pubkey::new_unique(), + lamports: 1000, + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: 0, + data: Bytes::from(vec![1, 2, 3]), + write_version: 1, + txn_signature: None, + pre_encoded: OnceLock::new(), + }; + Message::Account(MessageAccount { + account: Arc::new(acc_info), + slot: 100, + is_startup: false, + created_at: Timestamp::from(SystemTime::now()), + }) + } + + #[tokio::test] + async fn test_parallel_encoder_transactions() { + let (encoder, _handle) = ParallelEncoder::new(2); + + let batch: Vec<(u64, Message)> = (0..10).map(|i| (i, create_test_transaction())).collect(); + + let encoded = encoder.encode(batch).await; + + assert_eq!(encoded.len(), 10); + for (_msgid, msg) in encoded { + if let Message::Transaction(tx) = msg { + assert!( + tx.transaction.pre_encoded.get().is_some(), + "transaction should be encoded" + ); + } + } + } + + #[tokio::test] + async fn test_parallel_encoder_accounts() { + let (encoder, _handle) = ParallelEncoder::new(2); + + let batch: Vec<(u64, Message)> = (0..10).map(|i| (i, create_test_account())).collect(); + + let encoded = encoder.encode(batch).await; + + assert_eq!(encoded.len(), 10); + for (_msgid, msg) in encoded { + if let Message::Account(acc) = msg { + assert!( + acc.account.pre_encoded.get().is_some(), + "account should be encoded" + ); + } + } + } + + #[tokio::test] + async fn test_small_batch_uses_sync() { + let (encoder, _handle) = ParallelEncoder::new(2); + + // Small batch < 4 should use sync path + let batch: Vec<(u64, Message)> = (0..2).map(|i| (i, create_test_transaction())).collect(); + + let encoded = encoder.encode(batch).await; + + assert_eq!(encoded.len(), 2); + } + + #[tokio::test] + async fn test_mixed_batch() { + let (encoder, _handle) = ParallelEncoder::new(2); + + let mut batch: Vec<(u64, Message)> = Vec::new(); + for i in 0..5 { + batch.push((i * 2, create_test_transaction())); + batch.push((i * 2 + 1, create_test_account())); + } + + let encoded = encoder.encode(batch).await; + + assert_eq!(encoded.len(), 10); + } +} diff --git a/yellowstone-grpc-geyser/src/plugin/entry.rs b/yellowstone-grpc-geyser/src/plugin/entry.rs index 7044fb638..57686bd7f 100644 --- a/yellowstone-grpc-geyser/src/plugin/entry.rs +++ b/yellowstone-grpc-geyser/src/plugin/entry.rs @@ -3,6 +3,7 @@ use { config::Config, grpc::GrpcService, metrics::{self, PrometheusService}, + parallel::ParallelEncoder, plugin::message::{ Message, MessageAccount, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, @@ -36,6 +37,7 @@ pub struct PluginInner { grpc_channel: mpsc::UnboundedSender, plugin_cancellation_token: CancellationToken, plugin_task_tracker: TaskTracker, + encoder_handle: std::thread::JoinHandle<()>, } impl PluginInner { @@ -106,6 +108,9 @@ impl GeyserPlugin for Plugin { .build() .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; + let encoder_threads = config.grpc.encoder_threads; + let (encoder, encoder_handle) = ParallelEncoder::new(encoder_threads); + let result = runtime.block_on(async move { let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel(); // Create prometheus service First so if it fails the plugin doesn't spawn geyser tasks unnecessarily. @@ -124,16 +129,23 @@ impl GeyserPlugin for Plugin { is_reload, grpc_cancellation_token, grpc_task_tracker, + encoder, ) .await .map_err(|error| GeyserPluginError::Custom(format!("{error:?}").into()))?; Ok::<_, GeyserPluginError>((snapshot_channel, grpc_channel)) }); - let (snapshot_channel, grpc_channel) = result.inspect_err(|e| { - log::error!("failed to start plugin services: {e}"); - plugin_cancellation_token.cancel(); - })?; + let (snapshot_channel, grpc_channel) = match result { + Ok(val) => val, + Err(e) => { + log::error!("failed to start plugin services: {e}"); + plugin_cancellation_token.cancel(); + // join before returning because encoder already dropped, channel closed + let _ = encoder_handle.join(); + return Err(GeyserPluginError::Custom(format!("{e:?}").into())); + } + }; self.inner = Some(PluginInner { runtime, @@ -142,6 +154,7 @@ impl GeyserPlugin for Plugin { grpc_channel, plugin_cancellation_token, plugin_task_tracker, + encoder_handle, }); Ok(()) @@ -162,6 +175,10 @@ impl GeyserPlugin for Plugin { ); inner.runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); log::info!("tokio runtime shut down in {:?}", now.elapsed()); + if let Err(e) = inner.encoder_handle.join() { + log::error!("encoder thread panicked: {:?}", e); + } + log::info!("plugin shutdown complete"); } } diff --git a/yellowstone-grpc-geyser/src/plugin/filter/encoder.rs b/yellowstone-grpc-geyser/src/plugin/filter/encoder.rs new file mode 100644 index 000000000..3826e8b9b --- /dev/null +++ b/yellowstone-grpc-geyser/src/plugin/filter/encoder.rs @@ -0,0 +1,127 @@ +use { + crate::plugin::{ + filter::message::{prost_bytes_encode_raw, prost_bytes_encoded_len}, + message::{MessageAccountInfo, MessageTransactionInfo}, + }, + bytes::Bytes, +}; + +pub struct TransactionEncoder; + +impl TransactionEncoder { + pub fn pre_encode(tx: &MessageTransactionInfo) { + let len = Self::encoded_len(tx); + let mut buf = Vec::with_capacity(len); + Self::encode_raw(tx, &mut buf); + let _ = tx.pre_encoded.set(Bytes::from(buf)); + } + + fn encode_raw(tx: &MessageTransactionInfo, buf: &mut impl bytes::BufMut) { + use prost::encoding::{encode_key, encode_varint, message, WireType}; + + let index = tx.index as u64; + + encode_key(1u32, WireType::LengthDelimited, buf); + encode_varint(tx.signature.as_ref().len() as u64, buf); + buf.put_slice(tx.signature.as_ref()); + + if tx.is_vote { + prost::encoding::bool::encode(2u32, &tx.is_vote, buf); + } + + message::encode(3u32, &tx.transaction, buf); + message::encode(4u32, &tx.meta, buf); + + if index != 0u64 { + prost::encoding::uint64::encode(5u32, &index, buf); + } + } + + pub fn encoded_len(tx: &MessageTransactionInfo) -> usize { + use prost::encoding::{encoded_len_varint, key_len, message}; + + let index = tx.index as u64; + let sig_len = tx.signature.as_ref().len(); + + key_len(1u32) + + encoded_len_varint(sig_len as u64) + + sig_len + + if tx.is_vote { + prost::encoding::bool::encoded_len(2u32, &tx.is_vote) + } else { + 0 + } + + message::encoded_len(3u32, &tx.transaction) + + message::encoded_len(4u32, &tx.meta) + + if index != 0u64 { + prost::encoding::uint64::encoded_len(5u32, &index) + } else { + 0 + } + } +} + +pub struct AccountEncoder; + +impl AccountEncoder { + pub fn pre_encode(account: &MessageAccountInfo) { + let len = Self::encoded_len(account); + let mut buf = Vec::with_capacity(len); + + prost_bytes_encode_raw(1u32, account.pubkey.as_ref(), &mut buf); + if account.lamports != 0u64 { + ::prost::encoding::uint64::encode(2u32, &account.lamports, &mut buf); + } + prost_bytes_encode_raw(3u32, account.owner.as_ref(), &mut buf); + if account.executable { + ::prost::encoding::bool::encode(4u32, &account.executable, &mut buf); + } + if account.rent_epoch != 0u64 { + ::prost::encoding::uint64::encode(5u32, &account.rent_epoch, &mut buf); + } + if !account.data.is_empty() { + prost_bytes_encode_raw(6u32, &account.data, &mut buf); + } + if account.write_version != 0u64 { + ::prost::encoding::uint64::encode(7u32, &account.write_version, &mut buf); + } + if let Some(value) = &account.txn_signature { + prost_bytes_encode_raw(8u32, value.as_ref(), &mut buf); + } + + let _ = account.pre_encoded.set(Bytes::from(buf)); + } + + pub fn encoded_len(account: &MessageAccountInfo) -> usize { + prost_bytes_encoded_len(1u32, account.pubkey.as_ref()) + + if account.lamports != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &account.lamports) + } else { + 0 + } + + prost_bytes_encoded_len(3u32, account.owner.as_ref()) + + if account.executable { + ::prost::encoding::bool::encoded_len(4u32, &account.executable) + } else { + 0 + } + + if account.rent_epoch != 0u64 { + ::prost::encoding::uint64::encoded_len(5u32, &account.rent_epoch) + } else { + 0 + } + + if !account.data.is_empty() { + prost_bytes_encoded_len(6u32, &account.data) + } else { + 0 + } + + if account.write_version != 0u64 { + ::prost::encoding::uint64::encoded_len(7u32, &account.write_version) + } else { + 0 + } + + account + .txn_signature + .map_or(0, |sig| prost_bytes_encoded_len(8u32, sig.as_ref())) + } +} diff --git a/yellowstone-grpc-geyser/src/plugin/filter/filter.rs b/yellowstone-grpc-geyser/src/plugin/filter/filter.rs index 591437afa..1450e76ef 100644 --- a/yellowstone-grpc-geyser/src/plugin/filter/filter.rs +++ b/yellowstone-grpc-geyser/src/plugin/filter/filter.rs @@ -1131,7 +1131,7 @@ mod tests { solana_transaction_status::TransactionStatusMeta, std::{ collections::HashMap, - sync::Arc, + sync::{Arc, OnceLock}, time::{Duration, SystemTime}, }, yellowstone_grpc_proto::geyser::{ @@ -1191,6 +1191,7 @@ mod tests { meta, index: 1, account_keys, + pre_encoded: OnceLock::new(), }), slot: 100, created_at: Timestamp::from(SystemTime::now()), diff --git a/yellowstone-grpc-geyser/src/plugin/filter/message.rs b/yellowstone-grpc-geyser/src/plugin/filter/message.rs index 62ba830fa..985f87a68 100644 --- a/yellowstone-grpc-geyser/src/plugin/filter/message.rs +++ b/yellowstone-grpc-geyser/src/plugin/filter/message.rs @@ -1,9 +1,12 @@ use { - crate::plugin::{ - filter::{name::FilterName, FilterAccountsDataSlice}, - message::{ - MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta, MessageEntry, - MessageSlot, MessageTransaction, MessageTransactionInfo, + crate::{ + metrics, + plugin::{ + filter::{name::FilterName, FilterAccountsDataSlice}, + message::{ + MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta, MessageEntry, + MessageSlot, MessageTransaction, MessageTransactionInfo, + }, }, }, bytes::{ @@ -23,7 +26,7 @@ use { std::{ collections::HashSet, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, OnceLock}, time::SystemTime, }, yellowstone_grpc_proto::{ @@ -44,7 +47,7 @@ pub const fn prost_field_encoded_len(tag: u32, len: usize) -> usize { } #[inline] -fn prost_bytes_encode_raw(tag: u32, value: &[u8], buf: &mut impl BufMut) { +pub fn prost_bytes_encode_raw(tag: u32, value: &[u8], buf: &mut impl BufMut) { encode_key(tag, WireType::LengthDelimited, buf); encode_varint(value.len() as u64, buf); buf.put(value); @@ -286,6 +289,7 @@ impl FilteredUpdate { }, index: msg.index as usize, account_keys: HashSet::new(), + pre_encoded: OnceLock::new(), }), slot: msg.slot, }) @@ -488,6 +492,19 @@ impl FilteredUpdateAccount { data_slice: &FilterAccountsDataSlice, buf: &mut impl BufMut, ) { + // use pre-encoded if: no slicing and pre-encoded exists + if data_slice.as_ref().is_empty() { + if let Some(pre_encoded) = account.get_pre_encoded() { + metrics::pre_encoded_cache_hit("account"); + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(pre_encoded.len() as u64, buf); + buf.put_slice(pre_encoded); + return; + } + metrics::pre_encoded_cache_miss("account"); + } + + // fallback: slice-aware encoding encode_key(tag, WireType::LengthDelimited, buf); encode_varint(Self::account_encoded_len(account, data_slice) as u64, buf); @@ -515,6 +532,14 @@ impl FilteredUpdateAccount { account: &MessageAccountInfo, data_slice: &FilterAccountsDataSlice, ) -> usize { + // use pre-encoded length if: no slicing and pre-encoded exists + if data_slice.as_ref().is_empty() { + if let Some(pre_encoded) = account.get_pre_encoded() { + return pre_encoded.len(); + } + } + + // fallback: calculate with slicing let data_len = data_slice.get_slice_len(&account.data); prost_bytes_encoded_len(1u32, account.pubkey.as_ref()) @@ -659,6 +684,18 @@ impl prost::Message for FilteredUpdateTransaction { impl FilteredUpdateTransaction { fn tx_encode_raw(tag: u32, tx: &MessageTransactionInfo, buf: &mut impl BufMut) { + // try to use pre-encoded bytes (fast path) + if let Some(pre_encoded) = tx.get_pre_encoded() { + metrics::pre_encoded_cache_hit("txn"); + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(pre_encoded.len() as u64, buf); + buf.put_slice(pre_encoded); + return; + } + + metrics::pre_encoded_cache_miss("txn"); + + // fallback: encode from scratch encode_key(tag, WireType::LengthDelimited, buf); encode_varint(Self::tx_encoded_len(tx) as u64, buf); @@ -676,6 +713,11 @@ impl FilteredUpdateTransaction { } fn tx_encoded_len(tx: &MessageTransactionInfo) -> usize { + // try to use pre-encoded length (fast path) + if let Some(pre_encoded) = tx.get_pre_encoded() { + return pre_encoded.len(); + } + let index = tx.index as u64; prost_bytes_encoded_len(1u32, tx.signature.as_ref()) @@ -984,7 +1026,12 @@ pub mod tests { super::{FilteredUpdate, FilteredUpdateBlock, FilteredUpdateFilters, FilteredUpdateOneof}, crate::plugin::{ convert_to, - filter::{name::FilterName, FilterAccountsDataSlice}, + filter::{ + encoder::{AccountEncoder, TransactionEncoder}, + message::{FilteredUpdateAccount, FilteredUpdateTransaction}, + name::FilterName, + FilterAccountsDataSlice, + }, message::{ MessageAccount, MessageAccountInfo, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo, SlotStatus, @@ -1004,7 +1051,7 @@ pub mod tests { fs, ops::Range, str::FromStr, - sync::Arc, + sync::{Arc, OnceLock}, time::SystemTime, }, yellowstone_grpc_proto::geyser::{SubscribeUpdate, SubscribeUpdateBlockMeta}, @@ -1058,6 +1105,7 @@ pub mod tests { data: Bytes::from(data.clone()), write_version, txn_signature, + pre_encoded: OnceLock::new(), })); } } @@ -1165,6 +1213,7 @@ pub mod tests { meta: convert_to::create_transaction_meta(&tx.meta), index, account_keys: HashSet::new(), + pre_encoded: OnceLock::new(), } }) .map(Arc::new) @@ -1243,6 +1292,88 @@ pub mod tests { ); } + #[test] + fn test_account_pre_encoded_matches_manual_encoding() { + use {bytes::Bytes, solana_pubkey::Pubkey, solana_signature::Signature}; + + // Test various account configurations + let pubkeys = [Pubkey::new_unique(), Pubkey::new_unique()]; + let owners = [Pubkey::new_unique(), Pubkey::new_unique()]; + let data_samples = [ + vec![], + vec![1, 2, 3, 4], + vec![0u8; 1000], // larger account data + ]; + + for pubkey in &pubkeys { + for owner in &owners { + for lamports in [0u64, 100, 1_000_000] { + for executable in [false, true] { + for rent_epoch in [0u64, 100] { + for write_version in [0u64, 42] { + for data in &data_samples { + for txn_signature in [None, Some(Signature::from([0u8; 64]))] { + // Create account WITH pre-encoding + let mut account_with = MessageAccountInfo { + pubkey: *pubkey, + lamports, + owner: *owner, + executable, + rent_epoch, + data: Bytes::from(data.clone()), + write_version, + txn_signature, + pre_encoded: OnceLock::new(), + }; + AccountEncoder::pre_encode(&mut account_with); + + // Create account WITHOUT pre-encoding (fallback path) + let account_without = MessageAccountInfo { + pubkey: *pubkey, + lamports, + owner: *owner, + executable, + rent_epoch, + data: Bytes::from(data.clone()), + write_version, + txn_signature, + pre_encoded: OnceLock::new(), + }; + + // Encode both using FilteredUpdateAccount (no slicing) + let data_slice = FilterAccountsDataSlice::default(); + + let mut buf_with = Vec::new(); + FilteredUpdateAccount::account_encode_raw( + 1u32, + &account_with, + &data_slice, + &mut buf_with, + ); + + let mut buf_without = Vec::new(); + FilteredUpdateAccount::account_encode_raw( + 1u32, + &account_without, + &data_slice, + &mut buf_without, + ); + + // Must be identical + assert_eq!( + buf_with, buf_without, + "pre-encoded bytes don't match manual encoding" + ); + } + } + } + } + } + } + } + } + } + #[test] fn test_message_account() { for (msg, data_slice) in create_accounts() { @@ -1288,6 +1419,68 @@ pub mod tests { } } + #[test] + fn test_pre_encoded_matches_manual_encoding() { + // Get real transactions from fixtures (these have pre_encoded: None) + for tx_arc in load_predefined_transactions() { + // Clone the transaction info + let mut tx_with_cache = MessageTransactionInfo { + signature: tx_arc.signature, + is_vote: tx_arc.is_vote, + transaction: tx_arc.transaction.clone(), + meta: tx_arc.meta.clone(), + index: tx_arc.index, + account_keys: tx_arc.account_keys.clone(), + pre_encoded: OnceLock::new(), + }; + + // Create version without cache (fallback path) + let tx_without_cache = MessageTransactionInfo { + signature: tx_arc.signature, + is_vote: tx_arc.is_vote, + transaction: tx_arc.transaction.clone(), + meta: tx_arc.meta.clone(), + index: tx_arc.index, + account_keys: tx_arc.account_keys.clone(), + pre_encoded: OnceLock::new(), + }; + + // Pre-encode one of them + TransactionEncoder::pre_encode(&mut tx_with_cache); + + assert!( + tx_with_cache.pre_encoded.get().is_some(), + "pre_encode should populate the field" + ); + + // Wrap both in FilteredUpdateTransaction and encode via the public Message trait + let wrapped_cached = FilteredUpdateTransaction { + transaction: Arc::new(tx_with_cache), + slot: 42, + }; + let wrapped_manual = FilteredUpdateTransaction { + transaction: Arc::new(tx_without_cache), + slot: 42, + }; + + // Encode both using prost::Message::encode_to_vec (public API) + let buf_cached = wrapped_cached.encode_to_vec(); + let buf_manual = wrapped_manual.encode_to_vec(); + + // They must be identical + assert_eq!( + buf_cached, buf_manual, + "Pre-encoded bytes differ from manual encoding for tx {:?}", + tx_arc.signature + ); + } + + println!( + "Tested {} transactions - all match!", + load_predefined_transactions().len() + ); + } + #[test] fn test_message_transaction() { for transaction in load_predefined_transactions() { diff --git a/yellowstone-grpc-geyser/src/plugin/filter/mod.rs b/yellowstone-grpc-geyser/src/plugin/filter/mod.rs index a3323f2a5..f22c3d001 100644 --- a/yellowstone-grpc-geyser/src/plugin/filter/mod.rs +++ b/yellowstone-grpc-geyser/src/plugin/filter/mod.rs @@ -1,3 +1,4 @@ +pub mod encoder; #[allow(clippy::module_inception)] mod filter; pub mod limits; diff --git a/yellowstone-grpc-geyser/src/plugin/message.rs b/yellowstone-grpc-geyser/src/plugin/message.rs index 092cb0dc0..cee44b09b 100644 --- a/yellowstone-grpc-geyser/src/plugin/message.rs +++ b/yellowstone-grpc-geyser/src/plugin/message.rs @@ -13,7 +13,7 @@ use { std::{ collections::HashSet, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, OnceLock}, time::SystemTime, }, yellowstone_grpc_proto::{ @@ -194,6 +194,7 @@ pub struct MessageAccountInfo { pub data: Bytes, pub write_version: u64, pub txn_signature: Option, + pub pre_encoded: OnceLock, } impl MessageAccountInfo { @@ -209,6 +210,7 @@ impl MessageAccountInfo { data, write_version: info.write_version, txn_signature: info.txn.map(|txn| *txn.signature()), + pre_encoded: OnceLock::new(), } } @@ -227,8 +229,13 @@ impl MessageAccountInfo { Signature::try_from(sig.as_slice()).map_err(|_| "invalid signature length") }) .transpose()?, + pre_encoded: OnceLock::new(), }) } + + pub fn get_pre_encoded(&self) -> Option<&Bytes> { + self.pre_encoded.get() + } } #[derive(Debug, Clone, PartialEq)] @@ -272,6 +279,7 @@ pub struct MessageTransactionInfo { pub meta: confirmed_block::TransactionStatusMeta, pub index: usize, pub account_keys: HashSet, + pub pre_encoded: OnceLock, } impl MessageTransactionInfo { @@ -303,6 +311,7 @@ impl MessageTransactionInfo { meta: convert_to::create_transaction_meta(info.transaction_status_meta), index: info.index, account_keys, + pre_encoded: OnceLock::new(), } } @@ -317,6 +326,7 @@ impl MessageTransactionInfo { meta: msg.meta.ok_or("meta message should be defined")?, index: msg.index as usize, account_keys: HashSet::new(), + pre_encoded: OnceLock::new(), }) } @@ -350,6 +360,11 @@ impl MessageTransactionInfo { self.account_keys = account_keys; Ok(()) } + + #[inline] + pub fn get_pre_encoded(&self) -> Option<&Bytes> { + self.pre_encoded.get() + } } #[derive(Debug, Clone, PartialEq)]