Skip to content

Commit

Permalink
chore(api): optimize retrieval of assets
Browse files Browse the repository at this point in the history
  • Loading branch information
armyhaylenko committed Jan 16, 2025
1 parent 39dce55 commit 649a82a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 68 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rocks-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ figment = { workspace = true }
lz4 = { workspace = true }
tar = { workspace = true }
reqwest = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
metrics-utils = { path = "../metrics_utils" }
tokio = { workspace = true }
Expand Down
158 changes: 91 additions & 67 deletions rocks-db/src/clients/asset_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use entities::{
enums::{AssetType, SpecificationAssetClass, TokenMetadataEdition},
models::{EditionData, PubkeyWithSlot},
};
use futures_util::FutureExt;
use futures::future::Either;
use solana_sdk::pubkey::Pubkey;

use crate::{
Expand Down Expand Up @@ -148,65 +148,95 @@ impl Storage {
owner_address: &Option<Pubkey>,
options: &Options,
) -> Result<AssetSelectedMaps> {
let assets_leaf_fut = self.asset_leaf_data.batch_get(asset_ids.clone());
let token_accounts_fut = if let Some(owner_address) = owner_address {
self.get_raw_token_accounts(Some(*owner_address), None, None, None, None, None, true)
.boxed()
Either::Left(self.get_raw_token_accounts(
Some(*owner_address),
None,
None,
None,
None,
None,
true,
))
} else {
async { Ok(Vec::new()) }.boxed()
Either::Right(async { Ok(Vec::new()) })
};
let spl_mints_fut = self.spl_mints.batch_get(asset_ids.clone());

let inscriptions_fut = if options.show_inscription {
self.inscriptions.batch_get(asset_ids.clone()).boxed()
Either::Left(self.inscriptions.batch_get(asset_ids.clone()))
} else {
async { Ok(Vec::new()) }.boxed()
Either::Right(async { Ok(Vec::new()) })
};
let (mut assets_data, assets_collection_pks, mut urls) =
self.get_assets_with_collections_and_urls(asset_ids.clone()).await?;
let mut mpl_core_collections = HashMap::new();
// todo: consider async/future here, but not likely as the very next call depends on urls from this one
if !assets_collection_pks.is_empty() {

let (assets_leaf, assets_with_collectios_and_urls, token_accounts, spl_mints, inscriptions) = tokio::join!(
self.asset_leaf_data.batch_get(asset_ids.clone()),
self.get_assets_with_collections_and_urls(asset_ids.clone()),
token_accounts_fut,
self.spl_mints.batch_get(asset_ids.clone()),
inscriptions_fut,
);

let (mut assets_data, assets_collection_pks, mut urls) = assets_with_collectios_and_urls?;

let offchain_data_fut =
self.asset_offchain_data.batch_get(urls.clone().into_values().collect::<Vec<_>>());
let asset_collection_data_fut = if assets_collection_pks.is_empty() {
Either::Left(async { Ok(Vec::new()) })
} else {
let assets_collection_pks = assets_collection_pks.into_iter().collect::<Vec<_>>();
let start_time = chrono::Utc::now();
let collection_d = self.db.batched_multi_get_cf(
&self.asset_data.handle(),
assets_collection_pks.clone(),
false,
);
for asset in collection_d {
let asset = asset?;
if let Some(asset) = asset {
let asset = fb::root_as_asset_complete_details(asset.as_ref())
.map_err(|e| StorageError::Common(e.to_string()))?;
let key =
Pubkey::new_from_array(asset.pubkey().unwrap().bytes().try_into().unwrap());
if options.show_collection_metadata {
asset
.dynamic_details()
.and_then(|d| d.url())
.and_then(|u| u.value())
.map(|u| urls.insert(key, u.to_string()));
assets_data.insert(key, asset.into());
}
if let Some(collection) = asset.collection() {
mpl_core_collections.insert(key, AssetCollection::from(collection));
}
let red_metrics = self.red_metrics.clone();
let db = self.db.clone();
Either::Right(async move {
tokio::task::spawn_blocking(move || {
let collection_d = db.batched_multi_get_cf(
&db.cf_handle(AssetCompleteDetails::NAME).unwrap(),
assets_collection_pks,
false,
);
red_metrics.observe_request(
ROCKS_COMPONENT,
BATCH_GET_ACTION,
"get_asset_collection",
start_time,
);
// since we cannot return referenced data from this closure,
// we need to convert the db slice to an owned value (Vec in this case).
collection_d
.into_iter()
.map(|res_opt| res_opt.map(|opt| opt.map(|slice| slice.as_ref().to_vec())))
.collect()
})
.await
.map_err(|e| StorageError::Common(e.to_string()))
})
};

let (offchain_data, asset_collection_data) =
tokio::join!(offchain_data_fut, asset_collection_data_fut);

let mut mpl_core_collections = HashMap::new();
for asset in asset_collection_data? {
let asset = asset?;
if let Some(asset) = asset {
let asset = fb::root_as_asset_complete_details(asset.as_ref())
.map_err(|e| StorageError::Common(e.to_string()))?;
let key =
Pubkey::new_from_array(asset.pubkey().unwrap().bytes().try_into().unwrap());
if options.show_collection_metadata {
asset
.dynamic_details()
.and_then(|d| d.url())
.and_then(|u| u.value())
.map(|u| urls.insert(key, u.to_string()));
assets_data.insert(key, asset.into());
}
if let Some(collection) = asset.collection() {
mpl_core_collections.insert(key, AssetCollection::from(collection));
}
}
self.red_metrics.observe_request(
ROCKS_COMPONENT,
BATCH_GET_ACTION,
"get_asset_collection",
start_time,
);
}

let offchain_data_fut =
self.asset_offchain_data.batch_get(urls.clone().into_values().collect::<Vec<_>>());

let (assets_leaf, offchain_data, token_accounts, spl_mints) =
tokio::join!(assets_leaf_fut, offchain_data_fut, token_accounts_fut, spl_mints_fut);
let offchain_data = offchain_data
.map_err(|e| StorageError::Common(e.to_string()))?
.into_iter()
Expand All @@ -223,27 +253,21 @@ impl Storage {
})
.collect::<HashMap<_, _>>();

let (inscriptions, inscriptions_data) = if options.show_inscription {
let inscriptions = inscriptions_fut
let inscriptions = inscriptions
.map_err(|e| StorageError::Common(e.to_string()))?
.into_iter()
.filter_map(|asset| asset.map(|a| (a.root, a)))
.collect::<HashMap<_, _>>();
let inscriptions_data = to_map!(
self.inscription_data
.batch_get(
inscriptions
.values()
.map(|inscription| inscription.inscription_data_account)
.collect(),
)
.await
.map_err(|e| StorageError::Common(e.to_string()))?
.into_iter()
.filter_map(|asset| asset.map(|a| (a.root, a)))
.collect::<HashMap<_, _>>();
let inscriptions_data = to_map!(
self.inscription_data
.batch_get(
inscriptions
.values()
.map(|inscription| inscription.inscription_data_account)
.collect(),
)
.await
);
(inscriptions, inscriptions_data)
} else {
(HashMap::new(), HashMap::new())
};
);
let token_accounts = token_accounts.map_err(|e| StorageError::Common(e.to_string()))?;
let spl_mints = to_map!(spl_mints);

Expand Down
3 changes: 3 additions & 0 deletions rocks-db/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ where
}

pub async fn batch_get(&self, keys: Vec<C::KeyType>) -> Result<Vec<Option<C::ValueType>>> {
if keys.is_empty() {
return Ok(Vec::new());
}
let start_time = chrono::Utc::now();
let db = self.backend.clone();
let keys = keys.clone();
Expand Down

0 comments on commit 649a82a

Please sign in to comment.