Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 1 addition & 27 deletions magicblock-account-cloner/src/remote_account_cloner_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,17 +789,11 @@ where
pubkey: &Pubkey,
lamports: u64,
owner: &Pubkey,
balance_pda: Option<&Pubkey>,
_balance_pda: Option<&Pubkey>,
) -> AccountClonerResult<Signature> {
self.account_dumper
.dump_feepayer_account(pubkey, lamports, owner)
.map_err(AccountClonerError::AccountDumperError)
.inspect(|_| {
metrics::inc_account_clone(metrics::AccountClone::FeePayer {
pubkey: &pubkey.to_string(),
balance_pda: balance_pda.map(|p| p.to_string()).as_deref(),
});
})
}

fn do_clone_undelegated_account(
Expand All @@ -810,14 +804,6 @@ where
self.account_dumper
.dump_undelegated_account(pubkey, account)
.map_err(AccountClonerError::AccountDumperError)
.inspect(|_| {
metrics::inc_account_clone(
metrics::AccountClone::Undelegated {
pubkey: &pubkey.to_string(),
owner: &account.owner().to_string(),
},
);
})
}

fn do_clone_delegated_account(
Expand Down Expand Up @@ -846,13 +832,6 @@ where
self.account_dumper
.dump_delegated_account(pubkey, account, &record.owner)
.map_err(AccountClonerError::AccountDumperError)
.inspect(|_| {
metrics::inc_account_clone(metrics::AccountClone::Delegated {
// TODO(bmuddha): optimize metrics, remove .to_string()
pubkey: &pubkey.to_string(),
owner: &record.owner.to_string(),
});
})
}

async fn do_clone_program_accounts(
Expand Down Expand Up @@ -908,11 +887,6 @@ where
idl_account,
)
.map_err(AccountClonerError::AccountDumperError)
.inspect(|_| {
metrics::inc_account_clone(metrics::AccountClone::Program {
pubkey: &pubkey.to_string(),
});
})
}

async fn fetch_program_idl(
Expand Down
1 change: 1 addition & 0 deletions magicblock-aperture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ magicblock-chainlink = { workspace = true }
magicblock-config = { workspace = true }
magicblock-core = { workspace = true }
magicblock-ledger = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-version = { workspace = true }

# solana
Expand Down
21 changes: 11 additions & 10 deletions magicblock-aperture/src/requests/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use log::*;
use magicblock_core::traits::AccountsBank;
use magicblock_metrics::metrics::ENSURE_ACCOUNTS_TIME;
use std::{mem::size_of, ops::Range};

use base64::{prelude::BASE64_STANDARD, Engine};
Expand Down Expand Up @@ -85,15 +86,17 @@ impl HttpDispatcher {
&self,
pubkey: &Pubkey,
) -> Option<AccountSharedData> {
debug!("Ensuring account {pubkey}");
let _timer = ENSURE_ACCOUNTS_TIME
.with_label_values(&["account"])
.start_timer();
let _ = self
.chainlink
.ensure_accounts(&[*pubkey], None)
.await
.inspect_err(|e| {
// There is nothing we can do if fetching the account fails
// Log the error and return whatever is in the accounts db
error!("Failed to ensure account {pubkey}: {e}");
warn!("Failed to ensure account {pubkey}: {e}");
});
self.accountsdb.get_account(pubkey)
}
Expand All @@ -104,14 +107,9 @@ impl HttpDispatcher {
&self,
pubkeys: &[Pubkey],
) -> Vec<Option<AccountSharedData>> {
if log::log_enabled!(log::Level::Debug) {
let pubkeys = pubkeys
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ");
debug!("Ensuring accounts {pubkeys}");
}
let _timer = ENSURE_ACCOUNTS_TIME
.with_label_values(&["multi-account"])
.start_timer();
let _ = self
.chainlink
.ensure_accounts(pubkeys, None)
Expand Down Expand Up @@ -177,6 +175,9 @@ impl HttpDispatcher {
&self,
transaction: &SanitizedTransaction,
) -> RpcResult<()> {
let _timer = ENSURE_ACCOUNTS_TIME
.with_label_values(&["transaction"])
.start_timer();
match self
.chainlink
.ensure_transaction_accounts(transaction)
Expand Down
10 changes: 6 additions & 4 deletions magicblock-aperture/src/requests/http/send_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use log::*;
use magicblock_metrics::metrics::{
TRANSACTION_PROCESSING_TIME, TRANSACTION_SKIP_PREFLIGHT,
};
use solana_rpc_client_api::config::RpcSendTransactionConfig;
use solana_transaction_error::TransactionError;
use solana_transaction_status::UiTransactionEncoding;
Expand All @@ -15,8 +17,10 @@ impl HttpDispatcher {
&self,
request: &mut JsonRequest,
) -> HandlerResult {
let _timer = TRANSACTION_PROCESSING_TIME.start_timer();
let (transaction_str, config) =
parse_params!(request.params()?, String, RpcSendTransactionConfig);

let transaction_str: String = some_or_err!(transaction_str);
let config = config.unwrap_or_default();
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base58);
Expand All @@ -32,16 +36,14 @@ impl HttpDispatcher {
{
return Err(TransactionError::AlreadyProcessed.into());
}
debug!("Received transaction: {signature}, ensuring accounts");
self.ensure_transaction_accounts(&transaction).await?;

// Based on the preflight flag, either execute and await the result,
// or schedule (fire-and-forget) for background processing.
if config.skip_preflight {
debug!("Scheduling transaction: {signature}");
TRANSACTION_SKIP_PREFLIGHT.inc();
self.transactions_scheduler.schedule(transaction).await?;
} else {
debug!("Executing transaction: {signature}");
self.transactions_scheduler.execute(transaction).await?;
}

Expand Down
78 changes: 78 additions & 0 deletions magicblock-aperture/src/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,84 @@ pub(crate) enum JsonRpcWsMethod {
SlotUnsubscribe,
}

impl JsonRpcHttpMethod {
pub(crate) fn as_str(&self) -> &'static str {
match self {
JsonRpcHttpMethod::GetAccountInfo => "getAccountInfo",
JsonRpcHttpMethod::GetBalance => "getBalance",
JsonRpcHttpMethod::GetBlock => "getBlock",
JsonRpcHttpMethod::GetBlockCommitment => "getBlockCommitment",
JsonRpcHttpMethod::GetBlockHeight => "getBlockHeight",
JsonRpcHttpMethod::GetBlockTime => "getBlockTime",
JsonRpcHttpMethod::GetBlocks => "getBlocks",
JsonRpcHttpMethod::GetBlocksWithLimit => "getBlocksWithLimit",
JsonRpcHttpMethod::GetClusterNodes => "getClusterNodes",
JsonRpcHttpMethod::GetEpochInfo => "getEpochInfo",
JsonRpcHttpMethod::GetEpochSchedule => "getEpochSchedule",
JsonRpcHttpMethod::GetFeeForMessage => "getFeeForMessage",
JsonRpcHttpMethod::GetFirstAvailableBlock => {
"getFirstAvailableBlock"
}
JsonRpcHttpMethod::GetGenesisHash => "getGenesisHash",
JsonRpcHttpMethod::GetHealth => "getHealth",
JsonRpcHttpMethod::GetHighestSnapshotSlot => {
"getHighestSnapshotSlot"
}
JsonRpcHttpMethod::GetIdentity => "getIdentity",
JsonRpcHttpMethod::GetLargestAccounts => "getLargestAccounts",
JsonRpcHttpMethod::GetLatestBlockhash => "getLatestBlockhash",
JsonRpcHttpMethod::GetMultipleAccounts => "getMultipleAccounts",
JsonRpcHttpMethod::GetProgramAccounts => "getProgramAccounts",
JsonRpcHttpMethod::GetSignatureStatuses => "getSignatureStatuses",
JsonRpcHttpMethod::GetSignaturesForAddress => {
"getSignaturesForAddress"
}
JsonRpcHttpMethod::GetSlot => "getSlot",
JsonRpcHttpMethod::GetSlotLeader => "getSlotLeader",
JsonRpcHttpMethod::GetSlotLeaders => "getSlotLeaders",
JsonRpcHttpMethod::GetSupply => "getSupply",
JsonRpcHttpMethod::GetTokenAccountBalance => {
"getTokenAccountBalance"
}
JsonRpcHttpMethod::GetTokenAccountsByDelegate => {
"getTokenAccountsByDelegate"
}
JsonRpcHttpMethod::GetTokenAccountsByOwner => {
"getTokenAccountsByOwner"
}
JsonRpcHttpMethod::GetTokenLargestAccounts => {
"getTokenLargestAccounts"
}
JsonRpcHttpMethod::GetTokenSupply => "getTokenSupply",
JsonRpcHttpMethod::GetTransaction => "getTransaction",
JsonRpcHttpMethod::GetTransactionCount => "getTransactionCount",
JsonRpcHttpMethod::GetVersion => "getVersion",
JsonRpcHttpMethod::IsBlockhashValid => "isBlockhashValid",
JsonRpcHttpMethod::MinimumLedgerSlot => "minimumLedgerSlot",
JsonRpcHttpMethod::RequestAirdrop => "requestAirdrop",
JsonRpcHttpMethod::SendTransaction => "sendTransaction",
JsonRpcHttpMethod::SimulateTransaction => "simulateTransaction",
}
}
}

impl JsonRpcWsMethod {
pub(crate) fn as_str(&self) -> &'static str {
match self {
JsonRpcWsMethod::AccountSubscribe => "accountSubscribe",
JsonRpcWsMethod::AccountUnsubscribe => "accountUnsubscribe",
JsonRpcWsMethod::LogsSubscribe => "logsSubscribe",
JsonRpcWsMethod::LogsUnsubscribe => "logsUnsubscribe",
JsonRpcWsMethod::ProgramSubscribe => "programSubscribe",
JsonRpcWsMethod::ProgramUnsubscribe => "programUnsubscribe",
JsonRpcWsMethod::SignatureSubscribe => "signatureSubscribe",
JsonRpcWsMethod::SignatureUnsubscribe => "signatureUnsubscribe",
JsonRpcWsMethod::SlotSubscribe => "slotSubscribe",
JsonRpcWsMethod::SlotUnsubscribe => "slotUnsubscribe",
}
}
}

/// A helper macro for easily parsing positional parameters from a JSON-RPC request.
///
/// This macro simplifies the process of extracting and deserializing parameters
Expand Down
8 changes: 8 additions & 0 deletions magicblock-aperture/src/server/http/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use magicblock_core::link::{
transactions::TransactionSchedulerHandle, DispatchEndpoints,
};
use magicblock_ledger::Ledger;
use magicblock_metrics::metrics::{
RPC_REQUESTS_COUNT, RPC_REQUEST_HANDLING_TIME,
};

use crate::{
requests::{
Expand Down Expand Up @@ -111,6 +114,11 @@ impl HttpDispatcher {
async fn process(&self, request: &mut JsonHttpRequest) -> HandlerResult {
// Route the request to the correct handler based on the method name.
use crate::requests::JsonRpcHttpMethod::*;
let method = request.method.as_str();
RPC_REQUESTS_COUNT.with_label_values(&[method]).inc();
let _timer = RPC_REQUEST_HANDLING_TIME
.with_label_values(&[method])
.start_timer();
match request.method {
GetAccountInfo => self.get_account_info(request).await,
GetBalance => self.get_balance(request).await,
Expand Down
4 changes: 4 additions & 0 deletions magicblock-aperture/src/server/websocket/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
use super::connection::ConnectionID;
use hyper::body::Bytes;
use json::{Serialize, Value};
use magicblock_metrics::metrics::RPC_REQUESTS_COUNT;
use tokio::sync::mpsc;

/// The sender half of an MPSC channel used to push subscription notifications
Expand Down Expand Up @@ -65,6 +66,9 @@ impl WsDispatcher {
request: &mut JsonWsRequest,
) -> RpcResult<WsDispatchResult> {
use JsonRpcWsMethod::*;
RPC_REQUESTS_COUNT
.with_label_values(&[request.method.as_str()])
.inc();
let result = match request.method {
AccountSubscribe => self.account_subscribe(request).await,
ProgramSubscribe => self.program_subscribe(request).await,
Expand Down
27 changes: 26 additions & 1 deletion magicblock-aperture/src/state/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
},
};

use magicblock_metrics::metrics::RPC_WS_SUBSCRIPTIONS_COUNT;
use parking_lot::RwLock;
use solana_account::ReadableAccount;
use solana_pubkey::Pubkey;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl SubscriptionsDb {
.await
.or_insert_with(|| UpdateSubscribers(vec![]))
.add_subscriber(chan, encoder.clone());

let metric = SubMetricGuard::new("account");
// Create a cleanup future that will be executed when the handle is dropped.
let accounts = self.accounts.clone();
let callback = async move {
Expand All @@ -116,6 +117,7 @@ impl SubscriptionsDb {
if entry.remove_subscriber(conid, &encoder) {
let _ = entry.remove();
}
drop(metric)
};
let cleanup = CleanUp(Some(Box::pin(callback)));
SubscriptionHandle { id, cleanup }
Expand Down Expand Up @@ -146,13 +148,15 @@ impl SubscriptionsDb {
.add_subscriber(chan, encoder.clone());

let programs = self.programs.clone();
let metric = SubMetricGuard::new("program");
let callback = async move {
let Some(mut entry) = programs.get_async(&pubkey).await else {
return;
};
if entry.remove_subscriber(conid, &encoder) {
let _ = entry.remove();
}
drop(metric)
};
let cleanup = CleanUp(Some(Box::pin(callback)));
SubscriptionHandle { id, cleanup }
Expand Down Expand Up @@ -212,8 +216,10 @@ impl SubscriptionsDb {
let id = self.logs.write().add_subscriber(chan, encoder.clone());

let logs = self.logs.clone();
let metric = SubMetricGuard::new("logs");
let callback = async move {
logs.write().remove_subscriber(conid, &encoder);
drop(metric)
};
let cleanup = CleanUp(Some(Box::pin(callback)));
SubscriptionHandle { id, cleanup }
Expand All @@ -239,8 +245,10 @@ impl SubscriptionsDb {
let id = subscriber.id;

let slot = self.slot.clone();
let metric = SubMetricGuard::new("slot");
let callback = async move {
slot.write().txs.remove(&conid);
drop(metric)
};
let cleanup = CleanUp(Some(Box::pin(callback)));
SubscriptionHandle { id, cleanup }
Expand Down Expand Up @@ -391,3 +399,20 @@ impl<E> Drop for UpdateSubscriber<E> {
self.live.store(false, Ordering::Relaxed);
}
}

pub(crate) struct SubMetricGuard(&'static str);

impl SubMetricGuard {
pub(crate) fn new(name: &'static str) -> Self {
RPC_WS_SUBSCRIPTIONS_COUNT.with_label_values(&[name]).inc();
Self(name)
}
}

impl Drop for SubMetricGuard {
fn drop(&mut self) {
RPC_WS_SUBSCRIPTIONS_COUNT
.with_label_values(&[self.0])
.dec();
}
}
Loading
Loading