From e0c057ec3dd4cba55e52b7e98f242654ed79a205 Mon Sep 17 00:00:00 2001 From: Vadim Date: Thu, 28 Dec 2023 17:58:14 +0100 Subject: [PATCH 1/6] feat: add metadata url to rocks --- entities/src/models.rs | 1 + .../src/bubblegum_updates_processor.rs | 1 + nft_ingester/src/gapfiller.rs | 1 + nft_ingester/src/mplx_updates_processor.rs | 2 + postgre-client/src/asset_index_client.rs | 22 ++----- postgre-client/src/lib.rs | 61 ++++++++++++++++++- rocks-db/src/asset.rs | 2 + rocks-db/src/asset_streaming_client.rs | 1 + rocks-db/src/batch_client.rs | 2 + 9 files changed, 75 insertions(+), 18 deletions(-) diff --git a/entities/src/models.rs b/entities/src/models.rs index 863daecd8..bd1ffcba3 100644 --- a/entities/src/models.rs +++ b/entities/src/models.rs @@ -59,6 +59,7 @@ pub struct CompleteAssetDetails { pub onchain_data: Option>, pub creators: Updated>, pub royalty_amount: Updated, + pub url: Updated, // From AssetAuthority as Tuple pub authority: Updated, diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs index e0b36c703..5d68fd0a3 100644 --- a/nft_ingester/src/bubblegum_updates_processor.rs +++ b/nft_ingester/src/bubblegum_updates_processor.rs @@ -463,6 +463,7 @@ impl BubblegumTxProcessor { Some(cl.seq), args.seller_fee_basis_points, ), + url: Updated::new(bundle.slot, Some(cl.seq), args.uri.clone()), ..Default::default() }; diff --git a/nft_ingester/src/gapfiller.rs b/nft_ingester/src/gapfiller.rs index 822bb50e8..23f143d49 100644 --- a/nft_ingester/src/gapfiller.rs +++ b/nft_ingester/src/gapfiller.rs @@ -59,6 +59,7 @@ pub fn insert_gaped_data( }), creators: data.creators, royalty_amount: data.royalty_amount, + url: data.url, }, )?; diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs index c539831fa..75c243ceb 100644 --- a/nft_ingester/src/mplx_updates_processor.rs +++ b/nft_ingester/src/mplx_updates_processor.rs @@ -160,6 +160,7 @@ impl MplxAccsProcessor { onchain_data: asset.onchain_data.clone(), creators: asset.creators.clone(), royalty_amount: asset.royalty_amount.clone(), + url: asset.url.clone(), } } else { asset.clone() @@ -347,6 +348,7 @@ impl MplxAccsProcessor { None, data.seller_fee_basis_points, ), + url: Updated::new(metadata_info.slot, None, uri.clone()), }); models.tasks.push(Task { diff --git a/postgre-client/src/asset_index_client.rs b/postgre-client/src/asset_index_client.rs index b50c4f116..bfa69c7a9 100644 --- a/postgre-client/src/asset_index_client.rs +++ b/postgre-client/src/asset_index_client.rs @@ -46,23 +46,11 @@ impl AssetIndexStorage for PgClient { if !metadata_urls.is_empty() { metadata_urls.sort(); - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO metadata (mtd_url) "); - query_builder.push_values(metadata_urls.iter(), |mut builder, metadata_url| { - builder.push_bind(metadata_url); - }); - query_builder.push(" ON CONFLICT (mtd_url) DO NOTHING RETURNING mtd_id, mtd_url;"); - let query = query_builder.build_query_as::<(i64, String)>(); - let metadata_ids = query - .fetch_all(&mut transaction) - .await - .map_err(|e| e.to_string())?; - - // convert metadata_ids to a map - metadata_url_map = metadata_ids - .iter() - .map(|(id, url)| (url.clone(), *id)) - .collect::>(); + self.insert_metadata(&mut transaction, metadata_urls.clone()) + .await?; + metadata_url_map = self + .get_metadata_ids(&mut transaction, metadata_urls) + .await?; } let mut asset_indexes = asset_indexes.to_vec(); diff --git a/postgre-client/src/lib.rs b/postgre-client/src/lib.rs index cb062ed22..33a7b86c4 100644 --- a/postgre-client/src/lib.rs +++ b/postgre-client/src/lib.rs @@ -1,7 +1,8 @@ use sqlx::{ postgres::{PgConnectOptions, PgPoolOptions}, - PgPool, + PgPool, Postgres, QueryBuilder, Row, Transaction, }; +use std::collections::HashMap; pub mod asset_filter_client; pub mod asset_index_client; @@ -30,4 +31,62 @@ impl PgClient { pub fn new_with_pool(pool: PgPool) -> Self { Self { pool } } + + pub async fn insert_metadata( + &self, + transaction: &mut Transaction<'_, Postgres>, + metadata_urls: Vec, + ) -> Result<(), String> { + let mut query_builder: QueryBuilder<'_, Postgres> = + QueryBuilder::new("INSERT INTO metadata (mtd_url) "); + query_builder.push_values(metadata_urls.iter(), |mut builder, metadata_url| { + builder.push_bind(metadata_url); + }); + query_builder.push(" ON CONFLICT (mtd_url) DO NOTHING;"); + + let query = query_builder.build(); + query + .execute(transaction) + .await + .map_err(|err| err.to_string())?; + + Ok(()) + } + + pub async fn get_metadata_ids( + &self, + transaction: &mut Transaction<'_, Postgres>, + metadata_urls: Vec, + ) -> Result, String> { + let mut query_builder: QueryBuilder<'_, Postgres> = + QueryBuilder::new("SELECT mtd_id, mtd_url FROM metadata WHERE mtd_url in ("); + + let urls_len = metadata_urls.len(); + + for (i, url) in metadata_urls.iter().enumerate() { + query_builder.push_bind(url); + if i < urls_len - 1 { + query_builder.push(","); + } + } + query_builder.push(");"); + + let query = query_builder.build(); + + let rows_result = query + .fetch_all(transaction) + .await + .map_err(|err| err.to_string())?; + + let mut metadata_ids_map = HashMap::new(); + + for row in rows_result { + let metadata_id: i64 = row.get("mtd_id"); + let metadata_url: String = row.get("mtd_url"); + + metadata_ids_map.insert(metadata_url, metadata_id); + } + + Ok(metadata_ids_map) + } } diff --git a/rocks-db/src/asset.rs b/rocks-db/src/asset.rs index 334807054..e644e99da 100644 --- a/rocks-db/src/asset.rs +++ b/rocks-db/src/asset.rs @@ -34,6 +34,7 @@ pub struct AssetDynamicDetails { pub onchain_data: Option>, pub creators: Updated>, pub royalty_amount: Updated, + pub url: Updated, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -206,6 +207,7 @@ impl AssetDynamicDetails { update_field(&mut current_val.royalty_amount, &new_val.royalty_amount); update_field(&mut current_val.was_decompressed, &new_val.was_decompressed); update_optional_field(&mut current_val.onchain_data, &new_val.onchain_data); + update_field(&mut current_val.url, &new_val.url); current_val } else { diff --git a/rocks-db/src/asset_streaming_client.rs b/rocks-db/src/asset_streaming_client.rs index 6eb0ea438..56865b9bc 100644 --- a/rocks-db/src/asset_streaming_client.rs +++ b/rocks-db/src/asset_streaming_client.rs @@ -163,6 +163,7 @@ async fn get_complete_asset_details( onchain_data, creators: dynamic_data.creators, royalty_amount: dynamic_data.royalty_amount, + url: dynamic_data.url, authority: Updated::new( authority.slot_updated, None, //todo: where do we get seq? diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index 6484f962b..d7e001adb 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -111,6 +111,7 @@ impl AssetIndexReader for Storage { existed_index.creators = dynamic_info.creators.clone().value; existed_index.royalty_amount = dynamic_info.royalty_amount.value as i64; existed_index.slot_updated = dynamic_info.get_slot_updated() as i64; + existed_index.metadata_url = Some(dynamic_info.url.value.clone()); } else { let asset_index = AssetIndex { pubkey: dynamic_info.pubkey, @@ -122,6 +123,7 @@ impl AssetIndexReader for Storage { creators: dynamic_info.creators.clone().value, royalty_amount: dynamic_info.royalty_amount.value as i64, slot_updated: dynamic_info.get_slot_updated() as i64, + metadata_url: Some(dynamic_info.url.value.clone()), ..Default::default() }; From 2abae7252cbd1a80f6d62d238f23e0ee58be9c65 Mon Sep 17 00:00:00 2001 From: Vadim Date: Thu, 28 Dec 2023 18:49:28 +0100 Subject: [PATCH 2/6] feat: save json tasks for compressed nfts --- .../src/bubblegum_updates_processor.rs | 13 ++++++++++ nft_ingester/src/buffer.rs | 9 +++---- nft_ingester/src/db_v2.rs | 2 +- nft_ingester/src/mplx_updates_processor.rs | 24 ++++++++++++++++++- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs index 5d68fd0a3..73f92950b 100644 --- a/nft_ingester/src/bubblegum_updates_processor.rs +++ b/nft_ingester/src/bubblegum_updates_processor.rs @@ -1,4 +1,5 @@ use crate::buffer::Buffer; +use crate::db_v2::Task; use crate::error::IngesterError; use crate::flatbuffer_mapper::FlatbufferMapper; use blockbuster::programs::bubblegum::{BubblegumInstruction, Payload}; @@ -542,6 +543,18 @@ impl BubblegumTxProcessor { } } + let mut tasks_buffer = self.buffer.json_tasks.lock().await; + + let task = Task { + ofd_metadata_url: args.uri.clone(), + ofd_locked_until: Some(chrono::Utc::now()), + ofd_attempts: 0, + ofd_max_attempts: 10, + ofd_error: None, + }; + + tasks_buffer.push_back(task); + return Ok(()); } Err(IngesterError::ParsingError( diff --git a/nft_ingester/src/buffer.rs b/nft_ingester/src/buffer.rs index 650660991..353a7280c 100644 --- a/nft_ingester/src/buffer.rs +++ b/nft_ingester/src/buffer.rs @@ -8,7 +8,7 @@ use tokio::sync::Mutex; use metrics_utils::IngesterMetricsConfig; use rocks_db::columns::{Mint, TokenAccount}; -use crate::mplx_updates_processor::MetadataInfo; +use crate::{db_v2::Task, mplx_updates_processor::MetadataInfo}; #[derive(Default)] pub struct Buffer { @@ -20,7 +20,7 @@ pub struct Buffer { pub mints: Mutex, Mint>>, - pub compressed_change_log: Mutex, ChangeLogEventV1>>, + pub json_tasks: Mutex>, } #[derive(Clone)] @@ -42,17 +42,18 @@ impl Buffer { mints: Mutex::new(HashMap::new()), - compressed_change_log: Mutex::new(HashMap::new()), + json_tasks: Mutex::new(VecDeque::::new()), } } pub async fn debug(&self) { println!( - "\nMplx metadata info buffer: {}\nTransactions buffer: {}\nSPL Tokens buffer: {}\nSPL Mints buffer: {}\n", + "\nMplx metadata info buffer: {}\nTransactions buffer: {}\nSPL Tokens buffer: {}\nSPL Mints buffer: {}\nJson tasks buffer: {}\n", self.mplx_metadata_info.lock().await.len(), self.transactions.lock().await.len(), self.token_accs.lock().await.len(), self.mints.lock().await.len(), + self.json_tasks.lock().await.len(), ); } diff --git a/nft_ingester/src/db_v2.rs b/nft_ingester/src/db_v2.rs index 2c54bbffa..636ea9eb1 100644 --- a/nft_ingester/src/db_v2.rs +++ b/nft_ingester/src/db_v2.rs @@ -31,7 +31,7 @@ pub enum TaskStatus { Failed, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Task { pub ofd_metadata_url: String, pub ofd_locked_until: Option>, diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs index 75c243ceb..d180c0043 100644 --- a/nft_ingester/src/mplx_updates_processor.rs +++ b/nft_ingester/src/mplx_updates_processor.rs @@ -19,6 +19,8 @@ use crate::buffer::Buffer; use crate::db_v2::{DBClient as DBClientV2, Task}; pub const BUFFER_PROCESSING_COUNTER: i32 = 10; +// arbitrary number, should be enough to not overflow batch insert command at Postgre +pub const MAX_BUFFERED_TASKS_TO_TAKE: usize = 5000; #[derive(Default, Debug)] pub struct RocksMetadataModels { @@ -238,7 +240,27 @@ impl MplxAccsProcessor { } } - let res = self.db_client_v2.insert_tasks(&metadata_models.tasks).await; + let mut tasks_to_insert = metadata_models.tasks.clone(); + + let mut tasks_buffer = self.buffer.json_tasks.lock().await; + + let number_of_tasks = tasks_buffer.len(); + + if number_of_tasks > MAX_BUFFERED_TASKS_TO_TAKE { + tasks_to_insert.extend( + tasks_buffer + .drain(0..MAX_BUFFERED_TASKS_TO_TAKE) + .collect::>(), + ); + } else { + tasks_to_insert.extend( + tasks_buffer + .drain(0..number_of_tasks) + .collect::>(), + ); + } + + let res = self.db_client_v2.insert_tasks(&tasks_to_insert).await; match res { Err(e) => { self.metrics From ad685e24f698c9ec6472fcc503f65910181eadec Mon Sep 17 00:00:00 2001 From: Stanislav Cherviakov Date: Wed, 3 Jan 2024 10:40:06 +0100 Subject: [PATCH 3/6] a test to check metadata url upsert --- Cargo.lock | 25 +++++++ nft_ingester/src/buffer.rs | 1 - postgre-client/Cargo.toml | 2 + .../tests/asset_filter_client_test.rs | 68 ++++++++++++++++++- 4 files changed, 94 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b514f48b..8794f89bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3632,6 +3632,8 @@ dependencies = [ "sqlx", "testcontainers", "tokio", + "tracing", + "tracing-test", "uuid", ] @@ -6578,6 +6580,29 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a2c0ff408fe918a94c428a3f2ad04e4afd5c95bbc08fcf868eff750c15728a4" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bc1c4f8e2e73a977812ab339d503e6feeb92700f6d07a6de4d321522d5c08" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.109", +] + [[package]] name = "triomphe" version = "0.1.11" diff --git a/nft_ingester/src/buffer.rs b/nft_ingester/src/buffer.rs index 353a7280c..37fac1512 100644 --- a/nft_ingester/src/buffer.rs +++ b/nft_ingester/src/buffer.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::sync::Arc; -use spl_account_compression::events::ChangeLogEventV1; use tokio::sync::Mutex; use metrics_utils::IngesterMetricsConfig; diff --git a/postgre-client/Cargo.toml b/postgre-client/Cargo.toml index 562444066..c4d305aaf 100644 --- a/postgre-client/Cargo.toml +++ b/postgre-client/Cargo.toml @@ -15,6 +15,7 @@ serde = "1.0.193" base64 = "0.21.5" entities = { path = "../entities" } solana-sdk = { version = "~1.14.14" } +tracing ="0.1.40" [dev-dependencies] testcontainers = "0.14.0" @@ -22,6 +23,7 @@ uuid = { version = "1.6.1", features = ["v4"] } rand = "0.8.5" once_cell = "1.19.0" lazy_static = "1.4.0" +tracing-test = { version = "0.2.4", features = ["no-env-filter"] } [features] integration_tests = [] \ No newline at end of file diff --git a/postgre-client/tests/asset_filter_client_test.rs b/postgre-client/tests/asset_filter_client_test.rs index d9ac9a0b9..068f506e5 100644 --- a/postgre-client/tests/asset_filter_client_test.rs +++ b/postgre-client/tests/asset_filter_client_test.rs @@ -4,9 +4,10 @@ mod db_setup; #[cfg(test)] mod tests { use super::db_setup; - use postgre_client::model::*; use postgre_client::storage_traits::{AssetIndexStorage, AssetPubkeyFilteredFetcher}; use postgre_client::PgClient; + use postgre_client::{asset_filter_client, model::*}; + use rocks_db::asset; use testcontainers::clients::Cli; use testcontainers::*; use tokio; @@ -174,6 +175,71 @@ mod tests { .unwrap(); assert_eq!(res.len(), limit as usize); + let res = asset_filter_storage + .get_asset_pubkeys_filtered( + &SearchAssetsFilter { + json_uri: ref_value.metadata_url.clone(), + ..Default::default() + }, + &order, + 1000, + None, + None, + None, + ) + .await + .unwrap(); + assert_eq!(res.len(), 100); + pool.close().await; + db_setup::teardown(&node, &db_name).await; + } + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_upsert_meatadata_urls() { + let cli = Cli::default(); + let node = cli.run(images::postgres::Postgres::default()); + let (pool, db_name) = db_setup::setup_database(&node).await; + + let asset_filter_storage = PgClient::new_with_pool(pool.clone()); + // Generate random asset indexes + let asset_indexes = db_setup::generate_asset_index_records(1); + let last_known_key = db_setup::generate_random_vec(8 + 8 + 32); + let ref_value = &asset_indexes[asset_indexes.len() - 1]; + + // Insert assets and last key using update_asset_indexes_batch + asset_filter_storage + .update_asset_indexes_batch(asset_indexes.as_slice(), &last_known_key) + .await + .unwrap(); + let asset_indexes = db_setup::generate_asset_index_records(100); + let last_known_key = db_setup::generate_random_vec(8 + 8 + 32); + + // Insert assets and last key using update_asset_indexes_batch + asset_filter_storage + .update_asset_indexes_batch(asset_indexes.as_slice(), &last_known_key) + .await + .unwrap(); + let order = AssetSorting { + sort_by: AssetSortBy::SlotCreated, + sort_direction: AssetSortDirection::Asc, + }; + + let res = asset_filter_storage + .get_asset_pubkeys_filtered( + &SearchAssetsFilter { + json_uri: ref_value.metadata_url.clone(), + ..Default::default() + }, + &order, + 1000, + None, + None, + None, + ) + .await + .unwrap(); + assert_eq!(res.len(), 101); pool.close().await; db_setup::teardown(&node, &db_name).await; } From 009d417845c894b939698b62a12db0c21c0cf78e Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 13:01:21 +0200 Subject: [PATCH 4/6] fix comment --- nft_ingester/src/mplx_updates_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs index d180c0043..f0d9bface 100644 --- a/nft_ingester/src/mplx_updates_processor.rs +++ b/nft_ingester/src/mplx_updates_processor.rs @@ -246,10 +246,10 @@ impl MplxAccsProcessor { let number_of_tasks = tasks_buffer.len(); - if number_of_tasks > MAX_BUFFERED_TASKS_TO_TAKE { + if number_of_tasks + metadata_info.len() > MAX_BUFFERED_TASKS_TO_TAKE { tasks_to_insert.extend( tasks_buffer - .drain(0..MAX_BUFFERED_TASKS_TO_TAKE) + .drain(0..MAX_BUFFERED_TASKS_TO_TAKE - metadata_info.len()) .collect::>(), ); } else { From 3f112bbbe5d081f39d494fae84b1e018829ce1de Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 13:04:34 +0200 Subject: [PATCH 5/6] check tasks_to_insert len --- nft_ingester/src/mplx_updates_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs index f0d9bface..15c32140e 100644 --- a/nft_ingester/src/mplx_updates_processor.rs +++ b/nft_ingester/src/mplx_updates_processor.rs @@ -246,10 +246,10 @@ impl MplxAccsProcessor { let number_of_tasks = tasks_buffer.len(); - if number_of_tasks + metadata_info.len() > MAX_BUFFERED_TASKS_TO_TAKE { + if number_of_tasks + tasks_to_insert.len() > MAX_BUFFERED_TASKS_TO_TAKE { tasks_to_insert.extend( tasks_buffer - .drain(0..MAX_BUFFERED_TASKS_TO_TAKE - metadata_info.len()) + .drain(0..MAX_BUFFERED_TASKS_TO_TAKE - tasks_to_insert.len()) .collect::>(), ); } else { From 659d424cf2b5423fb5f143bf49cd87ef7b7b997c Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 13:55:28 +0200 Subject: [PATCH 6/6] use saturating_sub --- nft_ingester/src/mplx_updates_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs index 15c32140e..e4e79627c 100644 --- a/nft_ingester/src/mplx_updates_processor.rs +++ b/nft_ingester/src/mplx_updates_processor.rs @@ -249,7 +249,7 @@ impl MplxAccsProcessor { if number_of_tasks + tasks_to_insert.len() > MAX_BUFFERED_TASKS_TO_TAKE { tasks_to_insert.extend( tasks_buffer - .drain(0..MAX_BUFFERED_TASKS_TO_TAKE - tasks_to_insert.len()) + .drain(0..MAX_BUFFERED_TASKS_TO_TAKE.saturating_sub( tasks_to_insert.len())) .collect::>(), ); } else {