Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The minor version will be incremented upon a breaking change and the patch versi
- yellowstone-grpc-proto-12.1.0

### Fixes
- geyser: replace Arc::get_mut with OnceLock for pre-encoding to support shared ownership, added metrics for pre-encode hit/miss ([#683](https://github.com/rpcpool/yellowstone-grpc/pull/683))

- geyser: replace `OnDrop` with `ClientSession` RAII guard in `client_loop`, fixing missed cleanup on early-exit paths ([#687](https://github.com/rpcpool/yellowstone-grpc/pull/687)), ([#690](https://github.com/rpcpool/yellowstone-grpc/pull/690))
- geyser: fix ping and traffic metric ([#698](https://github.com/rpcpool/yellowstone-grpc/pull/698))
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ prost = "0.14.0"
prost-types = "0.14.0"
prost_011 = { package = "prost", version = "0.11.9" }
protoc-bin-vendored = "3.2.0"
rayon = "1.11.0"
serde = "1.0.145"
serde_json = "1.0.86"
smallvec = "1.15.1"
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pin-project = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ pub struct ConfigGrpc {
deserialize_with = "deserialize_int_str"
)]
pub replay_stored_slots: u64,
/// Number of threads for parallel encoding
#[serde(
default = "ConfigGrpc::encoder_threads_default",
deserialize_with = "deserialize_int_str"
)]
pub encoder_threads: usize,
#[serde(default)]
pub server_http2_adaptive_window: Option<bool>,
#[serde(default, with = "humantime_serde")]
Expand Down Expand Up @@ -272,6 +278,10 @@ impl ConfigGrpc {
const fn default_replay_stored_slots() -> u64 {
0
}

const fn encoder_threads_default() -> usize {
4
}
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
18 changes: 14 additions & 4 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use {
metered::MeteredLayer,
metrics::{
self, incr_grpc_method_call_count, set_subscriber_queue_size,
subscription_limit_exceeded_inc, DebugClientMessage,
subscription_limit_exceeded_inc, DebugClientMessage, GEYSER_BATCH_SIZE,
},
parallel::ParallelEncoder,
plugin::{
filter::{
limits::FilterLimits,
Expand Down Expand Up @@ -516,6 +517,7 @@ impl GrpcService {
is_reload: bool,
service_cancellation_token: CancellationToken,
task_tracker: TaskTracker,
parallel_encoder: ParallelEncoder,
) -> anyhow::Result<(
Option<crossbeam_channel::Sender<Box<Message>>>,
mpsc::UnboundedSender<Message>,
Expand Down Expand Up @@ -637,6 +639,7 @@ impl GrpcService {
replay_stored_slots_rx,
replay_first_available_slot,
config.replay_stored_slots,
parallel_encoder,
)
.await;
});
Expand Down Expand Up @@ -679,6 +682,7 @@ impl GrpcService {
replay_stored_slots_rx: Option<mpsc::Receiver<ReplayStoredSlotsRequest>>,
replay_first_available_slot: Option<Arc<AtomicU64>>,
replay_stored_slots: u64,
parallel_encoder: ParallelEncoder,
) {
const PROCESSED_MESSAGES_MAX: usize = 31;
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
Expand Down Expand Up @@ -923,8 +927,10 @@ impl GrpcService {

// processed
processed_messages.push(message.clone());
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
let encoded = parallel_encoder.encode(processed_messages).await;
let _ =
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
Expand Down Expand Up @@ -962,8 +968,10 @@ impl GrpcService {
|| !confirmed_messages.is_empty()
|| !finalized_messages.is_empty()
{
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
let encoded = parallel_encoder.encode(processed_messages).await;
let _ = broadcast_tx
.send((CommitmentLevel::Processed, processed_messages.into()));
.send((CommitmentLevel::Processed, encoded.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
Expand All @@ -984,7 +992,9 @@ impl GrpcService {
}
() = &mut processed_sleep => {
if !processed_messages.is_empty() {
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
GEYSER_BATCH_SIZE.observe(processed_messages.len() as f64);
let encoded = parallel_encoder.encode(processed_messages).await;
let _ = broadcast_tx.send((CommitmentLevel::Processed, encoded.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
}
processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod config;
pub mod grpc;
pub mod metered;
pub mod metrics;
pub mod parallel;
pub mod plugin;
pub mod transport;
pub(crate) mod util;
Expand Down
157 changes: 97 additions & 60 deletions yellowstone-grpc-geyser/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ lazy_static::lazy_static! {
&["subscriber_id"]
).unwrap();

static ref GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_subscriber_send_bandwidth_load",
"Current Send load we send to subscriber channel (in bytes per second)"
),
&["subscriber_id"]
).unwrap();

static ref GRPC_SUBSCRIBER_QUEUE_SIZE: IntGaugeVec = IntGaugeVec::new(
Opts::new(
Expand All @@ -105,14 +112,6 @@ lazy_static::lazy_static! {
&["subscriber_id", "reason"]
).unwrap();

static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_concurrent_subscribe_per_tcp_connection",
"Current concurrent subscriptions per remote TCP peer socket address"
),
&["remote_peer_sk_addr"]
).unwrap();

static ref GEYSER_ACCOUNT_UPDATE_RECEIVED: Histogram = Histogram::with_opts(
HistogramOpts::new(
"geyser_account_update_data_size_kib",
Expand All @@ -121,25 +120,44 @@ lazy_static::lazy_static! {
.buckets(vec![5.0, 10.0, 20.0, 30.0, 50.0, 100.0, 200.0, 300.0, 500.0, 1000.0, 2000.0, 3000.0, 5000.0, 10000.0])
).unwrap();

pub static ref GEYSER_BATCH_SIZE: Histogram = Histogram::with_opts(
HistogramOpts::new(
"yellowstone_geyser_batch_size",
"Size of processed message batches"
)
.buckets(vec![1.0, 4.0, 8.0, 16.0, 24.0, 31.0])
).unwrap();

static ref PRE_ENCODED_CACHE_HIT: IntCounterVec = IntCounterVec::new(
Opts::new("yellowstone_grpc_pre_encoded_cache_hit", "Pre-encoded cache hits by message type"),
&["type"]
).unwrap();

static ref PRE_ENCODED_CACHE_MISS: IntCounterVec = IntCounterVec::new(
Opts::new("yellowstone_grpc_pre_encoded_cache_miss", "Pre-encoded cache misses by message type"),
&["type"]
).unwrap();

static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_concurrent_subscribe_per_tcp_connection",
"Current concurrent subscriptions per remote TCP peer socket address"
),
&["remote_peer_sk_addr"]
).unwrap();

static ref TOTAL_TRAFFIC_SENT: IntCounter = IntCounter::new(
"total_traffic_sent_bytes",
"Total traffic sent to subscriber by type (account_update, block_meta, etc)"
).unwrap();

static ref TRAFFIC_SENT_PER_REMOTE_IP: IntCounterVec = IntCounterVec::new(
Opts::new(
"traffic_sent_per_remote_ip_bytes",
"Total traffic sent to subscriber by remote IP"
),
Opts::new("traffic_sent_per_remote_ip_bytes", "Total traffic sent to subscriber by remote IP"),
&["remote_ip"]
).unwrap();


static ref GRPC_METHOD_CALL_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new(
"yellowstone_grpc_method_call_count",
"Total number of calls to GetVersion gRPC method"
),
Opts::new("yellowstone_grpc_method_call_count", "Total number of calls to GetVersion gRPC method"),
&["method"]
).unwrap();

Expand All @@ -152,50 +170,9 @@ lazy_static::lazy_static! {
).unwrap();

static ref GRPC_SERVICE_OUTBOUND_BYTES: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"yellowstone_grpc_service_outbound_bytes",
"Current emitted bytes by tonic service response bodies per active subscriber stream"
),
Opts::new("yellowstone_grpc_service_outbound_bytes", "Current emitted bytes by tonic service response bodies per active subscriber stream"),
&["subscriber_id"]
).unwrap();

}

pub fn incr_grpc_method_call_count<S: AsRef<str>>(method: S) {
GRPC_METHOD_CALL_COUNT
.with_label_values(&[method.as_ref()])
.inc();
}

pub fn add_grpc_service_outbound_bytes<S: AsRef<str>>(subscriber_id: S, bytes: u64) {
GRPC_SERVICE_OUTBOUND_BYTES
.with_label_values(&[subscriber_id.as_ref()])
.add(bytes as i64);
}

pub fn reset_grpc_service_outbound_bytes<S: AsRef<str>>(subscriber_id: S) {
GRPC_SERVICE_OUTBOUND_BYTES
.with_label_values(&[subscriber_id.as_ref()])
.set(0);
}

pub fn add_traffic_sent_per_remote_ip<S: AsRef<str>>(remote_ip: S, bytes: u64) {
TRAFFIC_SENT_PER_REMOTE_IP
.with_label_values(&[remote_ip.as_ref()])
.inc_by(bytes);
}

pub fn reset_traffic_sent_per_remote_ip<S: AsRef<str>>(remote_ip: S) {
TRAFFIC_SENT_PER_REMOTE_IP
.with_label_values(&[remote_ip.as_ref()])
.reset();
TRAFFIC_SENT_PER_REMOTE_IP
.remove_label_values(&[remote_ip.as_ref()])
.expect("remove_label_values")
}

pub fn add_total_traffic_sent(bytes: u64) {
TOTAL_TRAFFIC_SENT.inc_by(bytes);
}

#[derive(Debug)]
Expand Down Expand Up @@ -344,8 +321,12 @@ impl PrometheusService {
register!(GRPC_MESSAGE_SENT);
register!(GRPC_BYTES_SENT);
register!(GEYSER_ACCOUNT_UPDATE_RECEIVED);
register!(GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD);
register!(GRPC_SUBSCRIBER_QUEUE_SIZE);
register!(GEYSER_BATCH_SIZE);
register!(GRPC_CLIENT_DISCONNECTS);
register!(PRE_ENCODED_CACHE_HIT);
register!(PRE_ENCODED_CACHE_MISS);
register!(GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION);
register!(TOTAL_TRAFFIC_SENT);
register!(TRAFFIC_SENT_PER_REMOTE_IP);
Expand Down Expand Up @@ -552,6 +533,12 @@ pub fn observe_geyser_account_update_received(data_bytesize: usize) {
GEYSER_ACCOUNT_UPDATE_RECEIVED.observe(data_bytesize as f64 / 1024.0);
}

pub fn set_subscriber_send_bandwidth_load<S: AsRef<str>>(subscriber_id: S, load: i64) {
GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD
.with_label_values(&[subscriber_id.as_ref()])
.set(load);
}

pub fn set_subscriber_queue_size<S: AsRef<str>>(subscriber_id: S, size: u64) {
GRPC_SUBSCRIBER_QUEUE_SIZE
.with_label_values(&[subscriber_id.as_ref()])
Expand All @@ -564,6 +551,51 @@ pub fn incr_client_disconnect<S: AsRef<str>>(subscriber_id: S, reason: &str) {
.inc();
}

pub fn pre_encoded_cache_hit(msg_type: &str) {
PRE_ENCODED_CACHE_HIT.with_label_values(&[msg_type]).inc();
}

pub fn pre_encoded_cache_miss(msg_type: &str) {
PRE_ENCODED_CACHE_MISS.with_label_values(&[msg_type]).inc();
}

pub fn incr_grpc_method_call_count<S: AsRef<str>>(method: S) {
GRPC_METHOD_CALL_COUNT
.with_label_values(&[method.as_ref()])
.inc();
}

pub fn add_grpc_service_outbound_bytes<S: AsRef<str>>(subscriber_id: S, bytes: u64) {
GRPC_SERVICE_OUTBOUND_BYTES
.with_label_values(&[subscriber_id.as_ref()])
.add(bytes as i64);
}

pub fn reset_grpc_service_outbound_bytes<S: AsRef<str>>(subscriber_id: S) {
GRPC_SERVICE_OUTBOUND_BYTES
.with_label_values(&[subscriber_id.as_ref()])
.set(0);
}

pub fn add_total_traffic_sent(bytes: u64) {
TOTAL_TRAFFIC_SENT.inc_by(bytes);
}

pub fn add_traffic_sent_per_remote_ip<S: AsRef<str>>(remote_ip: S, bytes: u64) {
TRAFFIC_SENT_PER_REMOTE_IP
.with_label_values(&[remote_ip.as_ref()])
.inc_by(bytes);
}

pub fn reset_traffic_sent_per_remote_ip<S: AsRef<str>>(remote_ip: S) {
TRAFFIC_SENT_PER_REMOTE_IP
.with_label_values(&[remote_ip.as_ref()])
.reset();
TRAFFIC_SENT_PER_REMOTE_IP
.remove_label_values(&[remote_ip.as_ref()])
.expect("remove_label_values");
}

pub fn set_grpc_concurrent_subscribe_per_tcp_connection<S: AsRef<str>>(
remote_peer_sk_addr: S,
size: u64,
Expand All @@ -590,20 +622,25 @@ pub fn reset_metrics() {
SLOT_STATUS.reset();
SLOT_STATUS_PLUGIN.reset();
INVALID_FULL_BLOCKS.reset();
GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD.reset();
GRPC_SUBSCRIBER_QUEUE_SIZE.reset();
GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION.reset();

// Reset counter vectors (clears all label combinations)
MISSED_STATUS_MESSAGE.reset();
GRPC_MESSAGE_SENT.reset();
GRPC_BYTES_SENT.reset();
GRPC_CLIENT_DISCONNECTS.reset();
GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION.reset();
TOTAL_TRAFFIC_SENT.reset();
TRAFFIC_SENT_PER_REMOTE_IP.reset();
GRPC_SERVICE_OUTBOUND_BYTES.reset();
GRPC_SUBSCRIPTION_LIMIT_EXCEEDED.reset();
GRPC_METHOD_CALL_COUNT.reset();

// Pre-encoding
PRE_ENCODED_CACHE_HIT.reset();
PRE_ENCODED_CACHE_MISS.reset();

// Note: VERSION and GEYSER_ACCOUNT_UPDATE_RECEIVED are intentionally not reset
// - VERSION contains build info set once on startup
// - GEYSER_ACCOUNT_UPDATE_RECEIVED is a Histogram which doesn't support reset()
Expand Down
Loading