diff --git a/Cargo.lock b/Cargo.lock index f77aa67..4c5cb80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7157,7 +7157,7 @@ dependencies = [ [[package]] name = "yellowstone-jet" -version = "14.9.3" +version = "14.9.4" dependencies = [ "anyhow", "async-trait", @@ -7248,7 +7248,7 @@ dependencies = [ [[package]] name = "yellowstone-jet-tpu-client" -version = "0.3.0" +version = "0.3.1" dependencies = [ "async-trait", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index e9444be..478e32d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,7 +109,7 @@ solana-transaction-error = "3.0.0" # Yellowstone yellowstone-grpc-client = "10.2.0" yellowstone-grpc-proto = "10.1.1" -yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.3.0" } +yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.3.1" } yellowstone-shield-store = "0.9.1" [workspace.lints.clippy] diff --git a/apps/jet/Cargo.toml b/apps/jet/Cargo.toml index f42bd1c..694a1ba 100644 --- a/apps/jet/Cargo.toml +++ b/apps/jet/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-jet" -version = "14.9.3" +version = "14.9.4" description = "Yellowstone Jet" edition.workspace = true authors.workspace = true diff --git a/apps/jet/src/bin/jet.rs b/apps/jet/src/bin/jet.rs index ba7a9cd..f597eb8 100644 --- a/apps/jet/src/bin/jet.rs +++ b/apps/jet/src/bin/jet.rs @@ -430,6 +430,7 @@ async fn run_jet( config.listen_solana_like.bind[0], RpcServerType::SolanaLike { tx_handler: tx_handler.clone(), + log_invalid_txn: config.log_invalid_txn, }, ) .await; @@ -447,7 +448,10 @@ async fn run_jet( info!("starting jet-gateway listener"); let stake_info = stake_info_map.clone(); let jet_gw_identity = initial_identity.insecure_clone(); - let tx_sender = RpcServer::create_solana_like_rpc_server_impl(tx_handler); + let tx_sender = RpcServer::create_solana_like_rpc_server_impl( + tx_handler, + config.log_invalid_txn, + ); let (jet_gw_identity_updater, jet_gw_fut) = spawn_jet_gw_listener( stake_info, jet_gw_config, diff --git a/apps/jet/src/bin/lite-jet.rs b/apps/jet/src/bin/lite-jet.rs index e6b2cbd..bd98e9f 100644 --- a/apps/jet/src/bin/lite-jet.rs +++ b/apps/jet/src/bin/lite-jet.rs @@ -261,6 +261,7 @@ async fn run_jet( config.listen_solana_like.bind[0], RpcServerType::SolanaLike { tx_handler: tx_handler.clone(), + log_invalid_txn: config.log_invalid_txn, }, ) .await; @@ -280,7 +281,10 @@ async fn run_jet( info!("starting jet-gateway listener"); let stake_info = stake_info_map.clone(); let jet_gw_identity = initial_identity.insecure_clone(); - let tx_sender = RpcServer::create_solana_like_rpc_server_impl(tx_handler); + let tx_sender = RpcServer::create_solana_like_rpc_server_impl( + tx_handler, + config.log_invalid_txn, + ); let (jet_gw_identity_updater, jet_gw_fut) = spawn_jet_gw_listener( stake_info, jet_gw_config, diff --git a/apps/jet/src/config.rs b/apps/jet/src/config.rs index 423a38a..9dee26a 100644 --- a/apps/jet/src/config.rs +++ b/apps/jet/src/config.rs @@ -83,6 +83,11 @@ pub struct ConfigJet { /// Shield Program ID (Optional, default to yellowstone-shield-store default) #[serde(default, deserialize_with = "ConfigJet::deserialize_maybe_program_id")] pub program_id: Option, + + /// Whether to log invalid transactions in RPC handler + /// This is useful for debugging transaction handling errors, but may cause log spam if there are many invalid transactions. + #[serde(default)] + pub log_invalid_txn: bool, } impl ConfigJet { diff --git a/apps/jet/src/grpc_jet.rs b/apps/jet/src/grpc_jet.rs index ea62d4c..ed27bfe 100644 --- a/apps/jet/src/grpc_jet.rs +++ b/apps/jet/src/grpc_jet.rs @@ -85,9 +85,16 @@ impl GrpcTransactionHandler { transaction: SubscribeTransaction, ) -> Result<(), TransactionHandlerError> { let payload = TransactionPayload::try_from(transaction) + .inspect_err(|e| { + error!(?e, "failed to parse transaction payload"); + metrics::jet::increment_transaction_deserialize_error(e.variant_name()); + }) .map_err(|e| TransactionHandlerError::PayloadParseError(e.to_string()))?; let (transaction, config_option) = TransactionDecoder::decode(&payload) + .inspect_err(|e| { + metrics::jet::increment_transaction_decode_error(e.variant_name()); + }) .map_err(|e| TransactionHandlerError::DecodeError(e.to_string()))?; let config = config_option.unwrap_or_default(); diff --git a/apps/jet/src/metrics.rs b/apps/jet/src/metrics.rs index 3dcd808..d01c180 100644 --- a/apps/jet/src/metrics.rs +++ b/apps/jet/src/metrics.rs @@ -243,6 +243,11 @@ pub mod jet { ]) ).unwrap(); + + static ref VERSIONED_TXN_HANDLE_ERROR: IntCounterVec = IntCounterVec::new( + Opts::new("transaction_handler_error_total", "Number of errors in transaction handler by type"), + &["error"] + ).unwrap(); } pub fn init() { @@ -291,12 +296,19 @@ pub mod jet { register!(BLOCK_META_EMISSIONS_COUNT); register!(GRPC_MESSAGES_PROCESSED_RATE); register!(NEW_SLOT_ARRIVAL_INTERVAL); + register!(VERSIONED_TXN_HANDLE_ERROR); yellowstone_jet_tpu_client::prom::register_metrics(®ISTRY); grpc_lewis::prom::register_metrics(®ISTRY); }); } + pub fn incr_versioned_txn_handler_error(error_type: &str) { + VERSIONED_TXN_HANDLE_ERROR + .with_label_values(&[error_type]) + .inc(); + } + pub fn incr_send_tx_attempt(leader: Pubkey) { SEND_TRANSACTION_ATTEMPT .with_label_values(&[&leader.to_string()]) diff --git a/apps/jet/src/payload.rs b/apps/jet/src/payload.rs index ae9741c..1345dcd 100644 --- a/apps/jet/src/payload.rs +++ b/apps/jet/src/payload.rs @@ -95,6 +95,21 @@ pub enum PayloadError { ProtoConversionError(String), } +impl PayloadError { + pub const fn variant_name(&self) -> &'static str { + match self { + PayloadError::EmptyPayload => "EmptyPayload", + PayloadError::LegacyDeserialize(_) => "LegacyDeserialize", + PayloadError::Base58Decode(_) => "Base58Decode", + PayloadError::Base64Decode(_) => "Base64Decode", + PayloadError::UnsupportedEncoding => "UnsupportedEncoding", + PayloadError::BincodeError(_) => "BincodeError", + PayloadError::InvalidPubkey(_) => "InvalidPubkey", + PayloadError::ProtoConversionError(_) => "ProtoConversionError", + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct LegacyPayload { pub transaction: String, // base58/base64 encoded transaction diff --git a/apps/jet/src/rpc.rs b/apps/jet/src/rpc.rs index 26c1e3d..1ef7782 100644 --- a/apps/jet/src/rpc.rs +++ b/apps/jet/src/rpc.rs @@ -36,6 +36,7 @@ pub enum RpcServerType { SolanaLike { tx_handler: TransactionHandler, + log_invalid_txn: bool, }, } @@ -113,10 +114,14 @@ impl RpcServer { ) }) } - RpcServerType::SolanaLike { tx_handler } => { + RpcServerType::SolanaLike { + tx_handler, + log_invalid_txn, + } => { use rpc_solana_like::RpcServer; - let rpc_server_impl = Self::create_solana_like_rpc_server_impl(tx_handler); + let rpc_server_impl = + Self::create_solana_like_rpc_server_impl(tx_handler, log_invalid_txn); let server_config = ServerConfigBuilder::default() .max_request_body_size(MAX_REQUEST_BODY_SIZE) .build(); @@ -138,8 +143,12 @@ impl RpcServer { pub const fn create_solana_like_rpc_server_impl( tx_handler: TransactionHandler, + log_invalid_txn: bool, ) -> rpc_solana_like::RpcServerImpl { - rpc_solana_like::RpcServerImpl { tx_handler } + rpc_solana_like::RpcServerImpl { + tx_handler, + log_invalid_txn, + } } pub fn shutdown(self) { @@ -281,7 +290,7 @@ pub mod rpc_admin { pub mod rpc_solana_like { use { crate::{ - payload::JetRpcSendTransactionConfig, rpc::invalid_params, + metrics, payload::JetRpcSendTransactionConfig, rpc::invalid_params, solana::decode_and_deserialize, transaction_handler::TransactionHandler, }, jsonrpsee::{ @@ -309,6 +318,7 @@ pub mod rpc_solana_like { #[derive(Clone)] pub struct RpcServerImpl { + pub log_invalid_txn: bool, pub tx_handler: TransactionHandler, } @@ -320,10 +330,23 @@ pub mod rpc_solana_like { config: JetRpcSendTransactionConfig, ) -> RpcResult { debug!("handling internal versioned transaction"); - + let maybe_txn_sig = transaction.signatures.first().cloned(); self.tx_handler .handle_versioned_transaction(transaction, config) .await + .inspect_err(|e| { + let name = e.variant_name(); + if self.log_invalid_txn + && let Some(signature) = maybe_txn_sig + { + tracing::warn!( + error = %e, + signature = signature.to_string(), + category = "txn_handle_error", + ); + } + metrics::jet::incr_versioned_txn_handler_error(name); + }) .map_err(Into::into) } } diff --git a/apps/jet/src/transaction_handler.rs b/apps/jet/src/transaction_handler.rs index 16e39db..075e1f4 100644 --- a/apps/jet/src/transaction_handler.rs +++ b/apps/jet/src/transaction_handler.rs @@ -34,6 +34,18 @@ pub enum TransactionHandlerError { UnsupportedEncoding, } +impl TransactionHandlerError { + pub const fn variant_name(&self) -> &'static str { + match self { + TransactionHandlerError::InvalidTransaction(_) => "InvalidTransaction", + TransactionHandlerError::SerializationFailed(_) => "SerializationFailed", + TransactionHandlerError::PreflightNotSupported => "PreflightNotSupported", + TransactionHandlerError::InvalidParams(_) => "InvalidParams", + TransactionHandlerError::UnsupportedEncoding => "UnsupportedEncoding", + } + } +} + impl From for TransactionHandlerError { fn from(err: ErrorObjectOwned) -> Self { TransactionHandlerError::InvalidParams(err.message().to_string()) @@ -82,16 +94,13 @@ impl TransactionHandler { .map_err(|e| TransactionHandlerError::InvalidTransaction(e.to_string()))?; let signature = transaction.signatures[0]; - let mut wire_transaction = bincode::serialize(&transaction)?; + let wire_transaction = bincode::serialize(&transaction)?; if wire_transaction.len() > PACKET_DATA_SIZE { - wire_transaction.shrink_to_fit(); - if wire_transaction.len() > PACKET_DATA_SIZE { - return Err(TransactionHandlerError::InvalidTransaction(format!( - "transaction size {} exceeds maximum allowed size of {} bytes", - wire_transaction.len(), - PACKET_DATA_SIZE - ))); - } + return Err(TransactionHandlerError::InvalidTransaction(format!( + "transaction size {} exceeds maximum allowed size of {} bytes", + wire_transaction.len(), + PACKET_DATA_SIZE + ))); } self.transaction_sink diff --git a/crates/tpu-client/Cargo.toml b/crates/tpu-client/Cargo.toml index d3be03e..79ec373 100644 --- a/crates/tpu-client/Cargo.toml +++ b/crates/tpu-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-jet-tpu-client" -version = "0.3.0" +version = "0.3.1" edition.workspace = true authors.workspace = true license.workspace = true diff --git a/crates/tpu-client/src/config.rs b/crates/tpu-client/src/config.rs index 388d2f0..ee56561 100644 --- a/crates/tpu-client/src/config.rs +++ b/crates/tpu-client/src/config.rs @@ -226,6 +226,7 @@ impl TpuSenderConfig { /// This function enables sending transactions of arbitrary size, which may lead to unexpected behavior or security vulnerabilities. /// It should only be used in controlled testing environments. /// + #[allow(unreachable_code)] pub unsafe fn allow_arbitrary_txn_size(&mut self) { #[cfg(not(feature = "intg-testing"))] { diff --git a/crates/tpu-client/src/core.rs b/crates/tpu-client/src/core.rs index a4ba013..a30e42e 100644 --- a/crates/tpu-client/src/core.rs +++ b/crates/tpu-client/src/core.rs @@ -362,6 +362,12 @@ impl OrphanConnectionSet { self.curr_len } + fn clear(&mut self) { + self.prio_queue.clear(); + self.prio_queue_rev_index.clear(); + self.curr_len = 0; + } + fn insert(&mut self, info: OrphanConnectionInfo, now: Instant) { // Check if not already present if let Some(version_set) = self.prio_queue_rev_index.get_mut(&info.remote_peer_addr) { @@ -440,6 +446,7 @@ struct OrphanConnectionInfo { struct ConnectionEvictionSet { set: HashSet, socket_addr_set: HashSet, + socket_addr_refcount: HashMap, } impl ConnectionEvictionSet { @@ -454,12 +461,17 @@ impl ConnectionEvictionSet { fn clear(&mut self) { self.set.clear(); self.socket_addr_set.clear(); + self.socket_addr_refcount.clear(); } fn insert(&mut self, eviction: ConnectionEviction) -> bool { let inserted = self.set.insert(eviction.clone()); if inserted { self.socket_addr_set.insert(eviction.remote_peer_addr); + self.socket_addr_refcount + .entry(eviction.remote_peer_addr) + .and_modify(|count| *count += 1) + .or_insert(1); } inserted } @@ -467,7 +479,15 @@ impl ConnectionEvictionSet { fn remove(&mut self, eviction: &ConnectionEviction) -> bool { let removed = self.set.remove(eviction); if removed { - self.socket_addr_set.remove(&eviction.remote_peer_addr); + let entry = self + .socket_addr_refcount + .get_mut(&eviction.remote_peer_addr) + .expect("missing socket addr refcount for existing eviction"); + *entry = entry.saturating_sub(1); + if *entry == 0 { + self.socket_addr_refcount.remove(&eviction.remote_peer_addr); + self.socket_addr_set.remove(&eviction.remote_peer_addr); + } } removed } @@ -494,6 +514,7 @@ pub struct ConnectionEviction { } pub struct ActiveConnection { + remote_peer_addr: SocketAddr, conn: Arc, connection_version: u64, multiplexed_remote_peer_identity_with_stake: HashMap, @@ -835,6 +856,7 @@ impl ConnectingTask { /// benchmarks conducted by the Anza team indicate that this approach degrades performance rather than improving it. struct QuicTxSenderWorker { remote_peer: Pubkey, + remote_peer_addr: SocketAddr, connection: Arc, /// The current client identity being used for the connection current_client_identity: Pubkey, @@ -887,7 +909,7 @@ where attempt: usize, ) -> Option { let result = self.send_tx(tx.wire.as_ref()).await; - let remote_addr = self.connection.remote_address(); + let remote_addr = self.remote_peer_addr; let tx_sig = tx.tx_sig; match result { Ok(sent_ok) => { @@ -937,7 +959,7 @@ where ); let resp = TxFailed { remote_peer_identity: self.remote_peer, - remote_peer_addr: self.connection.remote_address(), + remote_peer_addr: self.remote_peer_addr, failure_reason: e.to_string(), tx_sig, }; @@ -1887,7 +1909,7 @@ where continue; } let connection_eviction = ConnectionEviction { - remote_peer_addr: active_conn.conn.remote_address(), + remote_peer_addr: active_conn.remote_peer_addr, connection_version: active_conn.connection_version, }; self.pending_connection_eviction_set @@ -1949,6 +1971,7 @@ where let worker = QuicTxSenderWorker { remote_peer: remote_peer_identity, + remote_peer_addr, connection, current_client_identity: self.identity.pubkey(), incoming_rx: rx, @@ -2073,6 +2096,7 @@ where } let conn = Arc::new(conn); let active_connection = ActiveConnection { + remote_peer_addr: remote_peer_address, conn: Arc::clone(&conn), connection_version: self.next_connection_version(), multiplexed_remote_peer_identity_with_stake: Default::default(), @@ -2612,6 +2636,7 @@ where async fn update_identity(&mut self, new_identity: Keypair, barrier_like: impl Future) { self.schedule_graceful_drop_all_worker(); self.connection_map.clear(); + self.orphan_connection_set.clear(); self.being_evicted_peers.clear(); self.pending_connection_eviction_set.clear(); self.connecting_tasks.abort_all(); @@ -2850,29 +2875,24 @@ where remote_peer_addr, connection_version, } = unused_conn_info; - // If for some reason, the connection is still active and has multiplexed peers, we skip eviction. - let is_false_positive = - self.connection_map - .get(&remote_peer_addr) - .is_some_and(|active_conn| { - active_conn.connection_version == connection_version - && !active_conn - .multiplexed_remote_peer_identity_with_stake - .is_empty() - }); - - if is_false_positive { - continue; - } - - let Some(active_conn) = self.connection_map.remove(&remote_peer_addr) else { + let Some(active_conn) = self.connection_map.get(&remote_peer_addr) else { continue; }; - if active_conn.connection_version != connection_version { // Connection has been re-established since it was marked as unused. continue; } + if !active_conn + .multiplexed_remote_peer_identity_with_stake + .is_empty() + { + // If for some reason, the connection is still active and has multiplexed peers, + // this orphan entry is stale. + continue; + } + let Some(active_conn) = self.connection_map.remove(&remote_peer_addr) else { + continue; + }; assert!( active_conn @@ -3651,6 +3671,33 @@ mod orphan_connect_set_test { } } +#[cfg(test)] +mod connection_eviction_set_test { + use {super::ConnectionEvictionSet, crate::core::ConnectionEviction}; + + #[test] + fn remove_keeps_socket_addr_index_if_another_version_exists() { + let mut set = ConnectionEvictionSet::default(); + let addr = "127.0.0.1:9999".parse().unwrap(); + let ev1 = ConnectionEviction { + remote_peer_addr: addr, + connection_version: 1, + }; + let ev2 = ConnectionEviction { + remote_peer_addr: addr, + connection_version: 2, + }; + + assert!(set.insert(ev1.clone())); + assert!(set.insert(ev2.clone())); + assert!(set.contains_socket_addr(&addr)); + + assert!(set.remove(&ev1)); + assert!(set.contains_socket_addr(&addr)); + assert!(set.contains(&ev2)); + } +} + #[cfg(test)] mod stake_based_eviction_strategy_test { use {