1717 } ,
1818 log:: { debug, error, info} ,
1919 prometheus:: {
20- Histogram , HistogramOpts , IntCounterVec , IntGauge , IntGaugeVec , Opts , Registry , TextEncoder ,
20+ Histogram , HistogramOpts , IntCounter , IntCounterVec , IntGauge , IntGaugeVec , Opts , Registry ,
21+ TextEncoder ,
2122 } ,
2223 solana_clock:: Slot ,
2324 std:: {
@@ -188,6 +189,34 @@ static ref FALLBACK_ENCODE_TIME_US: Histogram = Histogram::with_opts(
188189 )
189190 . buckets( vec![ 1.0 , 5.0 , 10.0 , 25.0 , 50.0 , 100.0 , 250.0 , 500.0 ] )
190191) . unwrap( ) ;
192+
193+ static ref GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION : IntGaugeVec = IntGaugeVec :: new(
194+ Opts :: new(
195+ "grpc_concurrent_subscribe_per_tcp_connection" ,
196+ "Current concurrent subscriptions per remote TCP peer socket address"
197+ ) ,
198+ & [ "remote_peer_sk_addr" ]
199+ ) . unwrap( ) ;
200+
201+ static ref TOTAL_TRAFFIC_SENT : IntCounter = IntCounter :: new(
202+ "total_traffic_sent_bytes" ,
203+ "Total traffic sent to subscriber by type (account_update, block_meta, etc)"
204+ ) . unwrap( ) ;
205+
206+ static ref TRAFFIC_SENT_PER_REMOTE_IP : IntCounterVec = IntCounterVec :: new(
207+ Opts :: new( "traffic_sent_per_remote_ip_bytes" , "Total traffic sent to subscriber by remote IP" ) ,
208+ & [ "remote_ip" ]
209+ ) . unwrap( ) ;
210+
211+ static ref GRPC_METHOD_CALL_COUNT : IntCounterVec = IntCounterVec :: new(
212+ Opts :: new( "yellowstone_grpc_method_call_count" , "Total number of calls to GetVersion gRPC method" ) ,
213+ & [ "method" ]
214+ ) . unwrap( ) ;
215+
216+ static ref GRPC_SERVICE_OUTBOUND_BYTES : IntGaugeVec = IntGaugeVec :: new(
217+ Opts :: new( "yellowstone_grpc_service_outbound_bytes" , "Current emitted bytes by tonic service response bodies per active subscriber stream" ) ,
218+ & [ "subscriber_id" ]
219+ ) . unwrap( ) ;
191220}
192221
193222#[ derive( Debug ) ]
@@ -350,6 +379,11 @@ impl PrometheusService {
350379 register ! ( PARALLEL_ENCODER_QUEUE_DEPTH ) ;
351380 register ! ( FAST_PATH_TIME_US ) ;
352381 register ! ( FALLBACK_ENCODE_TIME_US ) ;
382+ register ! ( GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION ) ;
383+ register ! ( TOTAL_TRAFFIC_SENT ) ;
384+ register ! ( TRAFFIC_SENT_PER_REMOTE_IP ) ;
385+ register ! ( GRPC_METHOD_CALL_COUNT ) ;
386+ register ! ( GRPC_SERVICE_OUTBOUND_BYTES ) ;
353387
354388 VERSION
355389 . with_label_values ( & [
@@ -612,6 +646,58 @@ pub fn observe_fallback_encode_time_us(latency: f64) {
612646 FALLBACK_ENCODE_TIME_US . observe ( latency) ;
613647}
614648
649+ pub fn incr_grpc_method_call_count < S : AsRef < str > > ( method : S ) {
650+ GRPC_METHOD_CALL_COUNT
651+ . with_label_values ( & [ method. as_ref ( ) ] )
652+ . inc ( ) ;
653+ }
654+
655+ pub fn add_grpc_service_outbound_bytes < S : AsRef < str > > ( subscriber_id : S , bytes : u64 ) {
656+ GRPC_SERVICE_OUTBOUND_BYTES
657+ . with_label_values ( & [ subscriber_id. as_ref ( ) ] )
658+ . add ( bytes as i64 ) ;
659+ }
660+
661+ pub fn reset_grpc_service_outbound_bytes < S : AsRef < str > > ( subscriber_id : S ) {
662+ GRPC_SERVICE_OUTBOUND_BYTES
663+ . with_label_values ( & [ subscriber_id. as_ref ( ) ] )
664+ . set ( 0 ) ;
665+ }
666+
667+ pub fn add_total_traffic_sent ( bytes : u64 ) {
668+ TOTAL_TRAFFIC_SENT . inc_by ( bytes) ;
669+ }
670+
671+ pub fn add_traffic_sent_per_remote_ip < S : AsRef < str > > ( remote_ip : S , bytes : u64 ) {
672+ TRAFFIC_SENT_PER_REMOTE_IP
673+ . with_label_values ( & [ remote_ip. as_ref ( ) ] )
674+ . inc_by ( bytes) ;
675+ }
676+
677+ pub fn reset_traffic_sent_per_remote_ip < S : AsRef < str > > ( remote_ip : S ) {
678+ TRAFFIC_SENT_PER_REMOTE_IP
679+ . with_label_values ( & [ remote_ip. as_ref ( ) ] )
680+ . reset ( ) ;
681+ TRAFFIC_SENT_PER_REMOTE_IP
682+ . remove_label_values ( & [ remote_ip. as_ref ( ) ] )
683+ . expect ( "remove_label_values" ) ;
684+ }
685+
686+ pub fn set_grpc_concurrent_subscribe_per_tcp_connection < S : AsRef < str > > (
687+ remote_peer_sk_addr : S ,
688+ size : u64 ,
689+ ) {
690+ GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION
691+ . with_label_values ( & [ remote_peer_sk_addr. as_ref ( ) ] )
692+ . set ( size as i64 ) ;
693+ }
694+
695+ pub fn remove_grpc_concurrent_subscribe_per_tcp_connection < S : AsRef < str > > ( remote_peer_sk_addr : S ) {
696+ GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION
697+ . remove_label_values ( & [ remote_peer_sk_addr. as_ref ( ) ] )
698+ . expect ( "remove_label_values" ) ;
699+ }
700+
615701/// Reset all metrics on plugin unload to prevent metric accumulation across plugin lifecycle
616702pub fn reset_metrics ( ) {
617703 // Reset gauge metrics to 0
@@ -632,6 +718,11 @@ pub fn reset_metrics() {
632718 GRPC_MESSAGE_SENT . reset ( ) ;
633719 GRPC_BYTES_SENT . reset ( ) ;
634720 GRPC_CLIENT_DISCONNECTS . reset ( ) ;
721+ GRPC_CONCURRENT_SUBSCRIBE_PER_TCP_CONNECTION . reset ( ) ;
722+ TOTAL_TRAFFIC_SENT . reset ( ) ;
723+ TRAFFIC_SENT_PER_REMOTE_IP . reset ( ) ;
724+ GRPC_SERVICE_OUTBOUND_BYTES . reset ( ) ;
725+ GRPC_METHOD_CALL_COUNT . reset ( ) ;
635726
636727 // Note: VERSION and GEYSER_ACCOUNT_UPDATE_RECEIVED are intentionally not reset
637728 // - VERSION contains build info set once on startup
0 commit comments