From 649a82a01848dbb8dc81c3514a6b04b6a989652d Mon Sep 17 00:00:00 2001 From: armyhaylenko Date: Thu, 16 Jan 2025 19:38:31 +0200 Subject: [PATCH] chore(api): optimize retrieval of assets --- Cargo.lock | 3 +- rocks-db/Cargo.toml | 1 + rocks-db/src/clients/asset_client.rs | 158 +++++++++++++++------------ rocks-db/src/column.rs | 3 + 4 files changed, 97 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3019d3c7..a689a53e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -6510,6 +6510,7 @@ dependencies = [ "entities", "figment", "flatbuffers 24.3.25", + "futures", "futures-util", "hex", "indicatif", diff --git a/rocks-db/Cargo.toml b/rocks-db/Cargo.toml index 4555a54a..e0f7882b 100644 --- a/rocks-db/Cargo.toml +++ b/rocks-db/Cargo.toml @@ -16,6 +16,7 @@ figment = { workspace = true } lz4 = { workspace = true } tar = { workspace = true } reqwest = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } metrics-utils = { path = "../metrics_utils" } tokio = { workspace = true } diff --git a/rocks-db/src/clients/asset_client.rs b/rocks-db/src/clients/asset_client.rs index d639134d..11d893a2 100644 --- a/rocks-db/src/clients/asset_client.rs +++ b/rocks-db/src/clients/asset_client.rs @@ -6,7 +6,7 @@ use entities::{ enums::{AssetType, SpecificationAssetClass, TokenMetadataEdition}, models::{EditionData, PubkeyWithSlot}, }; -use futures_util::FutureExt; +use futures::future::Either; use solana_sdk::pubkey::Pubkey; use crate::{ @@ -148,65 +148,95 @@ impl Storage { owner_address: &Option, options: &Options, ) -> Result { - let assets_leaf_fut = self.asset_leaf_data.batch_get(asset_ids.clone()); let token_accounts_fut = if let Some(owner_address) = owner_address { - self.get_raw_token_accounts(Some(*owner_address), None, None, None, None, None, true) - .boxed() + Either::Left(self.get_raw_token_accounts( + Some(*owner_address), + None, + None, + None, + None, + None, + true, + )) } else { - async { Ok(Vec::new()) }.boxed() + Either::Right(async { Ok(Vec::new()) }) }; - let spl_mints_fut = self.spl_mints.batch_get(asset_ids.clone()); let inscriptions_fut = if options.show_inscription { - self.inscriptions.batch_get(asset_ids.clone()).boxed() + Either::Left(self.inscriptions.batch_get(asset_ids.clone())) } else { - async { Ok(Vec::new()) }.boxed() + Either::Right(async { Ok(Vec::new()) }) }; - let (mut assets_data, assets_collection_pks, mut urls) = - self.get_assets_with_collections_and_urls(asset_ids.clone()).await?; - let mut mpl_core_collections = HashMap::new(); - // todo: consider async/future here, but not likely as the very next call depends on urls from this one - if !assets_collection_pks.is_empty() { + + let (assets_leaf, assets_with_collectios_and_urls, token_accounts, spl_mints, inscriptions) = tokio::join!( + self.asset_leaf_data.batch_get(asset_ids.clone()), + self.get_assets_with_collections_and_urls(asset_ids.clone()), + token_accounts_fut, + self.spl_mints.batch_get(asset_ids.clone()), + inscriptions_fut, + ); + + let (mut assets_data, assets_collection_pks, mut urls) = assets_with_collectios_and_urls?; + + let offchain_data_fut = + self.asset_offchain_data.batch_get(urls.clone().into_values().collect::>()); + let asset_collection_data_fut = if assets_collection_pks.is_empty() { + Either::Left(async { Ok(Vec::new()) }) + } else { let assets_collection_pks = assets_collection_pks.into_iter().collect::>(); let start_time = chrono::Utc::now(); - let collection_d = self.db.batched_multi_get_cf( - &self.asset_data.handle(), - assets_collection_pks.clone(), - false, - ); - for asset in collection_d { - let asset = asset?; - if let Some(asset) = asset { - let asset = fb::root_as_asset_complete_details(asset.as_ref()) - .map_err(|e| StorageError::Common(e.to_string()))?; - let key = - Pubkey::new_from_array(asset.pubkey().unwrap().bytes().try_into().unwrap()); - if options.show_collection_metadata { - asset - .dynamic_details() - .and_then(|d| d.url()) - .and_then(|u| u.value()) - .map(|u| urls.insert(key, u.to_string())); - assets_data.insert(key, asset.into()); - } - if let Some(collection) = asset.collection() { - mpl_core_collections.insert(key, AssetCollection::from(collection)); - } + let red_metrics = self.red_metrics.clone(); + let db = self.db.clone(); + Either::Right(async move { + tokio::task::spawn_blocking(move || { + let collection_d = db.batched_multi_get_cf( + &db.cf_handle(AssetCompleteDetails::NAME).unwrap(), + assets_collection_pks, + false, + ); + red_metrics.observe_request( + ROCKS_COMPONENT, + BATCH_GET_ACTION, + "get_asset_collection", + start_time, + ); + // since we cannot return referenced data from this closure, + // we need to convert the db slice to an owned value (Vec in this case). + collection_d + .into_iter() + .map(|res_opt| res_opt.map(|opt| opt.map(|slice| slice.as_ref().to_vec()))) + .collect() + }) + .await + .map_err(|e| StorageError::Common(e.to_string())) + }) + }; + + let (offchain_data, asset_collection_data) = + tokio::join!(offchain_data_fut, asset_collection_data_fut); + + let mut mpl_core_collections = HashMap::new(); + for asset in asset_collection_data? { + let asset = asset?; + if let Some(asset) = asset { + let asset = fb::root_as_asset_complete_details(asset.as_ref()) + .map_err(|e| StorageError::Common(e.to_string()))?; + let key = + Pubkey::new_from_array(asset.pubkey().unwrap().bytes().try_into().unwrap()); + if options.show_collection_metadata { + asset + .dynamic_details() + .and_then(|d| d.url()) + .and_then(|u| u.value()) + .map(|u| urls.insert(key, u.to_string())); + assets_data.insert(key, asset.into()); + } + if let Some(collection) = asset.collection() { + mpl_core_collections.insert(key, AssetCollection::from(collection)); } } - self.red_metrics.observe_request( - ROCKS_COMPONENT, - BATCH_GET_ACTION, - "get_asset_collection", - start_time, - ); } - let offchain_data_fut = - self.asset_offchain_data.batch_get(urls.clone().into_values().collect::>()); - - let (assets_leaf, offchain_data, token_accounts, spl_mints) = - tokio::join!(assets_leaf_fut, offchain_data_fut, token_accounts_fut, spl_mints_fut); let offchain_data = offchain_data .map_err(|e| StorageError::Common(e.to_string()))? .into_iter() @@ -223,27 +253,21 @@ impl Storage { }) .collect::>(); - let (inscriptions, inscriptions_data) = if options.show_inscription { - let inscriptions = inscriptions_fut + let inscriptions = inscriptions + .map_err(|e| StorageError::Common(e.to_string()))? + .into_iter() + .filter_map(|asset| asset.map(|a| (a.root, a))) + .collect::>(); + let inscriptions_data = to_map!( + self.inscription_data + .batch_get( + inscriptions + .values() + .map(|inscription| inscription.inscription_data_account) + .collect(), + ) .await - .map_err(|e| StorageError::Common(e.to_string()))? - .into_iter() - .filter_map(|asset| asset.map(|a| (a.root, a))) - .collect::>(); - let inscriptions_data = to_map!( - self.inscription_data - .batch_get( - inscriptions - .values() - .map(|inscription| inscription.inscription_data_account) - .collect(), - ) - .await - ); - (inscriptions, inscriptions_data) - } else { - (HashMap::new(), HashMap::new()) - }; + ); let token_accounts = token_accounts.map_err(|e| StorageError::Common(e.to_string()))?; let spl_mints = to_map!(spl_mints); diff --git a/rocks-db/src/column.rs b/rocks-db/src/column.rs index 920b768b..e94a1b4a 100644 --- a/rocks-db/src/column.rs +++ b/rocks-db/src/column.rs @@ -204,6 +204,9 @@ where } pub async fn batch_get(&self, keys: Vec) -> Result>> { + if keys.is_empty() { + return Ok(Vec::new()); + } let start_time = chrono::Utc::now(); let db = self.backend.clone(); let keys = keys.clone();