Add metadata url to RocksDB #30

Merged · 6 commits · Jan 3, 2024
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions entities/src/models.rs
@@ -59,6 +59,7 @@ pub struct CompleteAssetDetails {
pub onchain_data: Option<Updated<ChainDataV1>>,
pub creators: Updated<Vec<Creator>>,
pub royalty_amount: Updated<u16>,
pub url: Updated<String>,

// From AssetAuthority as Tuple
pub authority: Updated<Pubkey>,
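For context, the new url field reuses the Updated<T> wrapper already carried by the surrounding fields. The wrapper's definition is not part of this diff; the following is only a minimal sketch of the shape implied by the Updated::new(slot, seq, value) calls later in this PR (field names are assumptions):

// Hypothetical sketch, not part of this diff: a value paired with the slot
// (and optional sequence number) at which it was last updated.
#[derive(Default, Clone, Debug)]
pub struct Updated<T> {
    pub slot_updated: u64,
    pub seq: Option<u64>,
    pub value: T,
}

impl<T> Updated<T> {
    pub fn new(slot_updated: u64, seq: Option<u64>, value: T) -> Self {
        Self { slot_updated, seq, value }
    }
}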
14 changes: 14 additions & 0 deletions nft_ingester/src/bubblegum_updates_processor.rs
@@ -1,4 +1,5 @@
use crate::buffer::Buffer;
use crate::db_v2::Task;
use crate::error::IngesterError;
use crate::flatbuffer_mapper::FlatbufferMapper;
use blockbuster::programs::bubblegum::{BubblegumInstruction, Payload};
@@ -463,6 +464,7 @@ impl BubblegumTxProcessor {
Some(cl.seq),
args.seller_fee_basis_points,
),
url: Updated::new(bundle.slot, Some(cl.seq), args.uri.clone()),
..Default::default()
};

@@ -541,6 +543,18 @@ impl BubblegumTxProcessor {
}
}

let mut tasks_buffer = self.buffer.json_tasks.lock().await;

let task = Task {
ofd_metadata_url: args.uri.clone(),
ofd_locked_until: Some(chrono::Utc::now()),
ofd_attempts: 0,
ofd_max_attempts: 10,
ofd_error: None,
};

tasks_buffer.push_back(task);

return Ok(());
}
Err(IngesterError::ParsingError(
10 changes: 5 additions & 5 deletions nft_ingester/src/buffer.rs
@@ -2,13 +2,12 @@ use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;

use spl_account_compression::events::ChangeLogEventV1;
use tokio::sync::Mutex;

use metrics_utils::IngesterMetricsConfig;
use rocks_db::columns::{Mint, TokenAccount};

use crate::mplx_updates_processor::MetadataInfo;
use crate::{db_v2::Task, mplx_updates_processor::MetadataInfo};

#[derive(Default)]
pub struct Buffer {
@@ -20,7 +19,7 @@ pub struct Buffer {

pub mints: Mutex<HashMap<Vec<u8>, Mint>>,

pub compressed_change_log: Mutex<HashMap<Vec<u8>, ChangeLogEventV1>>,
pub json_tasks: Mutex<VecDeque<Task>>,
}

#[derive(Clone)]
@@ -42,17 +41,18 @@ impl Buffer {

mints: Mutex::new(HashMap::new()),

compressed_change_log: Mutex::new(HashMap::new()),
json_tasks: Mutex::new(VecDeque::<Task>::new()),
}
}

pub async fn debug(&self) {
println!(
"\nMplx metadata info buffer: {}\nTransactions buffer: {}\nSPL Tokens buffer: {}\nSPL Mints buffer: {}\n",
"\nMplx metadata info buffer: {}\nTransactions buffer: {}\nSPL Tokens buffer: {}\nSPL Mints buffer: {}\nJson tasks buffer: {}\n",
self.mplx_metadata_info.lock().await.len(),
self.transactions.lock().await.len(),
self.token_accs.lock().await.len(),
self.mints.lock().await.len(),
self.json_tasks.lock().await.len(),
);
}

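The new json_tasks queue is a tokio-Mutex-guarded VecDeque shared between the bubblegum processor (producer, above) and the mplx processor (consumer, below). A minimal, self-contained sketch of that pattern; the names and the String payload are illustrative, not the PR's actual Task type:

use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;

// Producer side: append a task to the shared queue (cf. bubblegum_updates_processor above).
async fn enqueue(queue: &Arc<Mutex<VecDeque<String>>>, url: String) {
    queue.lock().await.push_back(url);
}

// Consumer side: drain at most `max` tasks for one batch insert
// (cf. the capping logic in mplx_updates_processor below).
async fn drain_batch(queue: &Arc<Mutex<VecDeque<String>>>, max: usize) -> Vec<String> {
    let mut guard = queue.lock().await;
    let take = guard.len().min(max);
    guard.drain(0..take).collect()
}

#[tokio::main]
async fn main() {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    enqueue(&queue, "https://example.com/metadata.json".to_string()).await;
    assert_eq!(drain_batch(&queue, 10).await.len(), 1);
}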
2 changes: 1 addition & 1 deletion nft_ingester/src/db_v2.rs
@@ -31,7 +31,7 @@ pub enum TaskStatus {
Failed,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Task {
pub ofd_metadata_url: String,
pub ofd_locked_until: Option<chrono::DateTime<chrono::Utc>>,
1 change: 1 addition & 0 deletions nft_ingester/src/gapfiller.rs
@@ -59,6 +59,7 @@ pub fn insert_gaped_data(
}),
creators: data.creators,
royalty_amount: data.royalty_amount,
url: data.url,
},
)?;

26 changes: 25 additions & 1 deletion nft_ingester/src/mplx_updates_processor.rs
@@ -19,6 +19,8 @@ use crate::buffer::Buffer;
use crate::db_v2::{DBClient as DBClientV2, Task};

pub const BUFFER_PROCESSING_COUNTER: i32 = 10;
// arbitrary number, should be enough not to overflow the batch insert command in Postgres
pub const MAX_BUFFERED_TASKS_TO_TAKE: usize = 5000;

#[derive(Default, Debug)]
pub struct RocksMetadataModels {
@@ -160,6 +162,7 @@ impl MplxAccsProcessor {
onchain_data: asset.onchain_data.clone(),
creators: asset.creators.clone(),
royalty_amount: asset.royalty_amount.clone(),
url: asset.url.clone(),
}
} else {
asset.clone()
@@ -237,7 +240,27 @@ impl MplxAccsProcessor {
}
}

let res = self.db_client_v2.insert_tasks(&metadata_models.tasks).await;
let mut tasks_to_insert = metadata_models.tasks.clone();

let mut tasks_buffer = self.buffer.json_tasks.lock().await;

let number_of_tasks = tasks_buffer.len();

if number_of_tasks + tasks_to_insert.len() > MAX_BUFFERED_TASKS_TO_TAKE {
tasks_to_insert.extend(
tasks_buffer
.drain(0..MAX_BUFFERED_TASKS_TO_TAKE.saturating_sub(tasks_to_insert.len()))
.collect::<Vec<Task>>(),
);
} else {
tasks_to_insert.extend(
tasks_buffer
.drain(0..number_of_tasks)
.collect::<Vec<Task>>(),
);
}

let res = self.db_client_v2.insert_tasks(&tasks_to_insert).await;
match res {
Err(e) => {
self.metrics
@@ -347,6 +370,7 @@ impl MplxAccsProcessor {
None,
data.seller_fee_basis_points,
),
url: Updated::new(metadata_info.slot, None, uri.clone()),
});

models.tasks.push(Task {
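The branch above caps a single insert at MAX_BUFFERED_TASKS_TO_TAKE: when the freshly produced tasks plus the buffered ones would exceed the cap, only enough buffered tasks are drained to fill the remaining room (saturating_sub keeps the range from underflowing if the new tasks alone exceed the cap); otherwise the whole buffer is drained. A standalone sketch of that arithmetic, with assumed names and usize items standing in for Task:

use std::collections::VecDeque;

const MAX_BUFFERED_TASKS_TO_TAKE: usize = 5000;

// Drain buffered tasks so that new + drained never exceeds the cap.
fn take_buffered(buffer: &mut VecDeque<usize>, new_tasks: usize) -> Vec<usize> {
    let room_left = MAX_BUFFERED_TASKS_TO_TAKE.saturating_sub(new_tasks); // 0 if new_tasks >= cap
    let to_take = room_left.min(buffer.len());
    buffer.drain(0..to_take).collect()
}

fn main() {
    let mut buffer: VecDeque<usize> = (0..7000).collect();
    // 2000 new tasks leave room for 3000 buffered ones under the 5000-task cap.
    assert_eq!(take_buffered(&mut buffer, 2000).len(), 3000);
    assert_eq!(buffer.len(), 4000);
}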
2 changes: 2 additions & 0 deletions postgre-client/Cargo.toml
@@ -15,13 +15,15 @@ serde = "1.0.193"
base64 = "0.21.5"
entities = { path = "../entities" }
solana-sdk = { version = "~1.14.14" }
tracing = "0.1.40"

[dev-dependencies]
testcontainers = "0.14.0"
uuid = { version = "1.6.1", features = ["v4"] }
rand = "0.8.5"
once_cell = "1.19.0"
lazy_static = "1.4.0"
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }

[features]
integration_tests = []
22 changes: 5 additions & 17 deletions postgre-client/src/asset_index_client.rs
@@ -46,23 +46,11 @@ impl AssetIndexStorage for PgClient {

if !metadata_urls.is_empty() {
metadata_urls.sort();
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("INSERT INTO metadata (mtd_url) ");
query_builder.push_values(metadata_urls.iter(), |mut builder, metadata_url| {
builder.push_bind(metadata_url);
});
query_builder.push(" ON CONFLICT (mtd_url) DO NOTHING RETURNING mtd_id, mtd_url;");
let query = query_builder.build_query_as::<(i64, String)>();
let metadata_ids = query
.fetch_all(&mut transaction)
.await
.map_err(|e| e.to_string())?;

// convert metadata_ids to a map
metadata_url_map = metadata_ids
.iter()
.map(|(id, url)| (url.clone(), *id))
.collect::<HashMap<String, i64>>();
self.insert_metadata(&mut transaction, metadata_urls.clone())
.await?;
metadata_url_map = self
.get_metadata_ids(&mut transaction, metadata_urls)
.await?;
}

let mut asset_indexes = asset_indexes.to_vec();
61 changes: 60 additions & 1 deletion postgre-client/src/lib.rs
@@ -1,7 +1,8 @@
use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions},
PgPool,
PgPool, Postgres, QueryBuilder, Row, Transaction,
};
use std::collections::HashMap;

pub mod asset_filter_client;
pub mod asset_index_client;
@@ -30,4 +31,62 @@ impl PgClient {
pub fn new_with_pool(pool: PgPool) -> Self {
Self { pool }
}

pub async fn insert_metadata(
&self,
transaction: &mut Transaction<'_, Postgres>,
metadata_urls: Vec<String>,
) -> Result<(), String> {
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("INSERT INTO metadata (mtd_url) ");
query_builder.push_values(metadata_urls.iter(), |mut builder, metadata_url| {
builder.push_bind(metadata_url);
});
query_builder.push(" ON CONFLICT (mtd_url) DO NOTHING;");

let query = query_builder.build();
query
.execute(transaction)
.await
.map_err(|err| err.to_string())?;

Ok(())
}

pub async fn get_metadata_ids(
&self,
transaction: &mut Transaction<'_, Postgres>,
metadata_urls: Vec<String>,
) -> Result<HashMap<String, i64>, String> {
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("SELECT mtd_id, mtd_url FROM metadata WHERE mtd_url in (");

let urls_len = metadata_urls.len();

for (i, url) in metadata_urls.iter().enumerate() {
query_builder.push_bind(url);
if i < urls_len - 1 {
query_builder.push(",");
}
}
query_builder.push(");");

let query = query_builder.build();

let rows_result = query
.fetch_all(transaction)
.await
.map_err(|err| err.to_string())?;

let mut metadata_ids_map = HashMap::new();

for row in rows_result {
let metadata_id: i64 = row.get("mtd_id");
let metadata_url: String = row.get("mtd_url");

metadata_ids_map.insert(metadata_url, metadata_id);
}

Ok(metadata_ids_map)
}
}
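The asset_index_client change earlier in this diff calls these two helpers back to back inside one transaction: insert_metadata upserts the URLs, then get_metadata_ids reads back the id mapping, including ids for rows that already existed and that ON CONFLICT ... DO NOTHING would not have returned. A hedged usage sketch (a hypothetical helper, not part of the PR), assuming it sits next to the existing impl where the pool field is visible:

use std::collections::HashMap;

impl PgClient {
    // Hypothetical convenience wrapper: upsert URLs and return their ids in one transaction.
    pub async fn upsert_and_map_metadata(
        &self,
        metadata_urls: Vec<String>,
    ) -> Result<HashMap<String, i64>, String> {
        let mut transaction = self.pool.begin().await.map_err(|e| e.to_string())?;
        self.insert_metadata(&mut transaction, metadata_urls.clone())
            .await?;
        let ids = self
            .get_metadata_ids(&mut transaction, metadata_urls)
            .await?;
        transaction.commit().await.map_err(|e| e.to_string())?;
        Ok(ids)
    }
}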
68 changes: 67 additions & 1 deletion postgre-client/tests/asset_filter_client_test.rs
@@ -4,9 +4,10 @@ mod db_setup;
#[cfg(test)]
mod tests {
use super::db_setup;
use postgre_client::model::*;
use postgre_client::storage_traits::{AssetIndexStorage, AssetPubkeyFilteredFetcher};
use postgre_client::PgClient;
use postgre_client::{asset_filter_client, model::*};
use rocks_db::asset;
use testcontainers::clients::Cli;
use testcontainers::*;
use tokio;
@@ -174,6 +175,71 @@ mod tests {
.unwrap();
assert_eq!(res.len(), limit as usize);

let res = asset_filter_storage
.get_asset_pubkeys_filtered(
&SearchAssetsFilter {
json_uri: ref_value.metadata_url.clone(),
..Default::default()
},
&order,
1000,
None,
None,
None,
)
.await
.unwrap();
assert_eq!(res.len(), 100);
pool.close().await;
db_setup::teardown(&node, &db_name).await;
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_upsert_meatadata_urls() {
let cli = Cli::default();
let node = cli.run(images::postgres::Postgres::default());
let (pool, db_name) = db_setup::setup_database(&node).await;

let asset_filter_storage = PgClient::new_with_pool(pool.clone());
// Generate random asset indexes
let asset_indexes = db_setup::generate_asset_index_records(1);
let last_known_key = db_setup::generate_random_vec(8 + 8 + 32);
let ref_value = &asset_indexes[asset_indexes.len() - 1];

// Insert assets and last key using update_asset_indexes_batch
asset_filter_storage
.update_asset_indexes_batch(asset_indexes.as_slice(), &last_known_key)
.await
.unwrap();
let asset_indexes = db_setup::generate_asset_index_records(100);
let last_known_key = db_setup::generate_random_vec(8 + 8 + 32);

// Insert assets and last key using update_asset_indexes_batch
asset_filter_storage
.update_asset_indexes_batch(asset_indexes.as_slice(), &last_known_key)
.await
.unwrap();
let order = AssetSorting {
sort_by: AssetSortBy::SlotCreated,
sort_direction: AssetSortDirection::Asc,
};

let res = asset_filter_storage
.get_asset_pubkeys_filtered(
&SearchAssetsFilter {
json_uri: ref_value.metadata_url.clone(),
..Default::default()
},
&order,
1000,
None,
None,
None,
)
.await
.unwrap();
assert_eq!(res.len(), 101);
pool.close().await;
db_setup::teardown(&node, &db_name).await;
}