diff --git a/Cargo.lock b/Cargo.lock index 2e7e2e2fb7..1dc37e6a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,6 +141,7 @@ dependencies = [ "log", "solana-clock", "solana-hash 3.1.0", + "solana-message", "solana-signature", "solana-transaction", "solana-transaction-status", @@ -1697,9 +1698,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" [[package]] name = "bytemuck" -version = "1.24.0" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" dependencies = [ "bytemuck_derive", ] @@ -8860,6 +8861,7 @@ dependencies = [ "solana-hash 3.1.0", "solana-ledger", "solana-measure", + "solana-message", "solana-metrics", "solana-pubkey", "solana-rpc", diff --git a/Cargo.toml b/Cargo.toml index 09c5cb696d..7b08103d09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -231,7 +231,7 @@ borsh = { version = "1.5.7", features = ["derive", "unstable__schema"] } bs58 = { version = "0.5.1", default-features = false } bv = "0.11.1" byte-unit = "4.0.19" -bytemuck = "1.24.0" +bytemuck = "1.25.0" bytemuck_derive = "1.10.2" bytes = "1.10" bzip2 = "0.4.4" diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index 840e46b4c8..fb4b2af01d 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -7,13 +7,25 @@ use { crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, solana_entry::entry::Entry, - solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo}, + solana_ledger::{ + blockstore::{Blockstore, CompletedDataSetInfo}, + deshred_transaction_notifier_interface::DeshredTransactionNotifierArc, + }, + solana_measure::measure::Measure, + solana_message::{v0::LoadedAddresses, VersionedMessage}, + solana_metrics::*, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, + 
solana_runtime::bank_forks::BankForks, solana_signature::Signature, + solana_svm_transaction::message_address_table_lookup::SVMMessageAddressTableLookup, + solana_transaction::{ + simple_vote_transaction_checker::is_simple_vote_transaction_impl, + versioned::VersionedTransaction, + }, std::{ sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -23,6 +35,46 @@ use { pub type CompletedDataSetsReceiver = Receiver>; pub type CompletedDataSetsSender = Sender>; +/// Check if a versioned transaction is a simple vote transaction. +/// This avoids cloning by extracting the required data directly. +fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool { + let is_legacy = matches!(&tx.message, VersionedMessage::Legacy(_)); + let (account_keys, instructions) = match &tx.message { + VersionedMessage::Legacy(msg) => (&msg.account_keys[..], &msg.instructions[..]), + VersionedMessage::V0(msg) => (&msg.account_keys[..], &msg.instructions[..]), + }; + let instruction_programs = instructions + .iter() + .filter_map(|ix| account_keys.get(ix.program_id_index as usize)); + is_simple_vote_transaction_impl(&tx.signatures, is_legacy, instruction_programs) +} + +/// Load addresses from address lookup tables for a versioned transaction. +/// Returns None for legacy transactions or if address resolution fails. +/// Takes a Bank reference to avoid repeated lock acquisition. 
+fn load_transaction_addresses( + tx: &VersionedTransaction, + bank: &solana_runtime::bank::Bank, +) -> Option { + let VersionedMessage::V0(message) = &tx.message else { + // Legacy transactions don't have address table lookups + return None; + }; + + if message.address_table_lookups.is_empty() { + return None; + } + + bank.load_addresses_from_ref( + message + .address_table_lookups + .iter() + .map(SVMMessageAddressTableLookup::from), + ) + .ok() + .map(|(addresses, _deactivation_slot)| addresses) +} + pub struct CompletedDataSetsService { thread_hdl: JoinHandle<()>, } @@ -32,8 +84,10 @@ impl CompletedDataSetsService { completed_sets_receiver: CompletedDataSetsReceiver, blockstore: Arc, rpc_subscriptions: Arc, + deshred_transaction_notifier: Option, exit: Arc, max_slots: Arc, + bank_forks: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("solComplDataSet".to_string()) @@ -47,7 +101,9 @@ impl CompletedDataSetsService { &completed_sets_receiver, &blockstore, &rpc_subscriptions, + &deshred_transaction_notifier, &max_slots, + &bank_forks, ) { break; } @@ -62,13 +118,78 @@ impl CompletedDataSetsService { completed_sets_receiver: &CompletedDataSetsReceiver, blockstore: &Blockstore, rpc_subscriptions: &RpcSubscriptions, + deshred_transaction_notifier: &Option, max_slots: &Arc, + bank_forks: &RwLock, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let handle_completed_data_set_info = |completed_data_set_info| { + + let mut batch_measure = Measure::start("deshred_geyser_batch"); + + // Get root bank once per batch to minimize lock contention + let root_bank = deshred_transaction_notifier + .as_ref() + .and_then(|_| bank_forks.read().ok()) + .map(|forks| forks.root_bank()); + + // Metrics accumulators + let mut total_lut_load_us: u64 = 0; + let mut total_notify_us: u64 = 0; + let mut total_transactions: u64 = 0; + let mut total_entries: u64 = 0; + let mut total_data_sets: u64 = 0; + let mut lut_transactions: u64 = 0; + + 
let handle_completed_data_set_info = |completed_data_set_info: CompletedDataSetInfo, + total_lut_load_us: &mut u64, + total_notify_us: &mut u64, + total_transactions: &mut u64, + total_entries: &mut u64, + total_data_sets: &mut u64, + lut_transactions: &mut u64| { let CompletedDataSetInfo { slot, indices } = completed_data_set_info; match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) { Ok(entries) => { + *total_data_sets += 1; + *total_entries += entries.len() as u64; + + // Notify deshred transactions if notifier is enabled + if let Some(notifier) = deshred_transaction_notifier { + for entry in entries.iter() { + for tx in &entry.transactions { + if let Some(signature) = tx.signatures.first() { + *total_transactions += 1; + let is_vote = is_simple_vote_transaction(tx); + + // Measure LUT loading time + let mut lut_measure = Measure::start("load_lut"); + let loaded_addresses = root_bank + .as_ref() + .and_then(|bank| load_transaction_addresses(tx, bank)); + lut_measure.stop(); + + if loaded_addresses.is_some() { + *lut_transactions += 1; + *total_lut_load_us += lut_measure.as_us(); + } + + // Measure notification time + let mut notify_measure = Measure::start("notify_deshred"); + notifier.notify_deshred_transaction( + slot, + signature, + is_vote, + tx, + loaded_addresses.as_ref(), + ); + notify_measure.stop(); + *total_notify_us += notify_measure.as_us(); + } + } + } + } + + // Existing: notify signatures for RPC subscriptions let transactions = Self::get_transaction_signatures(entries); if !transactions.is_empty() { rpc_subscriptions.notify_signatures_received((slot, transactions)); @@ -78,15 +199,49 @@ impl CompletedDataSetsService { } slot }; + let slots = completed_sets_receiver .recv_timeout(RECV_TIMEOUT) .map(std::iter::once)? 
.chain(completed_sets_receiver.try_iter()) .flatten() - .map(handle_completed_data_set_info); + .map(|info| { + handle_completed_data_set_info( + info, + &mut total_lut_load_us, + &mut total_notify_us, + &mut total_transactions, + &mut total_entries, + &mut total_data_sets, + &mut lut_transactions, + ) + }); + if let Some(slot) = slots.max() { max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed); } + + batch_measure.stop(); + + // Report metrics if we processed any transactions + if total_transactions > 0 { + datapoint_info!( + "deshred_geyser_timing", + ("batch_total_us", batch_measure.as_us() as i64, i64), + ("notify_total_us", total_notify_us as i64, i64), + ("lut_load_total_us", total_lut_load_us as i64, i64), + ("transactions_count", total_transactions as i64, i64), + ("lut_transactions_count", lut_transactions as i64, i64), + ("entries_count", total_entries as i64, i64), + ("data_sets_count", total_data_sets as i64, i64), + ( + "avg_notify_us", + (total_notify_us / total_transactions) as i64, + i64 + ), + ); + } + Ok(()) } diff --git a/core/src/validator.rs b/core/src/validator.rs index dc3b882e9c..601c796256 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -822,6 +822,7 @@ impl Validator { let ( accounts_update_notifier, transaction_notifier, + deshred_transaction_notifier, entry_notifier, block_metadata_notifier, slot_status_notifier, @@ -829,19 +830,21 @@ impl Validator { ( service.get_accounts_update_notifier(), service.get_transaction_notifier(), + service.get_deshred_transaction_notifier(), service.get_entry_notifier(), service.get_block_metadata_notifier(), service.get_slot_status_notifier(), ) } else { - (None, None, None, None, None) + (None, None, None, None, None, None) }; info!( "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \ - entry_notifier: {}", + deshred_transaction_notifier: {}, entry_notifier: {}", accounts_update_notifier.is_some(), transaction_notifier.is_some(), + 
deshred_transaction_notifier.is_some(), entry_notifier.is_some() ); @@ -1307,8 +1310,10 @@ impl Validator { completed_data_sets_receiver, blockstore.clone(), rpc_subscriptions.clone(), + deshred_transaction_notifier.clone(), exit.clone(), max_slots.clone(), + bank_forks.clone(), ); ( Some(completed_data_sets_sender), diff --git a/geyser-plugin-interface/Cargo.toml b/geyser-plugin-interface/Cargo.toml index fe3bee2917..92476f3c57 100644 --- a/geyser-plugin-interface/Cargo.toml +++ b/geyser-plugin-interface/Cargo.toml @@ -19,6 +19,7 @@ agave-unstable-api = [] log = { workspace = true, features = ["std"] } solana-clock = { workspace = true } solana-hash = { workspace = true } +solana-message = { workspace = true } solana-signature = { workspace = true } solana-transaction = { workspace = true } solana-transaction-status = { workspace = true } diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index b3dc245c05..9df84814ae 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -5,6 +5,7 @@ use { solana_clock::{Slot, UnixTimestamp}, solana_hash::Hash, + solana_message::v0::LoadedAddresses, solana_signature::Signature, solana_transaction::{sanitized::SanitizedTransaction, versioned::VersionedTransaction}, solana_transaction_status::{Reward, RewardsAndNumPartitions, TransactionStatusMeta}, @@ -191,6 +192,33 @@ pub enum ReplicaTransactionInfoVersions<'a> { V0_0_3(&'a ReplicaTransactionInfoV3<'a>), } +/// Information about a transaction after deshredding (when entries are formed from shreds). +/// This is sent before any execution occurs. +/// Unlike ReplicaTransactionInfo, this does not include TransactionStatusMeta +/// since execution has not happened yet. +#[derive(Clone, Debug)] +#[repr(C)] +pub struct ReplicaDeshredTransactionInfo<'a> { + /// The transaction signature, used for identifying the transaction. 
+ pub signature: &'a Signature, + + /// Indicates if the transaction is a simple vote transaction. + pub is_vote: bool, + + /// The versioned transaction. + pub transaction: &'a VersionedTransaction, + + /// Addresses loaded from address lookup tables for V0 transactions. + /// This is None for legacy transactions or if address resolution failed. + pub loaded_addresses: Option<&'a LoadedAddresses>, +} + +/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling. +#[repr(u32)] +pub enum ReplicaDeshredTransactionInfoVersions<'a> { + V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>), +} + #[derive(Clone, Debug)] #[repr(C)] pub struct ReplicaEntryInfo<'a> { @@ -471,6 +499,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> { Ok(()) } + + /// Called when a transaction is deshredded (entries formed from shreds). + /// This is triggered before any execution occurs. Unlike notify_transaction, + /// this does not include execution metadata (TransactionStatusMeta). + #[allow(unused_variables)] + fn notify_deshred_transaction( + &self, + transaction: ReplicaDeshredTransactionInfoVersions, + slot: Slot, + ) -> Result<()> { + Ok(()) + } /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in @@ -500,4 +540,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { fn entry_notifications_enabled(&self) -> bool { false } + + /// Check if the plugin is interested in deshred transaction data. + /// Default is false -- if the plugin is interested in receiving + /// transactions when they are deshredded, return true. 
+ fn deshred_transaction_notifications_enabled(&self) -> bool { + false + } } diff --git a/geyser-plugin-manager/Cargo.toml b/geyser-plugin-manager/Cargo.toml index d720305782..019749a499 100644 --- a/geyser-plugin-manager/Cargo.toml +++ b/geyser-plugin-manager/Cargo.toml @@ -31,6 +31,7 @@ solana-entry = { workspace = true } solana-hash = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } +solana-message = { workspace = true } solana-metrics = { workspace = true } solana-pubkey = { workspace = true } solana-rpc = { workspace = true } diff --git a/geyser-plugin-manager/src/deshred_transaction_notifier.rs b/geyser-plugin-manager/src/deshred_transaction_notifier.rs new file mode 100644 index 0000000000..39fd67989f --- /dev/null +++ b/geyser-plugin-manager/src/deshred_transaction_notifier.rs @@ -0,0 +1,102 @@ +/// Module responsible for notifying plugins of transactions when deshredded +use { + crate::geyser_plugin_manager::GeyserPluginManager, + agave_geyser_plugin_interface::geyser_plugin_interface::{ + ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions, + }, + log::*, + solana_clock::Slot, + solana_ledger::deshred_transaction_notifier_interface::DeshredTransactionNotifier, + solana_measure::measure::Measure, + solana_message::v0::LoadedAddresses, + solana_metrics::*, + solana_signature::Signature, + solana_transaction::versioned::VersionedTransaction, + std::sync::{Arc, RwLock}, +}; + +/// This implementation of DeshredTransactionNotifier is passed to the CompletedDataSetsService +/// at validator startup. CompletedDataSetsService invokes the notify_deshred_transaction method +/// when entries are formed from shreds. The implementation in turn invokes the +/// notify_deshred_transaction of each plugin enabled with deshred transaction notification +/// managed by the GeyserPluginManager. 
+pub(crate) struct DeshredTransactionNotifierImpl { + plugin_manager: Arc>, +} + +impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl { + fn notify_deshred_transaction( + &self, + slot: Slot, + signature: &Signature, + is_vote: bool, + transaction: &VersionedTransaction, + loaded_addresses: Option<&LoadedAddresses>, + ) { + let mut measure = + Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info"); + let transaction_info = Self::build_replica_deshred_transaction_info( + signature, + is_vote, + transaction, + loaded_addresses, + ); + + let plugin_manager = self.plugin_manager.read().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter() { + if !plugin.deshred_transaction_notifications_enabled() { + continue; + } + match plugin.notify_deshred_transaction( + ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info), + slot, + ) { + Err(err) => { + error!( + "Failed to notify deshred transaction, error: ({}) to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully notified deshred transaction to plugin {}", + plugin.name() + ); + } + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_deshred_transaction_info-us", + measure.as_us() as usize, + 10000, + 10000 + ); + } +} + +impl DeshredTransactionNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + fn build_replica_deshred_transaction_info<'a>( + signature: &'a Signature, + is_vote: bool, + transaction: &'a VersionedTransaction, + loaded_addresses: Option<&'a LoadedAddresses>, + ) -> ReplicaDeshredTransactionInfo<'a> { + ReplicaDeshredTransactionInfo { + signature, + is_vote, + transaction, + loaded_addresses, + } + } +} diff --git a/geyser-plugin-manager/src/geyser_plugin_manager.rs b/geyser-plugin-manager/src/geyser_plugin_manager.rs index 08354df044..309b89a6e6 100644 --- 
a/geyser-plugin-manager/src/geyser_plugin_manager.rs +++ b/geyser-plugin-manager/src/geyser_plugin_manager.rs @@ -109,6 +109,16 @@ impl GeyserPluginManager { false } + /// Check if there is any plugin interested in deshred transaction data + pub fn deshred_transaction_notifications_enabled(&self) -> bool { + for plugin in &self.plugins { + if plugin.deshred_transaction_notifications_enabled() { + return true; + } + } + false + } + /// Admin RPC request handler pub(crate) fn list_plugins(&self) -> JsonRpcResult> { Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect()) diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 29c9cc03a4..6784ade6c2 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -5,6 +5,7 @@ use { block_metadata_notifier_interface::BlockMetadataNotifierArc, entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, + deshred_transaction_notifier::DeshredTransactionNotifierImpl, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, @@ -12,7 +13,10 @@ use { crossbeam_channel::Receiver, log::*, solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier, - solana_ledger::entry_notifier_interface::EntryNotifierArc, + solana_ledger::{ + entry_notifier_interface::EntryNotifierArc, + deshred_transaction_notifier_interface::DeshredTransactionNotifierArc, + }, solana_rpc::{ optimistically_confirmed_bank_tracker::SlotNotification, slot_status_notifier::SlotStatusNotifier, @@ -36,6 +40,7 @@ pub struct GeyserPluginService { plugin_manager: Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + deshred_transaction_notifier: Option, entry_notifier: Option, block_metadata_notifier: Option, slot_status_notifier: Option, @@ -91,6 +96,9 @@ impl 
GeyserPluginService { plugin_manager.account_data_snapshot_notifications_enabled(); let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled() || geyser_plugin_always_enabled; + let deshred_transaction_notifications_enabled = + plugin_manager.deshred_transaction_notifications_enabled() + || geyser_plugin_always_enabled; let entry_notifications_enabled = plugin_manager.entry_notifications_enabled() || geyser_plugin_always_enabled; let plugin_manager = Arc::new(RwLock::new(plugin_manager)); @@ -114,6 +122,15 @@ impl GeyserPluginService { None }; + let deshred_transaction_notifier: Option = + if deshred_transaction_notifications_enabled { + let deshred_transaction_notifier = + DeshredTransactionNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(deshred_transaction_notifier)) + } else { + None + }; + let entry_notifier: Option = if entry_notifications_enabled { let entry_notifier = EntryNotifierImpl::new(plugin_manager.clone()); Some(Arc::new(entry_notifier)) @@ -127,6 +144,7 @@ impl GeyserPluginService { Option, ) = if account_data_notifications_enabled || transaction_notifications_enabled + || deshred_transaction_notifications_enabled || entry_notifications_enabled { let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); @@ -157,6 +175,7 @@ impl GeyserPluginService { plugin_manager, accounts_update_notifier, transaction_notifier, + deshred_transaction_notifier, entry_notifier, block_metadata_notifier, slot_status_notifier, @@ -181,6 +200,10 @@ impl GeyserPluginService { self.transaction_notifier.clone() } + pub fn get_deshred_transaction_notifier(&self) -> Option { + self.deshred_transaction_notifier.clone() + } + pub fn get_entry_notifier(&self) -> Option { self.entry_notifier.clone() } diff --git a/geyser-plugin-manager/src/lib.rs b/geyser-plugin-manager/src/lib.rs index 8ecdbb2d23..8ca2eafbac 100644 --- a/geyser-plugin-manager/src/lib.rs +++ b/geyser-plugin-manager/src/lib.rs @@ -13,6 +13,7 @@ pub 
mod block_metadata_notifier_interface; pub mod entry_notifier; pub mod geyser_plugin_manager; pub mod geyser_plugin_service; +pub mod deshred_transaction_notifier; pub mod slot_status_notifier; pub mod slot_status_observer; pub mod transaction_notifier; diff --git a/ledger/src/deshred_transaction_notifier_interface.rs b/ledger/src/deshred_transaction_notifier_interface.rs new file mode 100644 index 0000000000..42171c024e --- /dev/null +++ b/ledger/src/deshred_transaction_notifier_interface.rs @@ -0,0 +1,22 @@ +use { + solana_clock::Slot, + solana_message::v0::LoadedAddresses, + solana_signature::Signature, + solana_transaction::versioned::VersionedTransaction, + std::sync::Arc, +}; + +/// Trait for notifying about transactions when they are deshredded. +/// This is called when entries are formed from shreds, before any execution occurs. +pub trait DeshredTransactionNotifier { + fn notify_deshred_transaction( + &self, + slot: Slot, + signature: &Signature, + is_vote: bool, + transaction: &VersionedTransaction, + loaded_addresses: Option<&LoadedAddresses>, + ); +} + +pub type DeshredTransactionNotifierArc = Arc; diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index e7bf5659a4..fb08b87bbd 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -29,6 +29,7 @@ pub mod blockstore_options; pub mod blockstore_processor; pub mod entry_notifier_interface; pub mod entry_notifier_service; +pub mod deshred_transaction_notifier_interface; pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; diff --git a/rpc-client-types/src/config.rs b/rpc-client-types/src/config.rs index d28b88ff4f..0e71254deb 100644 --- a/rpc-client-types/src/config.rs +++ b/rpc-client-types/src/config.rs @@ -20,12 +20,24 @@ pub struct RpcSignatureStatusConfig { pub struct RpcSendTransactionConfig { #[serde(default)] pub skip_preflight: bool, + #[serde(default)] + pub skip_sanitize: bool, pub preflight_commitment: Option, pub encoding: Option, pub max_retries: Option, pub 
min_context_slot: Option, } +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RcpSanitizeTransactionConfig { + pub sig_verify: bool, + #[serde(flatten)] + pub commitment: Option, + pub encoding: Option, + pub min_context_slot: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcSimulateTransactionAccountsConfig { @@ -343,3 +355,18 @@ pub struct RpcContextConfig { pub commitment: Option, pub min_context_slot: Option, } + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcRecentPrioritizationFeesConfig { + pub percentile: Option, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcLatestBlockhashConfig { + #[serde(flatten)] + pub context: RpcContextConfig, + #[serde(default)] + pub rollback: usize, +} diff --git a/rpc-client-types/src/filter.rs b/rpc-client-types/src/filter.rs index cdd11554cc..57a7401e33 100644 --- a/rpc-client-types/src/filter.rs +++ b/rpc-client-types/src/filter.rs @@ -1,6 +1,8 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, serde::{Deserialize, Serialize}, + solana_account::{AccountSharedData, ReadableAccount}, + spl_generic_token::{token::GenericTokenAccount, token_2022::Account}, std::borrow::Cow, thiserror::Error, }; @@ -15,6 +17,7 @@ pub enum RpcFilterType { DataSize(u64), Memcmp(Memcmp), TokenAccountState, + ValueCmp(ValueCmp), } impl RpcFilterType { @@ -55,6 +58,22 @@ impl RpcFilterType { } } RpcFilterType::TokenAccountState => Ok(()), + RpcFilterType::ValueCmp(_) => Ok(()), + } + } + + #[deprecated( + since = "2.0.0", + note = "Use solana_rpc::filter::filter_allows instead" + )] + pub fn allows(&self, account: &AccountSharedData) -> bool { + match self { + RpcFilterType::DataSize(size) => account.data().len() as u64 == *size, 
+ RpcFilterType::Memcmp(compare) => compare.bytes_match(account.data()), + RpcFilterType::TokenAccountState => Account::valid_account_data(account.data()), + RpcFilterType::ValueCmp(compare) => { + compare.values_match(account.data()).unwrap_or(false) + } } } } @@ -67,6 +86,8 @@ pub enum RpcFilterError { Base58DecodeError(#[from] bs58::decode::Error), #[error("base64 decode error")] Base64DecodeError(#[from] base64::DecodeError), + #[error("invalid ValueCmp filter")] + InvalidValueCmp, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -208,6 +229,178 @@ impl Memcmp { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ValueCmp { + pub left: Operand, + comparator: Comparator, + pub right: Operand, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Operand { + Mem { + offset: usize, + value_type: ValueType, + }, + Constant(String), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum ValueType { + U8, + U16, + U32, + U64, + U128, +} + +enum WrappedValueType { + U8(u8), + U16(u16), + U32(u32), + U64(u64), + U128(u128), +} + +impl ValueCmp { + fn parse_mem_into_value_type( + o: &Operand, + data: &[u8], + ) -> Result { + match o { + Operand::Mem { offset, value_type } => match value_type { + ValueType::U8 => { + if *offset >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + + Ok(WrappedValueType::U8(data[*offset])) + } + ValueType::U16 => { + if *offset + 1 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U16(u16::from_le_bytes( + data[*offset..*offset + 2].try_into().unwrap(), + ))) + } + ValueType::U32 => { + if *offset + 3 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U32(u32::from_le_bytes( + data[*offset..*offset + 4].try_into().unwrap(), + ))) + } + ValueType::U64 => { + if *offset + 7 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + 
Ok(WrappedValueType::U64(u64::from_le_bytes( + data[*offset..*offset + 8].try_into().unwrap(), + ))) + } + ValueType::U128 => { + if *offset + 15 >= data.len() { + return Err(RpcFilterError::InvalidValueCmp); + } + Ok(WrappedValueType::U128(u128::from_le_bytes( + data[*offset..*offset + 16].try_into().unwrap(), + ))) + } + }, + _ => Err(RpcFilterError::InvalidValueCmp), + } + } + + pub fn values_match(&self, data: &[u8]) -> Result { + match (&self.left, &self.right) { + (left @ Operand::Mem { .. }, right @ Operand::Mem { .. }) => { + let left = Self::parse_mem_into_value_type(left, data)?; + let right = Self::parse_mem_into_value_type(right, data)?; + + match (left, right) { + (WrappedValueType::U8(left), WrappedValueType::U8(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U16(left), WrappedValueType::U16(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U32(left), WrappedValueType::U32(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U64(left), WrappedValueType::U64(right)) => { + Ok(self.comparator.compare(left, right)) + } + (WrappedValueType::U128(left), WrappedValueType::U128(right)) => { + Ok(self.comparator.compare(left, right)) + } + _ => Err(RpcFilterError::InvalidValueCmp), + } + } + (left @ Operand::Mem { .. }, Operand::Constant(constant)) => { + match Self::parse_mem_into_value_type(left, data)? 
{ + WrappedValueType::U8(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U16(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U32(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U64(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + WrappedValueType::U128(left) => { + let right = constant + .parse::() + .map_err(|_| RpcFilterError::InvalidValueCmp)?; + Ok(self.comparator.compare(left, right)) + } + } + } + _ => Err(RpcFilterError::InvalidValueCmp), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Comparator { + Eq = 0, + Ne, + Gt, + Ge, + Lt, + Le, +} + +impl Comparator { + // write a generic function to compare two values + pub fn compare(&self, left: T, right: T) -> bool { + match self { + Comparator::Eq => left == right, + Comparator::Ne => left != right, + Comparator::Gt => left > right, + Comparator::Ge => left >= right, + Comparator::Lt => left < right, + Comparator::Le => left <= right, + } + } +} + #[cfg(test)] mod tests { use { @@ -441,4 +634,56 @@ mod tests { serde_json::from_str::(BYTES_FILTER_WITH_ENCODING).unwrap() ); } + + #[test] + fn test_values_match() { + // test all the ValueCmp cases + let data = vec![1, 2, 3, 4, 5]; + + let filter = ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8, + }, + comparator: Comparator::Eq, + right: Operand::Constant("2".to_string()), + }; + + assert!(ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8 + }, + comparator: Comparator::Eq, + right: Operand::Constant("2".to_string()) + } + 
.values_match(&data) + .unwrap()); + + assert!(ValueCmp { + left: Operand::Mem { + offset: 1, + value_type: ValueType::U8 + }, + comparator: Comparator::Lt, + right: Operand::Constant("3".to_string()) + } + .values_match(&data) + .unwrap()); + + assert!(ValueCmp { + left: Operand::Mem { + offset: 0, + value_type: ValueType::U32 + }, + comparator: Comparator::Eq, + right: Operand::Constant("67305985".to_string()) + } + .values_match(&data) + .unwrap()); + + // serialize + let s = serde_json::to_string(&filter).unwrap(); + println!("{}", s); + } } diff --git a/rpc-client-types/src/request.rs b/rpc-client-types/src/request.rs index ce7d980a99..ebbf42120d 100644 --- a/rpc-client-types/src/request.rs +++ b/rpc-client-types/src/request.rs @@ -67,6 +67,7 @@ pub enum RpcRequest { RegisterNode, RequestAirdrop, SendTransaction, + SanitizeTransaction, SimulateTransaction, SignVote, } @@ -132,6 +133,7 @@ impl fmt::Display for RpcRequest { RpcRequest::RegisterNode => "registerNode", RpcRequest::RequestAirdrop => "requestAirdrop", RpcRequest::SendTransaction => "sendTransaction", + RpcRequest::SanitizeTransaction => "sanitizeTransaction", RpcRequest::SimulateTransaction => "simulateTransaction", RpcRequest::SignVote => "signVote", }; diff --git a/rpc-client/src/nonblocking/rpc_client.rs b/rpc-client/src/nonblocking/rpc_client.rs index 8a4cfd99fc..dbaffee6ad 100644 --- a/rpc-client/src/nonblocking/rpc_client.rs +++ b/rpc-client/src/nonblocking/rpc_client.rs @@ -1240,6 +1240,26 @@ impl RpcClient { } } + pub async fn sanitize_transaction( + &self, + transaction: &impl SerializableTransaction, + config: RcpSanitizeTransactionConfig, + ) -> ClientResult<()> { + let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64); + let commitment = config.commitment.unwrap_or_default(); + let config = RcpSanitizeTransactionConfig { + encoding: Some(encoding), + commitment: Some(commitment), + ..config + }; + let serialized_encoded = serialize_and_encode(transaction, encoding)?; + 
self.send( + RpcRequest::SanitizeTransaction, + json!([serialized_encoded, config]), + ) + .await + } + /// Simulates sending a transaction. /// /// If the transaction fails, then the [`err`] field of the returned @@ -4776,6 +4796,27 @@ impl RpcClient { Ok(blockhash) } + pub async fn get_latest_blockhash_with_config( + &self, + config: RpcLatestBlockhashConfig, + ) -> ClientResult<(Hash, u64)> { + let RpcBlockhash { + blockhash, + last_valid_block_height, + } = self + .send::>(RpcRequest::GetLatestBlockhash, json!([config])) + .await? + .value; + let blockhash = blockhash.parse().map_err(|_| { + ClientError::new_with_request( + RpcError::ParseError("Hash".to_string()).into(), + RpcRequest::GetLatestBlockhash, + ) + })?; + Ok((blockhash, last_valid_block_height)) + } + + #[allow(deprecated)] pub async fn get_latest_blockhash_with_commitment( &self, commitment: CommitmentConfig, diff --git a/rpc/src/filter.rs b/rpc/src/filter.rs index 6c2b13d20f..4af5751cf8 100644 --- a/rpc/src/filter.rs +++ b/rpc/src/filter.rs @@ -9,5 +9,6 @@ pub fn filter_allows(filter: &RpcFilterType, account: &AccountSharedData) -> boo RpcFilterType::DataSize(size) => account.data().len() as u64 == *size, RpcFilterType::Memcmp(compare) => compare.bytes_match(account.data()), RpcFilterType::TokenAccountState => Account::valid_account_data(account.data()), + RpcFilterType::ValueCmp(compare) => compare.values_match(account.data()).unwrap_or(false), } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 94ba7ee588..bfe3e8697d 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2349,8 +2349,28 @@ impl JsonRpcRequestProcessor { } } - fn get_latest_blockhash(&self, config: RpcContextConfig) -> Result> { - let bank = self.get_bank_with_config(config)?; + fn get_latest_blockhash( + &self, + config: RpcLatestBlockhashConfig, + ) -> Result> { + let mut bank = self.get_bank_with_config(config.context)?; + if config.rollback > MAX_PROCESSING_AGE { + return Err(Error::invalid_params(format!("rollback exceeds 
{MAX_PROCESSING_AGE}"))); + if config.rollback > 0 { + let r_bank_forks = self.bank_forks.read().unwrap(); + for _ in 0..config.rollback { + bank = match r_bank_forks.get(bank.parent_slot()).or_else(|| { + r_bank_forks + .banks_frozen + .get(&bank.parent_slot()) + .map(Clone::clone) + }) { + Some(bank) => bank, + None => return Err(Error::invalid_params("failed to rollback block")), + }; + } + } let blockhash = bank.last_blockhash(); let last_valid_block_height = bank .get_blockhash_last_valid_block_height(&blockhash) @@ -2386,10 +2406,18 @@ fn get_recent_prioritization_fees( &self, pubkeys: Vec, + percentile: Option, ) -> Result> { - Ok(self - .prioritization_fee_cache - .get_prioritization_fees(&pubkeys) + let cache = match percentile { + Some(percentile) => self + .prioritization_fee_cache + .get_prioritization_fees2(&pubkeys, percentile), + None => self + .prioritization_fee_cache + .get_prioritization_fees(&pubkeys), + }; + + Ok(cache .into_iter() .map(|(slot, prioritization_fee)| RpcPrioritizationFee { slot, @@ -2580,6 +2608,7 @@ fn get_spl_token_owner_filter( } } RpcFilterType::TokenAccountState => token_account_state_filter = true, + RpcFilterType::ValueCmp(_) => {} } } if data_size_filter == Some(account_packed_len as u64) @@ -2631,6 +2660,7 @@ fn get_spl_token_mint_filter( } } RpcFilterType::TokenAccountState => token_account_state_filter = true, + RpcFilterType::ValueCmp(_) => {} } } if data_size_filter == Some(account_packed_len as u64) @@ -3499,6 +3529,15 @@ pub mod rpc_full { config: Option, ) -> Result; + #[rpc(meta, name = "sanitizeTransaction")] + fn sanitize_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + enable_static_instruction_limit: bool, + ) -> Result<()>; + #[rpc(meta, name = "simulateTransaction")] fn simulate_transaction( &self, @@ -3566,7 +3605,7 @@ pub mod rpc_full { fn get_latest_blockhash( &self, meta: Self::Metadata, - config: Option, + config: Option, ) -> 
Result>; #[rpc(meta, name = "isBlockhashValid")] @@ -3597,6 +3636,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result>; } @@ -3802,6 +3842,7 @@ pub mod rpc_full { debug!("send_transaction rpc request received"); let RpcSendTransactionConfig { skip_preflight, + skip_sanitize, preflight_commitment, encoding, max_retries, @@ -3826,35 +3867,54 @@ pub mod rpc_full { min_context_slot, })?; - let transaction = sanitize_transaction( - unsanitized_tx, - preflight_bank, - preflight_bank.get_reserved_account_keys(), - preflight_bank - .feature_set - .is_active(&agave_feature_set::static_instruction_limit::id()), - )?; - let blockhash = *transaction.message().recent_blockhash(); - let message_hash = *transaction.message_hash(); - let signature = *transaction.signature(); + let recent_blockhash = *unsanitized_tx.message.recent_blockhash(); + let (signature, sanitized_tx, message_hash) = if skip_preflight && skip_sanitize { + unsanitized_tx.sanitize().map_err(|_err| { + Error::invalid_params(format!( + "invalid transaction: {}", + TransactionError::SanitizeFailure + )) + })?; + let message_hash = unsanitized_tx.message.hash(); + (unsanitized_tx.signatures[0], None, message_hash) + } else { + let tx = sanitize_transaction( + unsanitized_tx, + preflight_bank, + preflight_bank.get_reserved_account_keys(), + preflight_bank + .feature_set + .is_active(&agave_feature_set::static_instruction_limit::id()), + )?; + let message_hash = *tx.message_hash(); + (*tx.signature(), Some(tx), message_hash) + }; let mut last_valid_block_height = preflight_bank - .get_blockhash_last_valid_block_height(&blockhash) + .get_blockhash_last_valid_block_height(&recent_blockhash) .unwrap_or(0); - let durable_nonce_info = transaction - .get_durable_nonce() - .map(|&pubkey| (pubkey, blockhash)); - if durable_nonce_info.is_some() || (skip_preflight && last_valid_block_height == 0) { - // While it uses a defined constant, this last_valid_block_height value is 
chosen arbitrarily. - // It provides a fallback timeout for durable-nonce transaction retries in case of - // malicious packing of the retry queue. Durable-nonce transactions are otherwise - // retried until the nonce is advanced. - last_valid_block_height = preflight_bank.block_height() + MAX_PROCESSING_AGE as u64; + let mut durable_nonce_info = None; + if let Some(sanitized_tx) = &sanitized_tx { + durable_nonce_info = sanitized_tx + .get_durable_nonce() + .map(|&pubkey| (pubkey, recent_blockhash)); + if durable_nonce_info.is_some() || (skip_preflight && last_valid_block_height == 0) + { + // While it uses a defined constant, this last_valid_block_height value is chosen arbitrarily. + // It provides a fallback timeout for durable-nonce transaction retries in case of + // malicious packing of the retry queue. Durable-nonce transactions are otherwise + // retried until the nonce is advanced. + last_valid_block_height = + preflight_bank.block_height() + MAX_PROCESSING_AGE as u64; + } } if !skip_preflight { - let verification_error = transaction.verify().err(); + let Some(sanitized_tx) = sanitized_tx else { + return Err(Error::invalid_params("sanitized transaction should exists")); + }; + let verification_error = sanitized_tx.verify().err(); if verification_error.is_none() && !meta.config.skip_preflight_health_check { match meta.health.check() { @@ -3876,12 +3936,6 @@ pub mod rpc_full { } } - let simulation_result = if let Some(err) = verification_error { - TransactionSimulationResult::new_error(err) - } else { - preflight_bank.simulate_transaction(&transaction, false) - }; - if let TransactionSimulationResult { result: Err(err), logs, @@ -3895,7 +3949,7 @@ pub mod rpc_full { post_balances: _, pre_token_balances: _, post_token_balances: _, - } = simulation_result + } = preflight_bank.simulate_transaction(&sanitized_tx, false) { match err { TransactionError::BlockhashNotFound => { @@ -3932,7 +3986,7 @@ pub mod rpc_full { meta, message_hash, signature, - blockhash, + 
recent_blockhash, wire_transaction, last_valid_block_height, durable_nonce_info, @@ -3940,6 +3994,46 @@ pub mod rpc_full { ) } + fn sanitize_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + enable_static_instruction_limit: bool, + ) -> Result<()> { + let RcpSanitizeTransactionConfig { + sig_verify, + commitment, + encoding, + min_context_slot, + } = config.unwrap_or_default(); + let tx_encoding = encoding.unwrap_or(UiTransactionEncoding::Base58); + let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| { + Error::invalid_params(format!( + "unsupported encoding: {tx_encoding}. Supported encodings: base58, base64" + )) + })?; + let (_wire_transaction, unsanitized_tx) = + decode_and_deserialize::(data, binary_encoding)?; + + let bank = &*meta.get_bank_with_config(RpcContextConfig { + commitment, + min_context_slot, + })?; + let transaction = sanitize_transaction( + unsanitized_tx, + bank, + bank.get_reserved_account_keys(), + enable_static_instruction_limit, + )?; + if sig_verify { + // verify_transaction(&transaction)?; + transaction.verify().err(); + } + + Ok(()) + } + fn simulate_transaction( &self, meta: Self::Metadata, @@ -4234,7 +4328,7 @@ pub mod rpc_full { fn get_latest_blockhash( &self, meta: Self::Metadata, - config: Option, + config: Option, ) -> Result> { debug!("get_latest_blockhash rpc request received"); meta.get_latest_blockhash(config.unwrap_or_default()) @@ -4290,6 +4384,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result> { let pubkey_strs = pubkey_strs.unwrap_or_default(); debug!( @@ -4305,7 +4400,17 @@ pub mod rpc_full { .into_iter() .map(|pubkey_str| verify_pubkey(&pubkey_str)) .collect::>>()?; - meta.get_recent_prioritization_fees(pubkeys) + + let RpcRecentPrioritizationFeesConfig { percentile } = config.unwrap_or_default(); + if let Some(percentile) = percentile { + if percentile > 10_000 { + return Err(Error::invalid_params( + "Percentile is 
too big; max value is 10000".to_owned(), + )); + } + } + + meta.get_recent_prioritization_fees(pubkeys, percentile) } } } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 1a24cca363..66605ae53b 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -11,13 +11,13 @@ use { }, arc_swap::ArcSwap, log::*, - solana_clock::{BankId, Slot}, + solana_clock::{BankId, Slot, MAX_PROCESSING_AGE}, solana_hash::Hash, solana_measure::measure::Measure, solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph}, solana_unified_scheduler_logic::SchedulingMode, std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, ops::Index, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -93,6 +93,7 @@ struct SetRootTimings { pub struct BankForks { banks: HashMap, + pub banks_frozen: BTreeMap>, descendants: HashMap>, root: Arc, working_slot: Slot, @@ -151,6 +152,7 @@ impl BankForks { working_bank: Arc::new(ArcSwap::from(root_bank.clone())), }, banks, + banks_frozen: Default::default(), descendants, in_vote_only_mode: Arc::new(AtomicBool::new(false)), highest_slot_at_startup: 0, @@ -309,6 +311,13 @@ impl BankForks { pub fn remove(&mut self, slot: Slot) -> Option { let bank = self.banks.remove(&slot)?; + if bank.is_frozen() { + self.banks_frozen + .insert(bank.slot(), bank.clone_without_scheduler()); + while self.banks_frozen.len() > MAX_PROCESSING_AGE { + self.banks_frozen.pop_first(); + } + } for parent in bank.proper_ancestors() { let Entry::Occupied(mut entry) = self.descendants.entry(parent) else { panic!("this should not happen!"); diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index 52324d5516..bcf2e0b962 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -23,14 +23,14 @@ struct PrioritizationFeeMetrics { // Count of attempted update on finalized PrioritizationFee 
attempted_update_on_finalized_fee_count: Saturating, - // Total transaction fees of non-vote transactions included in this slot. + // Total prioritization fees included in this slot. total_prioritization_fee: Saturating, - // The minimum compute unit price of prioritized transactions in this slot. - min_compute_unit_price: Option, + // The minimum prioritization fee of prioritized transactions in this slot. + min_prioritization_fee: Option, - // The maximum compute unit price of prioritized transactions in this slot. - max_compute_unit_price: u64, + // The maximum prioritization fee of prioritized transactions in this slot. + max_prioritization_fee: u64, // Accumulated time spent on tracking prioritization fee for each slot. total_update_elapsed_us: Saturating, @@ -49,8 +49,8 @@ impl PrioritizationFeeMetrics { self.attempted_update_on_finalized_fee_count += val; } - fn update_compute_unit_price(&mut self, cu_price: u64) { - if cu_price == 0 { + fn update_prioritization_fee(&mut self, fee: u64) { + if fee == 0 { self.non_prioritized_transactions_count += 1; return; } @@ -58,11 +58,11 @@ impl PrioritizationFeeMetrics { // update prioritized transaction fee metrics. 
self.prioritized_transactions_count += 1; - self.max_compute_unit_price = self.max_compute_unit_price.max(cu_price); + self.max_prioritization_fee = self.max_prioritization_fee.max(fee); - self.min_compute_unit_price = Some( - self.min_compute_unit_price - .map_or(cu_price, |min_cu_price| min_cu_price.min(cu_price)), + self.min_prioritization_fee = Some( + self.min_prioritization_fee + .map_or(fee, |min_fee| min_fee.min(fee)), ); } @@ -75,8 +75,8 @@ impl PrioritizationFeeMetrics { attempted_update_on_finalized_fee_count: Saturating(attempted_update_on_finalized_fee_count), total_prioritization_fee: Saturating(total_prioritization_fee), - min_compute_unit_price, - max_compute_unit_price, + min_prioritization_fee, + max_prioritization_fee, total_update_elapsed_us: Saturating(total_update_elapsed_us), } = self; datapoint_info!( @@ -113,11 +113,11 @@ impl PrioritizationFeeMetrics { i64 ), ( - "min_compute_unit_price", - min_compute_unit_price.unwrap_or(0) as i64, + "min_prioritization_fee", + min_prioritization_fee.unwrap_or(0) as i64, i64 ), - ("max_compute_unit_price", max_compute_unit_price as i64, i64), + ("max_prioritization_fee", max_prioritization_fee as i64, i64), ( "total_update_elapsed_us", total_update_elapsed_us as i64, @@ -144,14 +144,14 @@ pub enum PrioritizationFeeError { /// Block minimum prioritization fee stats, includes the minimum prioritization fee for a transaction in this /// block; and the minimum fee for each writable account in all transactions in this block. The only relevant /// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land -/// a transaction is determined by Max( min_compute_unit_price, min_writable_account_fees(key), ...) -#[derive(Debug)] +/// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...) 
+#[derive(Debug, Default)] pub struct PrioritizationFee { - // The minimum prioritization fee of transactions that landed in this block. - min_compute_unit_price: u64, + // Prioritization fee of transactions that landed in this block. + transaction_fees: Vec, - // The minimum prioritization fee of each writable account in transactions in this block. - min_writable_account_fees: HashMap, + // Prioritization fee of each writable account in transactions in this block. + writable_account_fees: HashMap>, // Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded // are finalized, and can be made available for use (e.g., RPC query) @@ -161,43 +161,23 @@ pub struct PrioritizationFee { metrics: PrioritizationFeeMetrics, } -impl Default for PrioritizationFee { - fn default() -> Self { - PrioritizationFee { - min_compute_unit_price: u64::MAX, - min_writable_account_fees: HashMap::new(), - is_finalized: false, - metrics: PrioritizationFeeMetrics::default(), - } - } -} - impl PrioritizationFee { /// Update self for minimum transaction fee in the block and minimum fee for each writable account. 
- pub fn update( - &mut self, - compute_unit_price: u64, - prioritization_fee: u64, - writable_accounts: Vec, - ) { + pub fn update(&mut self, transaction_fee: u64, writable_accounts: Vec) { let (_, update_us) = measure_us!({ if !self.is_finalized { - if compute_unit_price < self.min_compute_unit_price { - self.min_compute_unit_price = compute_unit_price; - } + self.transaction_fees.push(transaction_fee); for write_account in writable_accounts { - self.min_writable_account_fees + self.writable_account_fees .entry(write_account) - .and_modify(|write_lock_fee| { - *write_lock_fee = std::cmp::min(*write_lock_fee, compute_unit_price) - }) - .or_insert(compute_unit_price); + .or_default() + .push(transaction_fee); } self.metrics - .accumulate_total_prioritization_fee(prioritization_fee); - self.metrics.update_compute_unit_price(compute_unit_price); + .accumulate_total_prioritization_fee(transaction_fee); + self.metrics.update_prioritization_fee(transaction_fee); } else { self.metrics .increment_attempted_update_on_finalized_fee_count(1); @@ -207,38 +187,54 @@ impl PrioritizationFee { self.metrics.accumulate_total_update_elapsed_us(update_us); } - /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are - /// removed to reduce memory footprint when mark_block_completed() is called. 
- fn prune_irrelevant_writable_accounts(&mut self) { - self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; - self.min_writable_account_fees - .retain(|_, account_fee| account_fee > &mut self.min_compute_unit_price); - self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; - } - pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> { if self.is_finalized { return Err(PrioritizationFeeError::BlockIsAlreadyFinalized); } - self.prune_irrelevant_writable_accounts(); self.is_finalized = true; + + self.transaction_fees.sort(); + for fees in self.writable_account_fees.values_mut() { + fees.sort() + } + + self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; + self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; + Ok(()) } - pub fn get_min_compute_unit_price(&self) -> Option { - (self.min_compute_unit_price != u64::MAX).then_some(self.min_compute_unit_price) + pub fn get_min_transaction_fee(&self) -> Option { + self.transaction_fees.first().copied() + } + + fn get_percentile(fees: &[u64], percentile: u16) -> Option { + let index = (percentile as usize).min(9_999) * fees.len() / 10_000; + fees.get(index).copied() + } + + pub fn get_transaction_fee(&self, percentile: u16) -> Option { + Self::get_percentile(&self.transaction_fees, percentile) + } + + pub fn get_min_writable_account_fee(&self, key: &Pubkey) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| fees.first().copied()) } - pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { - self.min_writable_account_fees.get(key).copied() + pub fn get_writable_account_fee(&self, key: &Pubkey, percentile: u16) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| Self::get_percentile(fees, percentile)) } - pub fn get_writable_account_fees(&self) -> impl Iterator { - self.min_writable_account_fees.iter() + pub fn 
get_writable_account_fees(&self) -> impl Iterator)> { + self.writable_account_fees.iter() } pub fn get_writable_accounts_count(&self) -> usize { - self.min_writable_account_fees.len() + self.writable_account_fees.len() } pub fn is_finalized(&self) -> bool { @@ -247,6 +243,33 @@ impl PrioritizationFee { pub fn report_metrics(&self, slot: Slot) { self.metrics.report(slot); + + // report this slot's min_transaction_fee and top 10 min_writable_account_fees + let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0); + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", "block", String), + ("min_prioritization_fee", min_transaction_fee as i64, i64), + ); + + let mut accounts_fees: Vec<(&Pubkey, u64)> = self + .get_writable_account_fees() + .filter_map(|(account, fees)| { + fees.first() + .copied() + .map(|min_account_fee| (account, min_transaction_fee.min(min_account_fee))) + }) + .collect(); + accounts_fees.sort_by(|lh, rh| rh.1.cmp(&lh.1)); + for (account_key, fee) in accounts_fees.into_iter().take(10) { + datapoint_trace!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", account_key.to_string(), String), + ("min_prioritization_fee", fee as i64, i64), + ); + } } } @@ -260,128 +283,128 @@ mod tests { let write_account_a = Pubkey::new_unique(); let write_account_b = Pubkey::new_unique(); let write_account_c = Pubkey::new_unique(); - let tx_fee = 10; let mut prioritization_fee = PrioritizationFee::default(); - assert!(prioritization_fee.get_min_compute_unit_price().is_none()); + assert!(prioritization_fee.get_min_transaction_fee().is_none()); // Assert for 1st transaction - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [5, a, b ] --> [5, 5, 5, nil ] { - prioritization_fee.update(5, tx_fee, vec![write_account_a, write_account_b]); - assert_eq!(5, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(5, vec![write_account_a, write_account_b]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .is_none()); + + prioritization_fee.is_finalized = false; } // Assert for second transaction: - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [9, b, c ] --> [5, 5, 5, 9 ] { - prioritization_fee.update(9, tx_fee, vec![write_account_b, write_account_c]); - assert_eq!(5, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(9, vec![write_account_b, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 9, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } // Assert for third transaction: - // [cu_px, write_accounts...] --> [block, account_a, account_b, account_c] + // [fee, write_accounts...] 
--> [block, account_a, account_b, account_c] // ----------------------------------------------------------------------- // [2, a, c ] --> [2, 2, 5, 2 ] { - prioritization_fee.update(2, tx_fee, vec![write_account_a, write_account_c]); - assert_eq!(2, prioritization_fee.get_min_compute_unit_price().unwrap()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } - // assert after prune, account a and c should be removed from cache to save space + // assert after sort { - prioritization_fee.prune_irrelevant_writable_accounts(); - assert_eq!(1, prioritization_fee.min_writable_account_fees.len()); - assert_eq!(2, prioritization_fee.get_min_compute_unit_price().unwrap()); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_a) - .is_none()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!(3, prioritization_fee.writable_account_fees.len()); + assert_eq!( + 2, + prioritization_fee + .get_min_writable_account_fee(&write_account_a) + .unwrap() + ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 2, + prioritization_fee + 
.get_min_writable_account_fee(&write_account_c) .unwrap() ); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) - .is_none()); } } - #[test] - fn test_total_prioritization_fee() { - let mut prioritization_fee = PrioritizationFee::default(); - prioritization_fee.update(0, 10, vec![]); - assert_eq!(10, prioritization_fee.metrics.total_prioritization_fee.0); - - prioritization_fee.update(10, u64::MAX, vec![]); - assert_eq!( - u64::MAX, - prioritization_fee.metrics.total_prioritization_fee.0 - ); - - prioritization_fee.update(10, 100, vec![]); - assert_eq!( - u64::MAX, - prioritization_fee.metrics.total_prioritization_fee.0 - ); - } - #[test] fn test_mark_block_completed() { let mut prioritization_fee = PrioritizationFee::default(); diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 6dc4dde146..a1a6b282d4 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -1,5 +1,5 @@ use { - crate::{bank::Bank, prioritization_fee::PrioritizationFee}, + crate::{bank::Bank, prioritization_fee::*}, crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}, log::*, solana_accounts_db::account_locks::validate_account_locks, @@ -47,9 +47,6 @@ struct PrioritizationFeeCacheMetrics { // Accumulated time spent on finalizing block prioritization fees. total_block_finalize_elapsed_us: AtomicU64, - - // Accumulated time spent on calculate transaction fees. 
- total_calculate_prioritization_fee_elapsed_us: AtomicU64, } impl PrioritizationFeeCacheMetrics { @@ -83,11 +80,6 @@ impl PrioritizationFeeCacheMetrics { .fetch_add(val, Ordering::Relaxed); } - fn accumulate_total_calculate_prioritization_fee_elapsed_us(&self, val: u64) { - self.total_calculate_prioritization_fee_elapsed_us - .fetch_add(val, Ordering::Relaxed); - } - fn report(&self, slot: Slot) { datapoint_info!( "block_prioritization_fee_counters", @@ -125,12 +117,6 @@ impl PrioritizationFeeCacheMetrics { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "total_calculate_prioritization_fee_elapsed_us", - self.total_calculate_prioritization_fee_elapsed_us - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ); } } @@ -140,8 +126,7 @@ enum CacheServiceUpdate { TransactionUpdate { slot: Slot, bank_id: BankId, - compute_unit_price: u64, - prioritization_fee: u64, + transaction_fee: u64, writable_accounts: Vec, }, BankFinalized { @@ -248,25 +233,18 @@ impl PrioritizationFeeCache { .map(|(_, key)| *key) .collect(); - let (prioritization_fee, calculate_prioritization_fee_us) = measure_us!({ - solana_fee_structure::FeeBudgetLimits::from(compute_budget_limits) - .prioritization_fee - }); - self.metrics - .accumulate_total_calculate_prioritization_fee_elapsed_us( - calculate_prioritization_fee_us, - ); - self.sender .send(CacheServiceUpdate::TransactionUpdate { slot: bank.slot(), bank_id: bank.bank_id(), - compute_unit_price: compute_budget_limits.compute_unit_price, - prioritization_fee, + transaction_fee: compute_budget_limits.compute_unit_price, writable_accounts, }) .unwrap_or_else(|err| { - warn!("prioritization fee cache transaction updates failed: {err:?}"); + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); }); } }); @@ -281,7 +259,10 @@ impl PrioritizationFeeCache { self.sender .send(CacheServiceUpdate::BankFinalized { slot, bank_id }) .unwrap_or_else(|err| { - warn!("prioritization fee cache signalling bank frozen failed: {err:?}") + 
warn!( + "prioritization fee cache signalling bank frozen failed: {:?}", + err + ) }); } @@ -290,8 +271,7 @@ impl PrioritizationFeeCache { unfinalized: &mut UnfinalizedPrioritizationFees, slot: Slot, bank_id: BankId, - compute_unit_price: u64, - prioritization_fee: u64, + transaction_fee: u64, writable_accounts: Vec, metrics: &PrioritizationFeeCacheMetrics, ) { @@ -300,7 +280,7 @@ impl PrioritizationFeeCache { .or_default() .entry(bank_id) .or_default() - .update(compute_unit_price, prioritization_fee, writable_accounts)); + .update(transaction_fee, writable_accounts)); metrics.accumulate_total_entry_update_elapsed_us(entry_update_us); metrics.accumulate_successful_transaction_update_count(1); } @@ -339,15 +319,15 @@ impl PrioritizationFeeCache { // It should be rare that optimistically confirmed bank had no prioritized // transactions, but duplicated and unconfirmed bank had. if pre_purge_bank_count > 0 && post_purge_bank_count == 0 { - warn!( - "Finalized bank has empty prioritization fee cache. slot {slot} bank id \ - {bank_id}" - ); + warn!("Finalized bank has empty prioritization fee cache. 
slot {slot} bank id {bank_id}"); } if let Some(prioritization_fee) = &mut prioritization_fee { if let Err(err) = prioritization_fee.mark_block_completed() { - error!("Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {err:?}"); + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); } prioritization_fee.report_metrics(slot); } @@ -394,15 +374,13 @@ impl PrioritizationFeeCache { CacheServiceUpdate::TransactionUpdate { slot, bank_id, - compute_unit_price, - prioritization_fee, + transaction_fee, writable_accounts, } => Self::update_cache( &mut unfinalized, slot, bank_id, - compute_unit_price, - prioritization_fee, + transaction_fee, writable_accounts, &metrics, ), @@ -436,11 +414,36 @@ impl PrioritizationFeeCache { .iter() .map(|(slot, slot_prioritization_fee)| { let mut fee = slot_prioritization_fee - .get_min_compute_unit_price() + .get_min_transaction_fee() .unwrap_or_default(); for account_key in account_keys { if let Some(account_fee) = - slot_prioritization_fee.get_writable_account_fee(account_key) + slot_prioritization_fee.get_min_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); + } + } + (*slot, fee) + }) + .collect() + } + + pub fn get_prioritization_fees2( + &self, + account_keys: &[Pubkey], + percentile: u16, + ) -> Vec<(Slot, u64)> { + self.cache + .read() + .unwrap() + .iter() + .map(|(slot, slot_prioritization_fee)| { + let mut fee = slot_prioritization_fee + .get_transaction_fee(percentile) + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + slot_prioritization_fee.get_writable_account_fee(account_key, percentile) { fee = std::cmp::max(fee, account_fee); } @@ -571,10 +574,19 @@ mod tests { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); let lock = prioritization_fee_cache.cache.read().unwrap(); let fee = lock.get(&slot).unwrap(); - assert_eq!(2, fee.get_min_compute_unit_price().unwrap()); - 
assert!(fee.get_writable_account_fee(&write_account_a).is_none()); - assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); - assert!(fee.get_writable_account_fee(&write_account_c).is_none()); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_a).unwrap() + ); + assert_eq!( + 5, + fee.get_min_writable_account_fee(&write_account_b).unwrap() + ); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_c).unwrap() + ); } } diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index d615b5bdac..e7776a608a 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -9,6 +9,7 @@ use { backoff::{future::retry, Error as BackoffError, ExponentialBackoff}, log::*, std::{ + collections::HashMap, str::FromStr, time::{Duration, Instant}, }, @@ -781,6 +782,31 @@ impl) -> InterceptedRequestResult> BigTable { .collect()) } + pub async fn get_bincode_cells2( + &mut self, + table: &str, + keys: &[RowKey], + ) -> Result<(HashMap>, usize)> + where + T: serde::de::DeserializeOwned, + { + let mut size = 0; + let rows = self + .get_multi_row_data(table, keys) + .await? + .into_iter() + .map(|(key, row_data)| { + size += row_data.len(); + let key_str = key.to_string(); + ( + key, + deserialize_bincode_cell_data(&row_data, table, key_str), + ) + }) + .collect(); + Ok((rows, size)) + } + pub async fn get_protobuf_cell

(&mut self, table: &str, key: RowKey) -> Result

where P: prost::Message + Default, @@ -827,6 +853,33 @@ impl) -> InterceptedRequestResult> BigTable { })) } + pub async fn get_protobuf_or_bincode_cells2<'a, B, P>( + &mut self, + table: &'a str, + row_keys: impl IntoIterator, + ) -> Result, usize)> + 'a> + where + B: serde::de::DeserializeOwned, + P: prost::Message + Default, + { + Ok(self + .get_multi_row_data( + table, + row_keys.into_iter().collect::>().as_slice(), + ) + .await? + .into_iter() + .map(|(key, row_data)| { + let size = row_data.iter().fold(0, |acc, row| acc + row.1.len()); + let key_str = key.to_string(); + ( + key, + deserialize_protobuf_or_bincode_cell_data(&row_data, table, key_str).unwrap(), + size, + ) + })) + } + pub async fn put_bincode_cells( &mut self, table: &str, diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 7c0b3aed5b..fbcfc287cd 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -85,6 +85,16 @@ impl std::convert::From for Error { } } +impl Error { + pub fn is_rpc_unauthenticated(&self) -> bool { + if let Error::BigTableError(bigtable::Error::Rpc(status)) = self { + status.code() == tonic::Code::Unauthenticated + } else { + false + } + } +} + pub type Result = std::result::Result; // Convert a slot to its bucket representation whereby lower slots are always lexically ordered @@ -742,6 +752,170 @@ impl LedgerStorage { } } + // Fetches and gets a vector of confirmed blocks via a multirow fetch + pub async fn get_confirmed_blocks_with_data2<'a>( + &self, + slots: &'a [Slot], + ) -> Result, usize)> + 'a> { + debug!( + "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}", + slots + ); + inc_new_counter_debug!("storage-bigtable-query", 1); + let mut bigtable = self.connection.client(); + let row_keys = slots.iter().copied().map(slot_to_blocks_key); + Ok(bigtable + .get_protobuf_or_bincode_cells2("blocks", row_keys) + .await? 
+ .map( + |(row_key, block_cell_data, size): ( + RowKey, + bigtable::CellData, + usize, + )| { + let block = match block_cell_data { + bigtable::CellData::Bincode(block) => block.into(), + bigtable::CellData::Protobuf(block) => match block.try_into() { + Ok(block) => block, + Err(_) => return (None, size), + }, + }; + (Some((key_to_slot(&row_key).unwrap(), block)), size) + }, + )) + } + + /// Fetch blocks and transactions + pub async fn get_confirmed_blocks_transactions( + &self, + blocks: &[Slot], + transactions: &[String], + transactions_status: &[String], + ) -> Result<( + Vec<(Slot, ConfirmedBlock)>, + Vec, + HashMap, + usize, + )> { + let mut bigtable = self.connection.client(); + + let mut blocks_resp = Vec::with_capacity(blocks.len()); + let mut transactions_resp = Vec::with_capacity(transactions.len()); + let mut transactions_status_resp = HashMap::new(); + let mut size = 0; + + // Collect slots for request + let mut blocks_map: HashMap> = HashMap::new(); + for block in blocks { + blocks_map.entry(*block).or_default(); + } + + // Fetch transactions info and collect slots + if !transactions.is_empty() || !transactions_status.is_empty() { + let mut keys = Vec::with_capacity(transactions.len() + transactions_status.len()); + keys.extend(transactions.iter().cloned()); + keys.extend(transactions_status.iter().cloned()); + + let (mut cells, bt_size) = bigtable + .get_bincode_cells2::("tx", keys.as_slice()) + .await?; + size += bt_size; + + for signature in transactions_status { + if let Some(Ok(info)) = cells.get(signature) { + transactions_status_resp.insert( + signature.clone(), + TransactionStatus { + slot: info.slot, + confirmations: None, + status: match &info.err { + Some(err) => Err(err.clone()), + None => Ok(()), + }, + err: info.err.clone(), + confirmation_status: Some(TransactionConfirmationStatus::Finalized), + }, + ); + } + } + for signature in transactions { + if let Some((signature, Ok(TransactionInfo { slot, index, .. 
}))) = + cells.remove_entry(signature) + { + blocks_map.entry(slot).or_default().push((index, signature)); + } + } + } + + // Fetch blocks + if !blocks_map.is_empty() { + let keys = blocks_map.keys().copied().collect::>(); + let cells = self.get_confirmed_blocks_with_data2(&keys).await?; + for (maybe_slot_block, row_size) in cells { + size += row_size; + if let Some((slot, block)) = maybe_slot_block { + if let Some(entries) = blocks_map.get(&slot) { + for (index, signature) in entries.iter() { + if let Some(tx_with_meta) = block.transactions.get(*index as usize) { + if tx_with_meta.transaction_signature().to_string() != *signature { + warn!( + "Transaction info or confirmed block for {} is corrupt", + signature + ); + } else { + transactions_resp.push(ConfirmedTransactionWithStatusMeta { + slot, + tx_with_meta: tx_with_meta.clone(), + block_time: block.block_time, + }); + } + } + } + blocks_resp.push((slot, block)); + } + } + } + } + + Ok(( + blocks_resp, + transactions_resp, + transactions_status_resp, + size, + )) + } + + /// Fetch TX index for transactions + pub async fn get_txindex( + &self, + transactions: &[String], + ) -> Result<(Vec>, usize)> { + let mut bigtable = self.connection.client(); + + let mut response = Vec::with_capacity(transactions.len()); + let mut size = 0; + + // Fetch transactions info and collect slots + if transactions.is_empty() { + response.resize(transactions.len(), None); + } else { + let (cells, bt_size) = bigtable + .get_bincode_cells2::("tx", transactions) + .await?; + size += bt_size; + + for signature in transactions { + if let Some(Ok(TransactionInfo { slot, index, .. })) = cells.get(signature) { + response.push(Some((*slot, *index))); + } else { + response.push(None); + } + } + } + + Ok((response, size)) + } + /// Get confirmed signatures for the provided address, in descending ledger order /// /// address: address to search for