Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ solana-transaction-error = "3.0.0"
# Yellowstone
yellowstone-grpc-client = "10.2.0"
yellowstone-grpc-proto = "10.1.1"
yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.3.0" }
yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.3.1" }
yellowstone-shield-store = "0.9.1"

[workspace.lints.clippy]
Expand Down
2 changes: 1 addition & 1 deletion apps/jet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-jet"
version = "14.9.3"
version = "14.9.4"
description = "Yellowstone Jet"
edition.workspace = true
authors.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion apps/jet/src/bin/jet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ async fn run_jet(
config.listen_solana_like.bind[0],
RpcServerType::SolanaLike {
tx_handler: tx_handler.clone(),
log_invalid_txn: config.log_invalid_txn,
},
)
.await;
Expand All @@ -447,7 +448,10 @@ async fn run_jet(
info!("starting jet-gateway listener");
let stake_info = stake_info_map.clone();
let jet_gw_identity = initial_identity.insecure_clone();
let tx_sender = RpcServer::create_solana_like_rpc_server_impl(tx_handler);
let tx_sender = RpcServer::create_solana_like_rpc_server_impl(
tx_handler,
config.log_invalid_txn,
);
let (jet_gw_identity_updater, jet_gw_fut) = spawn_jet_gw_listener(
stake_info,
jet_gw_config,
Expand Down
6 changes: 5 additions & 1 deletion apps/jet/src/bin/lite-jet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn run_jet(
config.listen_solana_like.bind[0],
RpcServerType::SolanaLike {
tx_handler: tx_handler.clone(),
log_invalid_txn: config.log_invalid_txn,
},
)
.await;
Expand All @@ -280,7 +281,10 @@ async fn run_jet(
info!("starting jet-gateway listener");
let stake_info = stake_info_map.clone();
let jet_gw_identity = initial_identity.insecure_clone();
let tx_sender = RpcServer::create_solana_like_rpc_server_impl(tx_handler);
let tx_sender = RpcServer::create_solana_like_rpc_server_impl(
tx_handler,
config.log_invalid_txn,
);
let (jet_gw_identity_updater, jet_gw_fut) = spawn_jet_gw_listener(
stake_info,
jet_gw_config,
Expand Down
5 changes: 5 additions & 0 deletions apps/jet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ pub struct ConfigJet {
/// Shield Program ID (Optional, default to yellowstone-shield-store default)
#[serde(default, deserialize_with = "ConfigJet::deserialize_maybe_program_id")]
pub program_id: Option<Pubkey>,

/// Whether to log invalid transactions in RPC handler
/// This is useful for debugging transaction handling errors, but may cause log spam if there are many invalid transactions.
#[serde(default)]
pub log_invalid_txn: bool,
}

impl ConfigJet {
Expand Down
7 changes: 7 additions & 0 deletions apps/jet/src/grpc_jet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,16 @@ impl GrpcTransactionHandler {
transaction: SubscribeTransaction,
) -> Result<(), TransactionHandlerError> {
let payload = TransactionPayload::try_from(transaction)
.inspect_err(|e| {
error!(?e, "failed to parse transaction payload");
metrics::jet::increment_transaction_deserialize_error(e.variant_name());
})
.map_err(|e| TransactionHandlerError::PayloadParseError(e.to_string()))?;

let (transaction, config_option) = TransactionDecoder::decode(&payload)
.inspect_err(|e| {
metrics::jet::increment_transaction_decode_error(e.variant_name());
})
.map_err(|e| TransactionHandlerError::DecodeError(e.to_string()))?;

let config = config_option.unwrap_or_default();
Expand Down
12 changes: 12 additions & 0 deletions apps/jet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ pub mod jet {
])
).unwrap();


static ref VERSIONED_TXN_HANDLE_ERROR: IntCounterVec = IntCounterVec::new(
Opts::new("transaction_handler_error_total", "Number of errors in transaction handler by type"),
&["error"]
).unwrap();
}

pub fn init() {
Expand Down Expand Up @@ -291,12 +296,19 @@ pub mod jet {
register!(BLOCK_META_EMISSIONS_COUNT);
register!(GRPC_MESSAGES_PROCESSED_RATE);
register!(NEW_SLOT_ARRIVAL_INTERVAL);
register!(VERSIONED_TXN_HANDLE_ERROR);

yellowstone_jet_tpu_client::prom::register_metrics(&REGISTRY);
grpc_lewis::prom::register_metrics(&REGISTRY);
});
}

pub fn incr_versioned_txn_handler_error(error_type: &str) {
VERSIONED_TXN_HANDLE_ERROR
.with_label_values(&[error_type])
.inc();
}

pub fn incr_send_tx_attempt(leader: Pubkey) {
SEND_TRANSACTION_ATTEMPT
.with_label_values(&[&leader.to_string()])
Expand Down
15 changes: 15 additions & 0 deletions apps/jet/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ pub enum PayloadError {
ProtoConversionError(String),
}

impl PayloadError {
pub const fn variant_name(&self) -> &'static str {
match self {
PayloadError::EmptyPayload => "EmptyPayload",
PayloadError::LegacyDeserialize(_) => "LegacyDeserialize",
PayloadError::Base58Decode(_) => "Base58Decode",
PayloadError::Base64Decode(_) => "Base64Decode",
PayloadError::UnsupportedEncoding => "UnsupportedEncoding",
PayloadError::BincodeError(_) => "BincodeError",
PayloadError::InvalidPubkey(_) => "InvalidPubkey",
PayloadError::ProtoConversionError(_) => "ProtoConversionError",
}
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LegacyPayload {
pub transaction: String, // base58/base64 encoded transaction
Expand Down
33 changes: 28 additions & 5 deletions apps/jet/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum RpcServerType {

SolanaLike {
tx_handler: TransactionHandler,
log_invalid_txn: bool,
},
}

Expand Down Expand Up @@ -113,10 +114,14 @@ impl RpcServer {
)
})
}
RpcServerType::SolanaLike { tx_handler } => {
RpcServerType::SolanaLike {
tx_handler,
log_invalid_txn,
} => {
use rpc_solana_like::RpcServer;

let rpc_server_impl = Self::create_solana_like_rpc_server_impl(tx_handler);
let rpc_server_impl =
Self::create_solana_like_rpc_server_impl(tx_handler, log_invalid_txn);
let server_config = ServerConfigBuilder::default()
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
.build();
Expand All @@ -138,8 +143,12 @@ impl RpcServer {

pub const fn create_solana_like_rpc_server_impl(
tx_handler: TransactionHandler,
log_invalid_txn: bool,
) -> rpc_solana_like::RpcServerImpl {
rpc_solana_like::RpcServerImpl { tx_handler }
rpc_solana_like::RpcServerImpl {
tx_handler,
log_invalid_txn,
}
}

pub fn shutdown(self) {
Expand Down Expand Up @@ -281,7 +290,7 @@ pub mod rpc_admin {
pub mod rpc_solana_like {
use {
crate::{
payload::JetRpcSendTransactionConfig, rpc::invalid_params,
metrics, payload::JetRpcSendTransactionConfig, rpc::invalid_params,
solana::decode_and_deserialize, transaction_handler::TransactionHandler,
},
jsonrpsee::{
Expand Down Expand Up @@ -309,6 +318,7 @@ pub mod rpc_solana_like {

#[derive(Clone)]
pub struct RpcServerImpl {
pub log_invalid_txn: bool,
pub tx_handler: TransactionHandler,
}

Expand All @@ -320,10 +330,23 @@ pub mod rpc_solana_like {
config: JetRpcSendTransactionConfig,
) -> RpcResult<String /* Signature */> {
debug!("handling internal versioned transaction");

let maybe_txn_sig = transaction.signatures.first().cloned();
self.tx_handler
.handle_versioned_transaction(transaction, config)
.await
.inspect_err(|e| {
let name = e.variant_name();
if self.log_invalid_txn
&& let Some(signature) = maybe_txn_sig
{
tracing::warn!(
error = %e,
signature = signature.to_string(),
category = "txn_handle_error",
);
}
metrics::jet::incr_versioned_txn_handler_error(name);
})
.map_err(Into::into)
}
}
Expand Down
27 changes: 18 additions & 9 deletions apps/jet/src/transaction_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ pub enum TransactionHandlerError {
UnsupportedEncoding,
}

impl TransactionHandlerError {
pub const fn variant_name(&self) -> &'static str {
match self {
TransactionHandlerError::InvalidTransaction(_) => "InvalidTransaction",
TransactionHandlerError::SerializationFailed(_) => "SerializationFailed",
TransactionHandlerError::PreflightNotSupported => "PreflightNotSupported",
TransactionHandlerError::InvalidParams(_) => "InvalidParams",
TransactionHandlerError::UnsupportedEncoding => "UnsupportedEncoding",
}
}
}

impl From<ErrorObjectOwned> for TransactionHandlerError {
fn from(err: ErrorObjectOwned) -> Self {
TransactionHandlerError::InvalidParams(err.message().to_string())
Expand Down Expand Up @@ -82,16 +94,13 @@ impl TransactionHandler {
.map_err(|e| TransactionHandlerError::InvalidTransaction(e.to_string()))?;

let signature = transaction.signatures[0];
let mut wire_transaction = bincode::serialize(&transaction)?;
let wire_transaction = bincode::serialize(&transaction)?;
if wire_transaction.len() > PACKET_DATA_SIZE {
wire_transaction.shrink_to_fit();
if wire_transaction.len() > PACKET_DATA_SIZE {
return Err(TransactionHandlerError::InvalidTransaction(format!(
"transaction size {} exceeds maximum allowed size of {} bytes",
wire_transaction.len(),
PACKET_DATA_SIZE
)));
}
return Err(TransactionHandlerError::InvalidTransaction(format!(
"transaction size {} exceeds maximum allowed size of {} bytes",
wire_transaction.len(),
PACKET_DATA_SIZE
)));
}

self.transaction_sink
Expand Down
2 changes: 1 addition & 1 deletion crates/tpu-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-jet-tpu-client"
version = "0.3.0"
version = "0.3.1"
edition.workspace = true
authors.workspace = true
license.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/tpu-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl TpuSenderConfig {
/// This function enables sending transactions of arbitrary size, which may lead to unexpected behavior or security vulnerabilities.
/// It should only be used in controlled testing environments.
///
#[allow(unreachable_code)]
pub unsafe fn allow_arbitrary_txn_size(&mut self) {
#[cfg(not(feature = "intg-testing"))]
{
Expand Down
Loading