Skip to content
Open
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ borsh = { version = "1.5.7", features = ["derive", "unstable__schema"] }
bs58 = { version = "0.5.1", default-features = false }
bv = "0.11.1"
byte-unit = "4.0.19"
bytemuck = "1.24.0"
bytemuck = "1.25.0"
bytemuck_derive = "1.10.2"
bytes = "1.10"
bzip2 = "0.4.4"
Expand Down
163 changes: 159 additions & 4 deletions core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_entry::entry::Entry,
solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo},
solana_ledger::{
blockstore::{Blockstore, CompletedDataSetInfo},
deshred_transaction_notifier_interface::DeshredTransactionNotifierArc,
},
solana_measure::measure::Measure,
solana_message::{v0::LoadedAddresses, VersionedMessage},
solana_metrics::*,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::bank_forks::BankForks,
solana_signature::Signature,
solana_svm_transaction::message_address_table_lookup::SVMMessageAddressTableLookup,
solana_transaction::{
simple_vote_transaction_checker::is_simple_vote_transaction_impl,
versioned::VersionedTransaction,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
Expand All @@ -23,6 +35,46 @@ use {
pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;

/// Check if a versioned transaction is a simple vote transaction.
/// This avoids cloning by extracting the required data directly.
fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool {
let is_legacy = matches!(&tx.message, VersionedMessage::Legacy(_));
let (account_keys, instructions) = match &tx.message {
VersionedMessage::Legacy(msg) => (&msg.account_keys[..], &msg.instructions[..]),
VersionedMessage::V0(msg) => (&msg.account_keys[..], &msg.instructions[..]),
};
let instruction_programs = instructions
.iter()
.filter_map(|ix| account_keys.get(ix.program_id_index as usize));
is_simple_vote_transaction_impl(&tx.signatures, is_legacy, instruction_programs)
}

/// Load addresses from address lookup tables for a versioned transaction.
/// Returns None for legacy transactions or if address resolution fails.
/// Takes a Bank reference to avoid repeated lock acquisition.
fn load_transaction_addresses(
tx: &VersionedTransaction,
bank: &solana_runtime::bank::Bank,
) -> Option<LoadedAddresses> {
let VersionedMessage::V0(message) = &tx.message else {
// Legacy transactions don't have address table lookups
return None;
};

if message.address_table_lookups.is_empty() {
return None;
}

bank.load_addresses_from_ref(
message
.address_table_lookups
.iter()
.map(SVMMessageAddressTableLookup::from),
)
.ok()
.map(|(addresses, _deactivation_slot)| addresses)
}

pub struct CompletedDataSetsService {
thread_hdl: JoinHandle<()>,
}
Expand All @@ -32,8 +84,10 @@ impl CompletedDataSetsService {
completed_sets_receiver: CompletedDataSetsReceiver,
blockstore: Arc<Blockstore>,
rpc_subscriptions: Arc<RpcSubscriptions>,
deshred_transaction_notifier: Option<DeshredTransactionNotifierArc>,
exit: Arc<AtomicBool>,
max_slots: Arc<MaxSlots>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let thread_hdl = Builder::new()
.name("solComplDataSet".to_string())
Expand All @@ -47,7 +101,9 @@ impl CompletedDataSetsService {
&completed_sets_receiver,
&blockstore,
&rpc_subscriptions,
&deshred_transaction_notifier,
&max_slots,
&bank_forks,
) {
break;
}
Expand All @@ -62,13 +118,78 @@ impl CompletedDataSetsService {
completed_sets_receiver: &CompletedDataSetsReceiver,
blockstore: &Blockstore,
rpc_subscriptions: &RpcSubscriptions,
deshred_transaction_notifier: &Option<DeshredTransactionNotifierArc>,
max_slots: &Arc<MaxSlots>,
bank_forks: &RwLock<BankForks>,
) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let handle_completed_data_set_info = |completed_data_set_info| {

let mut batch_measure = Measure::start("deshred_geyser_batch");

// Get root bank once per batch to minimize lock contention
let root_bank = deshred_transaction_notifier
.as_ref()
.and_then(|_| bank_forks.read().ok())
.map(|forks| forks.root_bank());

// Metrics accumulators
let mut total_lut_load_us: u64 = 0;
let mut total_notify_us: u64 = 0;
let mut total_transactions: u64 = 0;
let mut total_entries: u64 = 0;
let mut total_data_sets: u64 = 0;
let mut lut_transactions: u64 = 0;

let handle_completed_data_set_info = |completed_data_set_info: CompletedDataSetInfo,
total_lut_load_us: &mut u64,
total_notify_us: &mut u64,
total_transactions: &mut u64,
total_entries: &mut u64,
total_data_sets: &mut u64,
lut_transactions: &mut u64| {
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) {
Ok(entries) => {
*total_data_sets += 1;
*total_entries += entries.len() as u64;

// Notify deshred transactions if notifier is enabled
if let Some(notifier) = deshred_transaction_notifier {
for entry in entries.iter() {
for tx in &entry.transactions {
if let Some(signature) = tx.signatures.first() {
*total_transactions += 1;
let is_vote = is_simple_vote_transaction(tx);

// Measure LUT loading time
let mut lut_measure = Measure::start("load_lut");
let loaded_addresses = root_bank
.as_ref()
.and_then(|bank| load_transaction_addresses(tx, bank));
lut_measure.stop();

if loaded_addresses.is_some() {
*lut_transactions += 1;
*total_lut_load_us += lut_measure.as_us();
}

// Measure notification time
let mut notify_measure = Measure::start("notify_deshred");
notifier.notify_deshred_transaction(
slot,
signature,
is_vote,
tx,
loaded_addresses.as_ref(),
);
notify_measure.stop();
*total_notify_us += notify_measure.as_us();
}
}
}
}

// Existing: notify signatures for RPC subscriptions
let transactions = Self::get_transaction_signatures(entries);
if !transactions.is_empty() {
rpc_subscriptions.notify_signatures_received((slot, transactions));
Expand All @@ -78,15 +199,49 @@ impl CompletedDataSetsService {
}
slot
};

let slots = completed_sets_receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(completed_sets_receiver.try_iter())
.flatten()
.map(handle_completed_data_set_info);
.map(|info| {
handle_completed_data_set_info(
info,
&mut total_lut_load_us,
&mut total_notify_us,
&mut total_transactions,
&mut total_entries,
&mut total_data_sets,
&mut lut_transactions,
)
});

if let Some(slot) = slots.max() {
max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed);
}

batch_measure.stop();

// Report metrics if we processed any transactions
if total_transactions > 0 {
datapoint_info!(
"deshred_geyser_timing",
("batch_total_us", batch_measure.as_us() as i64, i64),
("notify_total_us", total_notify_us as i64, i64),
("lut_load_total_us", total_lut_load_us as i64, i64),
("transactions_count", total_transactions as i64, i64),
("lut_transactions_count", lut_transactions as i64, i64),
("entries_count", total_entries as i64, i64),
("data_sets_count", total_data_sets as i64, i64),
(
"avg_notify_us",
(total_notify_us / total_transactions) as i64,
i64
),
);
}

Ok(())
}

Expand Down
9 changes: 7 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,26 +822,29 @@ impl Validator {
let (
accounts_update_notifier,
transaction_notifier,
deshred_transaction_notifier,
entry_notifier,
block_metadata_notifier,
slot_status_notifier,
) = if let Some(service) = &geyser_plugin_service {
(
service.get_accounts_update_notifier(),
service.get_transaction_notifier(),
service.get_deshred_transaction_notifier(),
service.get_entry_notifier(),
service.get_block_metadata_notifier(),
service.get_slot_status_notifier(),
)
} else {
(None, None, None, None, None)
(None, None, None, None, None, None)
};

info!(
"Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
entry_notifier: {}",
deshred_transaction_notifier: {}, entry_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
deshred_transaction_notifier.is_some(),
entry_notifier.is_some()
);

Expand Down Expand Up @@ -1307,8 +1310,10 @@ impl Validator {
completed_data_sets_receiver,
blockstore.clone(),
rpc_subscriptions.clone(),
deshred_transaction_notifier.clone(),
exit.clone(),
max_slots.clone(),
bank_forks.clone(),
);
(
Some(completed_data_sets_sender),
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ agave-unstable-api = []
log = { workspace = true, features = ["std"] }
solana-clock = { workspace = true }
solana-hash = { workspace = true }
solana-message = { workspace = true }
solana-signature = { workspace = true }
solana-transaction = { workspace = true }
solana-transaction-status = { workspace = true }
Expand Down
47 changes: 47 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use {
solana_clock::{Slot, UnixTimestamp},
solana_hash::Hash,
solana_message::v0::LoadedAddresses,
solana_signature::Signature,
solana_transaction::{sanitized::SanitizedTransaction, versioned::VersionedTransaction},
solana_transaction_status::{Reward, RewardsAndNumPartitions, TransactionStatusMeta},
Expand Down Expand Up @@ -191,6 +192,33 @@ pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_3(&'a ReplicaTransactionInfoV3<'a>),
}

/// Information about a transaction after deshredding (when entries are formed from shreds).
/// This is sent before any execution occurs.
/// Unlike ReplicaTransactionInfo, this does not include TransactionStatusMeta
/// since execution has not happened yet.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaDeshredTransactionInfo<'a> {
/// The transaction signature, used for identifying the transaction.
pub signature: &'a Signature,

/// Indicates if the transaction is a simple vote transaction.
pub is_vote: bool,

/// The versioned transaction.
pub transaction: &'a VersionedTransaction,

/// Addresses loaded from address lookup tables for V0 transactions.
/// This is None for legacy transactions or if address resolution failed.
pub loaded_addresses: Option<&'a LoadedAddresses>,
}

/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling.
#[repr(u32)]
pub enum ReplicaDeshredTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>),
}

#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaEntryInfo<'a> {
Expand Down Expand Up @@ -471,6 +499,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
Ok(())
}

/// Called when a transaction is deshredded (entries formed from shreds).
/// This is triggered before any execution occurs. Unlike notify_transaction,
/// this does not include execution metadata (TransactionStatusMeta).
#[allow(unused_variables)]
fn notify_deshred_transaction(
&self,
transaction: ReplicaDeshredTransactionInfoVersions,
slot: Slot,
) -> Result<()> {
Ok(())
}

/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
Expand Down Expand Up @@ -500,4 +540,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}

/// Check if the plugin is interested in deshred transaction data.
/// Default is false -- if the plugin is interested in receiving
/// transactions when they are deshredded, return true.
fn deshred_transaction_notifications_enabled(&self) -> bool {
false
}
}
1 change: 1 addition & 0 deletions geyser-plugin-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ solana-entry = { workspace = true }
solana-hash = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-message = { workspace = true }
solana-metrics = { workspace = true }
solana-pubkey = { workspace = true }
solana-rpc = { workspace = true }
Expand Down
Loading