diff --git a/Cargo.lock b/Cargo.lock index 09ed22787..b3b4fabf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4658,6 +4658,7 @@ dependencies = [ "mpl-bubblegum", "mpl-token-metadata", "multer", + "num-bigint 0.4.5", "num-traits", "open-rpc-schema", "plerkle_messenger", diff --git a/Makefile b/Makefile index a06182d34..07f5416a6 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,8 @@ -.PHONY: build start build-integrity-verification start-integrity-verification start-raw-backfiller start-core-indexing dev stop clippy test - +.PHONY: build start build-integrity-verification start-integrity-verification start-core-indexing dev stop clippy test start-backfiller SHELL := /bin/bash build: - @docker compose -f docker-compose.yaml build ingester raw-backfiller das-api synchronizer core-indexing slot-persister + @docker compose -f docker-compose.yaml build ingester das-api synchronizer core-indexing slot-persister backfill start: @docker compose -f docker-compose.yaml up -d ingester @@ -26,8 +25,8 @@ build-integrity-verification: start-integrity-verification: @docker compose -f docker-compose.yaml up -d integrity-verification -start-raw-backfiller: - @docker compose -f docker-compose.yaml up -d raw-backfiller +start-backfiller: + @docker compose -f docker-compose.yaml up -d backfill start-core-indexing: @docker compose -f docker-compose.yaml up -d core-indexing diff --git a/docker-compose.yaml b/docker-compose.yaml index e94ad889b..5ff9a9f40 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -3,7 +3,7 @@ services: ingester: container_name: ingester restart: always - entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; else exec ./profiling_ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; fi" + entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; else exec ./profiling_ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; fi" env_file: - .env network_mode: host @@ -34,7 +34,7 @@ services: das-api: container_name: das-api restart: always - entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./api; else exec ./profiling_api; fi" + entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./api; else exec ./profiling_api; fi" env_file: - .env network_mode: host @@ -57,7 +57,7 @@ services: synchronizer: container_name: synchronizer restart: always - entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./synchronizer; else exec ./profiling_synchronizer; fi" + entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./synchronizer; else exec ./profiling_synchronizer; fi" env_file: - .env network_mode: host @@ -97,18 +97,19 @@ services: options: max-size: "2048m" - raw-backfiller: - container_name: raw-backfiller - restart: always - entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./raw_backfiller; else exec ./profiling_raw_backfiller; fi" + backfill: + container_name: backfill + restart: no + entrypoint: ./backfill env_file: - .env + environment: + SOURCE_SLOTS_DB_PATH: ${INGESTER_SLOTS_DB_PATH} + TARGET_MAIN_DB_PATH: ${INGESTER_ROCKS_DB_PATH} network_mode: host volumes: - - ${INGESTER_ROCKS_DB_PATH}:${INGESTER_ROCKS_DB_PATH_CONTAINER}:rw - - ${INGESTER_PROFILING_FILE_PATH}:${INGESTER_PROFILING_FILE_PATH_CONTAINER}:rw - - 
./creds.json:/usr/src/app/creds.json - - ./heaps:/usr/src/app/heaps:rw + - ${INGESTER_ROCKS_DB_PATH}:${INGESTER_ROCKS_DB_PATH}:rw + - ${INGESTER_SLOTS_DB_PATH}:${INGESTER_SLOTS_DB_PATH}:ro stop_grace_period: 5m build: context: . @@ -120,30 +121,22 @@ services: slot-persister: container_name: slot-persister restart: always - entrypoint: | - sh -c " - ARGS=\"--target-db-path $target_db_path\" - ARGS=\"$$ARGS --rpc-host $rpc_host\" - [ -n \"$start_slot\" ] && ARGS=\"$$ARGS --start-slot $start_slot\" - [ -n \"$big_table_credentials\" ] && ARGS=\"$$ARGS --big-table-credentials $big_table_credentials\" - [ -n \"$big_table_timeout\" ] && ARGS=\"$$ARGS --big-table-timeout $big_table_timeout\" - [ -n \"$metrics_port\" ] && ARGS=\"$$ARGS --metrics-port $metrics_port\" - [ -n \"$chunk_size\" ] && ARGS=\"$$ARGS --chunk-size $chunk_size\" - [ -n \"$max_concurrency\" ] && ARGS=\"$$ARGS --max-concurrency $max_concurrency\" - - if [ -z \"$MALLOC_CONF\" ]; then - exec ./slot_persister $$ARGS - else - exec ./profiling_slot_persister $$ARGS - fi" + entrypoint: > + sh -c "if [ -f /usr/src/app/creds.json ]; then + export BIG_TABLE_CREDENTIALS=/usr/src/app/creds.json; + fi; + exec ./slot_persister" env_file: - .env + environment: + SLOTS_DB_PRIMARY_PATH: ${INGESTER_SLOTS_DB_PATH} + RPC_HOST: ${INGESTER_RPC_HOST} + # BIG_TABLE_CREDENTIALS: /usr/src/app/creds.json # refactored this to account for the file doesn't exist case + METRICS_PORT: 9090 network_mode: host volumes: - - ${target_db_path}:${target_db_path}:rw - - ${INGESTER_PROFILING_FILE_PATH}:${INGESTER_PROFILING_FILE_PATH_CONTAINER}:rw - - ${big_table_credentials:-/tmp/creds.json}:${big_table_credentials:-/tmp/creds.json} - - ./heaps:/usr/src/app/heaps:rw + - ${INGESTER_SLOTS_DB_PATH}:${INGESTER_SLOTS_DB_PATH}:rw + - ./creds.json:/usr/src/app/creds.json stop_grace_period: 5m build: context: . diff --git a/ingester.Dockerfile b/ingester.Dockerfile index 249ee2da7..bd1277704 100644 --- a/ingester.Dockerfile +++ b/ingester.Dockerfile @@ -18,6 +18,7 @@ COPY rocks-db ./rocks-db COPY tests/setup ./tests/setup COPY usecase ./usecase COPY integrity_verification ./integrity_verification +COPY integration_tests ./integration_tests RUN cargo chef prepare --recipe-path recipe.json @@ -36,12 +37,12 @@ RUN cargo chef cook --release --recipe-path recipe.json # Building the services FROM cacher AS builder COPY . . -RUN cargo build --release --bin ingester --bin api --bin raw_backfiller --bin synchronizer --bin slot_persister +RUN cargo build --release --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister # Building the profiling feature services FROM cacher AS builder-with-profiling COPY . . 
-RUN cargo build --release --features profiling --bin ingester --bin api --bin raw_backfiller --bin synchronizer --bin slot_persister +RUN cargo build --release --features profiling --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister # Final image FROM rust:1.76-slim-bullseye AS runtime @@ -52,12 +53,12 @@ ENV TZ=Etc/UTC APP_USER=appuser LD_PRELOAD="/usr/local/lib/libjemalloc.so.2" RUN groupadd $APP_USER && useradd -g $APP_USER $APP_USER && mkdir -p ${APP} COPY --from=builder /rust/target/release/ingester ${APP}/ingester -COPY --from=builder /rust/target/release/raw_backfiller ${APP}/raw_backfiller +COPY --from=builder /rust/target/release/backfill ${APP}/backfill COPY --from=builder /rust/target/release/api ${APP}/api COPY --from=builder /rust/target/release/synchronizer ${APP}/synchronizer COPY --from=builder /rust/target/release/slot_persister ${APP}/slot_persister COPY --from=builder-with-profiling /rust/target/release/ingester ${APP}/profiling_ingester -COPY --from=builder-with-profiling /rust/target/release/raw_backfiller ${APP}/profiling_raw_backfiller +COPY --from=builder-with-profiling /rust/target/release/backfill ${APP}/profiling_backfill COPY --from=builder-with-profiling /rust/target/release/api ${APP}/profiling_api COPY --from=builder-with-profiling /rust/target/release/synchronizer ${APP}/profiling_synchronizer COPY --from=builder-with-profiling /rust/target/release/slot_persister ${APP}/profiling_slot_persister diff --git a/integration_tests/src/common.rs b/integration_tests/src/common.rs index 0d30c075c..3c121b202 100644 --- a/integration_tests/src/common.rs +++ b/integration_tests/src/common.rs @@ -35,7 +35,6 @@ use tokio::{ }; use usecase::proofs::MaybeProofChecker; -use tracing::{error, info}; use serde::de::DeserializeOwned; use solana_account_decoder::{UiAccount, UiAccountEncoding}; use solana_client::{ @@ -51,6 +50,7 @@ use solana_sdk::{ }; use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; use std::{fmt, time::Duration}; +use tracing::{error, info}; use std::path::PathBuf; diff --git a/interface/src/price_fetcher.rs b/interface/src/price_fetcher.rs index 0ad6ea874..35090ed9f 100644 --- a/interface/src/price_fetcher.rs +++ b/interface/src/price_fetcher.rs @@ -1,6 +1,5 @@ use crate::error::UsecaseError; use async_trait::async_trait; -use solana_program::pubkey::Pubkey; use std::collections::HashMap; #[async_trait] diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 693e43c67..897a800be 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -33,7 +33,7 @@ stretto = { workspace = true } async-channel = { workspace = true } tokio-util = { workspace = true } tracing-subscriber = { workspace = true } -clap = { workspace = true } +clap = { workspace = true, features = ["env"] } bincode = { workspace = true } metrics-utils = { path = "../metrics_utils" } rocks-db = { path = "../rocks-db" } @@ -84,6 +84,7 @@ indicatif = { workspace = true } tokio-retry = { workspace = true } axum = { workspace = true } rocksdb = { workspace = true } +num-bigint = "0.4" [dev-dependencies] setup = { path = "../tests/setup" } @@ -145,3 +146,6 @@ name = "explorer" [[bin]] name = "synchronizer_utils" + +[[bin]] +name = "dumper" diff --git a/nft_ingester/src/api/dapi/asset.rs b/nft_ingester/src/api/dapi/asset.rs index 8888574ce..dbe836276 100644 --- a/nft_ingester/src/api/dapi/asset.rs +++ b/nft_ingester/src/api/dapi/asset.rs @@ -18,7 +18,7 @@ use 
interface::processing_possibility::ProcessingPossibilityChecker; use itertools::Itertools; use metrics_utils::ApiMetricsConfig; use rocks_db::asset::{AssetLeaf, AssetSelectedMaps}; -use rocks_db::{AssetAuthority, Storage}; +use rocks_db::Storage; use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; @@ -140,7 +140,9 @@ fn asset_selected_maps_into_full_asset( if let Some(asset_static_details) = &asset_complete_details.static_details { // collection itself cannot have a collection // TODO!: should we also include in this check FungibleToken? - if &asset_static_details.specification_asset_class != &SpecificationAssetClass::MplCoreCollection { + if &asset_static_details.specification_asset_class + != &SpecificationAssetClass::MplCoreCollection + { if let Some(collection_details) = &asset_complete_details.collection { if !collection_details.is_collection_verified.value { return None; @@ -294,7 +296,7 @@ pub async fn get_by_ids< }, ); } - Err(e) => {} + Err(_) => {} } } diff --git a/nft_ingester/src/api/dapi/rpc_asset_convertors.rs b/nft_ingester/src/api/dapi/rpc_asset_convertors.rs index c8cf247b2..ba3954dbd 100644 --- a/nft_ingester/src/api/dapi/rpc_asset_convertors.rs +++ b/nft_ingester/src/api/dapi/rpc_asset_convertors.rs @@ -279,7 +279,12 @@ pub fn to_authority( // even if there is no authority for asset we should not set Pubkey::default(), just empty string let auth_key = update_authority .map(|update_authority| update_authority.to_string()) - .unwrap_or(authority.as_ref().map(|auth| auth.authority.to_string()).unwrap_or("".to_string())); + .unwrap_or( + authority + .as_ref() + .map(|auth| auth.authority.to_string()) + .unwrap_or("".to_string()), + ); vec![Authority { address: auth_key, diff --git a/nft_ingester/src/backfiller.rs b/nft_ingester/src/backfiller.rs index 7d88258c3..ca90c5f31 100644 --- a/nft_ingester/src/backfiller.rs +++ b/nft_ingester/src/backfiller.rs @@ -1,4 +1,4 @@ -use crate::config::{BackfillerConfig, BackfillerSourceMode, BigTableConfig}; +use crate::config::{BackfillerSourceMode, BigTableConfig}; use crate::error::IngesterError; use async_trait::async_trait; use backfill_rpc::rpc::BackfillRPC; @@ -118,7 +118,6 @@ impl BlockProducer for BackfillSource { #[derive(Clone)] pub struct TransactionsParser { - rocks_client: Arc, slot_getter: Arc, consumer: Arc, producer: Arc

, @@ -134,7 +133,6 @@ where S: SlotGetter, { pub fn new( - rocks_client: Arc<Storage>, slot_getter: Arc<S>, consumer: Arc<C>, producer: Arc<P>

, @@ -143,7 +141,6 @@ where chunk_size: usize, ) -> TransactionsParser { TransactionsParser { - rocks_client, slot_getter, consumer, producer, diff --git a/nft_ingester/src/bin/api/main.rs b/nft_ingester/src/bin/api/main.rs index a44b09a22..74c9b3bba 100644 --- a/nft_ingester/src/bin/api/main.rs +++ b/nft_ingester/src/bin/api/main.rs @@ -30,10 +30,10 @@ pub const DEFAULT_SECONDARY_ROCKSDB_PATH: &str = "./my_rocksdb_secondary"; #[tokio::main(flavor = "multi_thread")] pub async fn main() -> Result<(), IngesterError> { + tracing_subscriber::fmt::init(); info!("Starting API server..."); - let config: ApiConfig = setup_config("API_"); - init_logger(&config.get_log_level()); + // init_logger(&config.get_log_level()); let guard = if config.run_profiling { Some( @@ -73,6 +73,7 @@ pub async fn main() -> Result<(), IngesterError> { config.database_config.get_database_url()?.as_str(), min_connections, max_connections, + None, red_metrics.clone(), ) .await?; @@ -120,6 +121,7 @@ pub async fn main() -> Result<(), IngesterError> { rocks_storage.clone(), json_downloader_metrics.clone(), red_metrics.clone(), + nft_ingester::config::default_parallel_json_downloaders(), ) .await, )) diff --git a/nft_ingester/src/bin/backfill/main.rs b/nft_ingester/src/bin/backfill/main.rs index 26f0cfd11..07410c5f6 100644 --- a/nft_ingester/src/bin/backfill/main.rs +++ b/nft_ingester/src/bin/backfill/main.rs @@ -24,11 +24,11 @@ use tracing::{error, info, warn}; #[command(author, version, about, long_about = None)] struct Args { /// Path to the source RocksDB with slots (readonly) - #[arg(short, long)] + #[arg(short, long, env = "SOURCE_SLOTS_DB_PATH")] source_db_path: PathBuf, /// Path to the target RocksDB instance - #[arg(short, long)] + #[arg(short, long, env = "TARGET_MAIN_DB_PATH")] target_db_path: PathBuf, /// Optional starting slot number diff --git a/nft_ingester/src/bin/dumper/main.rs b/nft_ingester/src/bin/dumper/main.rs new file mode 100644 index 000000000..f8c4d1631 --- /dev/null +++ b/nft_ingester/src/bin/dumper/main.rs @@ -0,0 +1,232 @@ +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; + +use clap::{command, Parser}; +use metrics_utils::SynchronizerMetricsConfig; +use nft_ingester::error::IngesterError; +use nft_ingester::index_syncronizer::shard_pubkeys; +use nft_ingester::init::graceful_stop; +use rocks_db::migrator::MigrationState; +use rocks_db::storage_traits::{AssetUpdateIndexStorage, Dumper}; +use rocks_db::Storage; +use tokio::sync::{broadcast, Mutex}; +use tokio::task::JoinSet; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path to the RocksDB instance + #[arg(short, long)] + source_path: PathBuf, + + /// Buffer capacity for the dumper + /// The dumper will wait until the buffer is full before writing to the CSV files + /// This is to reduce the number of writes to the disk + #[arg(short, long, default_value = "33554432")] + buffer_capacity: usize, + + /// Limit the number of assets to dump + /// If not set, all assets will be dumped + /// If set, the dumper will stop after dumping the specified number of assets + /// This is useful for testing + #[arg(short, long)] + limit: Option, + + /// Path to dump the CSV files + /// If not set, the CSV files will be dumped to a temporary directory + #[arg(short, long)] + dump_path: Option, + + /// Number of shards to use for the dump + /// If not set, the dump will be done in a single shard + /// If set, the dump will be done in the specified number of shards + #[arg(short, long, 
default_value = "1")] + num_shards: u64, + + /// Number of shards for fungible tokens + /// If not set, the dump will be done in a single shard + /// If set, the dump will be done in the specified number of shards + #[arg(short, long, default_value = "1")] + fungible_num_shards: u64, +} + +#[tokio::main(flavor = "multi_thread")] +pub async fn main() -> Result<(), IngesterError> { + // Initialize tracing subscriber for logging + tracing_subscriber::fmt::init(); + tracing::info!("Starting Synchronizer server..."); + let args = Args::parse(); + let start_time = std::time::Instant::now(); + + let secondary_storage_path = tempfile::TempDir::new().unwrap().path().to_path_buf(); + let metrics = Arc::new(SynchronizerMetricsConfig::new()); + let red_metrics = Arc::new(metrics_utils::red::RequestErrorDurationMetrics::new()); + + let tasks = JoinSet::new(); + let mutexed_tasks = Arc::new(Mutex::new(tasks)); + + let rocks_storage = Arc::new( + Storage::open_secondary( + &args.source_path, + &secondary_storage_path, + mutexed_tasks.clone(), + red_metrics.clone(), + MigrationState::Last, + ) + .unwrap(), + ); + + let cloned_tasks = mutexed_tasks.clone(); + let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1); + mutexed_tasks.lock().await.spawn(async move { + // --stop + graceful_stop(cloned_tasks, shutdown_tx, None, None, None, "").await; + + Ok(()) + }); + if let Err(e) = rocks_storage.db.try_catch_up_with_primary() { + tracing::error!("Sync rocksdb error: {}", e); + } + + tracing::info!("Preparation took {:?}", start_time.elapsed()); + let start_time = std::time::Instant::now(); + + let Some(last_known_key) = rocks_storage.last_known_nft_asset_updated_key()? else { + return Ok(()); + }; + let Some(last_known_fungible_key) = rocks_storage.last_known_fungible_asset_updated_key()? 
+ else { + return Ok(()); + }; + + let base_path = args + .dump_path + .unwrap_or_else(|| tempfile::TempDir::new().unwrap().path().to_path_buf()); + + let shards = shard_pubkeys(args.num_shards); + let fungible_shards = shard_pubkeys(args.fungible_num_shards); + + let mut tasks = JoinSet::new(); + let mut fungible_tasks = JoinSet::new(); + + for (start, end) in shards.iter() { + let name_postfix = if args.num_shards > 1 { + format!("_shard_{}_{}", start, end) + } else { + "".to_string() + }; + let creators_path = base_path + .join(format!("creators{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let assets_path = base_path + .join(format!("assets{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let authorities_path = base_path + .join(format!("assets_authorities{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let metadata_path = base_path + .join(format!("metadata{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + tracing::info!( + "Dumping to creators: {:?}, assets: {:?}, authorities: {:?}, metadata: {:?}", + creators_path, + assets_path, + authorities_path, + metadata_path, + ); + + let assets_file = File::create(assets_path.clone()) + .map_err(|e| format!("Could not create file for assets dump: {}", e))?; + let creators_file = File::create(creators_path.clone()) + .map_err(|e| format!("Could not create file for creators dump: {}", e))?; + let authority_file = File::create(authorities_path.clone()) + .map_err(|e| format!("Could not create file for authority dump: {}", e))?; + let metadata_file = File::create(metadata_path.clone()) + .map_err(|e| format!("Could not create file for metadata dump: {}", e))?; + + let start = start.clone(); + let end = end.clone(); + let shutdown_rx = shutdown_rx.resubscribe(); + let metrics = metrics.clone(); + let rocks_storage = rocks_storage.clone(); + tasks.spawn_blocking(move || { + rocks_storage.dump_nft_csv( + assets_file, + creators_file, + authority_file, + metadata_file, + args.buffer_capacity, + args.limit, + Some(start), + Some(end), + &shutdown_rx, + metrics, + ) + }); + } + + for (start, end) in fungible_shards.iter() { + let name_postfix = if args.fungible_num_shards > 1 { + format!("_shard_{}_{}", start, end) + } else { + "".to_string() + }; + let fungible_tokens_path = base_path + .join(format!("fungible_tokens{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + tracing::info!("Dumping to fungible tokens: {:?}", fungible_tokens_path); + let fungible_tokens_file = File::create(fungible_tokens_path.clone()) + .map_err(|e| format!("Could not create file for fungible tokens dump: {}", e))?; + + let start = start.clone(); + let end = end.clone(); + let shutdown_rx = shutdown_rx.resubscribe(); + let metrics = metrics.clone(); + let rocks_storage = rocks_storage.clone(); + fungible_tasks.spawn_blocking(move || { + rocks_storage.dump_fungible_csv( + (fungible_tokens_file, fungible_tokens_path), + args.buffer_capacity, + Some(start), + Some(end), + &shutdown_rx, + metrics, + ) + }); + } + + let mut total_assets = 0; + while let Some(task) = tasks.join_next().await { + let cnt = task + .map_err(|e| e.to_string())? 
+ .map_err(|e| e.to_string())?; + total_assets = total_assets + cnt; + } + let duration = start_time.elapsed(); + tracing::info!( + "Dumping of {} assets took {:?}, average rate: {:.2} assets/s", + total_assets, + duration, + total_assets as f64 / duration.as_secs_f64() + ); + + while let Some(task) = fungible_tasks.join_next().await { + task.map_err(|e| e.to_string())? + .map_err(|e| e.to_string())?; + } + tracing::info!("Dumping fungible tokens done"); + let keys_file = File::create(base_path.join("keys.csv")).expect("should create keys file"); + Storage::dump_last_keys(keys_file, last_known_key, last_known_fungible_key)?; + Ok(()) +} diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 468069201..bbfc2fc48 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -1,6 +1,6 @@ use arweave_rs::consts::ARWEAVE_BASE_URL; use arweave_rs::Arweave; -use entities::enums::{AssetType, ASSET_TYPES}; +use entities::enums::ASSET_TYPES; use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister}; use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs; use nft_ingester::scheduler::Scheduler; @@ -72,7 +72,7 @@ static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json"; -pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; +pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 8; pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100; pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15 * 60; // 15 minutes @@ -128,6 +128,7 @@ pub async fn main() -> Result<(), IngesterError> { metrics_state.red_metrics.clone(), DEFAULT_MIN_POSTGRES_CONNECTIONS, PG_MIGRATIONS_PATH, + None, ) .await?, ); @@ -298,6 +299,7 @@ pub async fn main() -> Result<(), IngesterError> { primary_rocks_storage.clone(), metrics_state.json_downloader_metrics.clone(), metrics_state.red_metrics.clone(), + config.parallel_json_downloaders, ) .await, ); @@ -566,7 +568,6 @@ pub async fn main() -> Result<(), IngesterError> { metrics.register_with_prefix(&mut metrics_state.registry, "force_slot_persister_"); if let Some(client) = grpc_client { let force_reingestable_transactions_parser = Arc::new(TransactionsParser::new( - primary_rocks_storage.clone(), force_reingestable_slot_processor.clone(), force_reingestable_slot_processor.clone(), Arc::new(client), @@ -580,7 +581,6 @@ pub async fn main() -> Result<(), IngesterError> { .spawn(run_slot_force_persister(force_reingestable_transactions_parser, rx)); } else { let force_reingestable_transactions_parser = Arc::new(TransactionsParser::new( - primary_rocks_storage.clone(), force_reingestable_slot_processor.clone(), force_reingestable_slot_processor.clone(), producer.clone(), @@ -645,12 +645,14 @@ pub async fn main() -> Result<(), IngesterError> { for asset_type in ASSET_TYPES { let primary_rocks_storage = primary_rocks_storage.clone(); let mut rx = shutdown_rx.resubscribe(); + let index_pg_storage = index_pg_storage.clone(); mutexed_tasks.lock().await.spawn(async move { + let index_pg_storage = index_pg_storage.clone(); tokio::select! 
{ _ = rx.recv() => {} _ = async move { loop { - match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) { + match clean_syncronized_idxs(index_pg_storage.clone(), primary_rocks_storage.clone(), asset_type).await { Ok(_) => { info!("Cleaned synchronized indexes for {:?}", asset_type); } diff --git a/nft_ingester/src/bin/slot_persister/main.rs b/nft_ingester/src/bin/slot_persister/main.rs index a0913b913..a71bda266 100644 --- a/nft_ingester/src/bin/slot_persister/main.rs +++ b/nft_ingester/src/bin/slot_persister/main.rs @@ -41,11 +41,11 @@ const SLOT_COLLECTION_OFFSET: u64 = 300; )] struct Args { /// Path to the target RocksDB instance with slots - #[arg(short, long)] + #[arg(short, long, env = "SLOTS_DB_PRIMARY_PATH")] target_db_path: PathBuf, /// RPC host - #[arg(short, long)] + #[arg(short, long, env = "RPC_HOST")] rpc_host: String, /// Optional starting slot number, this will override the last saved slot in the RocksDB @@ -53,7 +53,7 @@ struct Args { start_slot: Option, /// Big table credentials file path - #[arg(short, long)] + #[arg(short, long, env = "BIG_TABLE_CREDENTIALS")] big_table_credentials: Option, /// Optional big table timeout (default: 1000) @@ -62,7 +62,7 @@ struct Args { /// Metrics port /// Default: 9090 - #[arg(short, long, default_value = "9090")] + #[arg(short, long, default_value = "9090", env = "METRICS_PORT")] metrics_port: u16, /// Number of slots to process in each batch diff --git a/nft_ingester/src/bin/synchronizer/main.rs b/nft_ingester/src/bin/synchronizer/main.rs index 0f2dcda5b..274032a5d 100644 --- a/nft_ingester/src/bin/synchronizer/main.rs +++ b/nft_ingester/src/bin/synchronizer/main.rs @@ -7,9 +7,9 @@ use nft_ingester::index_syncronizer::{SyncStatus, Synchronizer}; use nft_ingester::init::{graceful_stop, init_index_storage_with_migration}; use postgre_client::PG_MIGRATIONS_PATH; use prometheus_client::registry::Registry; +use std::path::PathBuf; use std::sync::Arc; -use metrics_utils::utils::setup_metrics; use metrics_utils::SynchronizerMetricsConfig; use rocks_db::migrator::MigrationState; use rocks_db::Storage; @@ -23,7 +23,7 @@ static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; pub const DEFAULT_SECONDARY_ROCKSDB_PATH: &str = "./my_rocksdb_secondary"; pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100; -pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; +pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 2; #[tokio::main(flavor = "multi_thread")] pub async fn main() -> Result<(), IngesterError> { @@ -59,8 +59,9 @@ pub async fn main() -> Result<(), IngesterError> { &config.database_config.get_database_url().unwrap(), max_postgre_connections, red_metrics.clone(), - DEFAULT_MAX_POSTGRES_CONNECTIONS, + DEFAULT_MIN_POSTGRES_CONNECTIONS, PG_MIGRATIONS_PATH, + Some(PathBuf::from(config.dump_path.clone())), ) .await?, ); @@ -106,23 +107,22 @@ pub async fn main() -> Result<(), IngesterError> { let synchronizer = Arc::new(Synchronizer::new( rocks_storage.clone(), index_storage.clone(), - index_storage.clone(), config.dump_synchronizer_batch_size, config.dump_path.to_string(), metrics.clone(), config.parallel_tasks, - config.run_temp_sync_during_dump, )); if let Err(e) = rocks_storage.db.try_catch_up_with_primary() { tracing::error!("Sync rocksdb error: {}", e); } + let mut full_sync_tasks = JoinSet::new(); for asset_type in ASSET_TYPES { let synchronizer = synchronizer.clone(); let shutdown_rx = shutdown_rx.resubscribe(); - mutexed_tasks.lock().await.spawn(async move { + 
full_sync_tasks.spawn(async move { if let Ok(SyncStatus::FullSyncRequired(_)) = synchronizer .get_sync_state(config.dump_sync_threshold, asset_type) .await @@ -131,18 +131,22 @@ pub async fn main() -> Result<(), IngesterError> { match res { Ok(_) => { tracing::info!("Full synchronization finished successfully"); + return Ok(()); } Err(e) => { tracing::error!("Full synchronization failed: {:?}", e); + return Err(e); } } } - Ok(()) }); } - while (mutexed_tasks.lock().await.join_next().await).is_some() {} - + while let Some(task) = full_sync_tasks.join_next().await { + task.map_err(|e| { + IngesterError::UnrecoverableTaskError(format!("joining task failed: {}", e.to_string())) + })??; + } let shutdown_rx_clone = shutdown_rx.resubscribe(); let synchronizer_clone = synchronizer.clone(); mutexed_tasks.lock().await.spawn(async move { diff --git a/nft_ingester/src/cleaners/indexer_cleaner.rs b/nft_ingester/src/cleaners/indexer_cleaner.rs index 5d5b9a503..020ad7bfb 100644 --- a/nft_ingester/src/cleaners/indexer_cleaner.rs +++ b/nft_ingester/src/cleaners/indexer_cleaner.rs @@ -1,27 +1,19 @@ use std::sync::Arc; use entities::enums::AssetType; -use rocks_db::{ - key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage, -}; +use postgre_client::storage_traits::AssetIndexStorage; +use rocks_db::Storage; use crate::error::IngesterError; -pub fn clean_syncronized_idxs( +pub async fn clean_syncronized_idxs( + index_storage: Arc, primary_rocks_storage: Arc, asset_type: AssetType, ) -> Result<(), IngesterError> { - let optional_last_synced_key = match asset_type { - AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(), - AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(), - }; + let optional_last_synced_key = index_storage.fetch_last_synced_id(asset_type).await; if let Ok(Some(last_synced_key)) = optional_last_synced_key { - let last_synced_key = encode_u64x2_pubkey( - last_synced_key.seq, - last_synced_key.slot, - last_synced_key.pubkey, - ); primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?; }; diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 633256a5e..be17ba5c5 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -192,7 +192,7 @@ pub struct IngesterConfig { pub price_monitoring_interval_sec: u64, } -const fn default_parallel_json_downloaders() -> i32 { +pub const fn default_parallel_json_downloaders() -> i32 { 100 } diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs index 250bd40e6..4926e521d 100644 --- a/nft_ingester/src/error/mod.rs +++ b/nft_ingester/src/error/mod.rs @@ -247,6 +247,7 @@ impl From for IngesterError { IndexDbError::PubkeyParsingError(s) => IngesterError::ParsePubkeyError(s), IndexDbError::NotImplemented(s) => IngesterError::DatabaseError(s), a @ IndexDbError::BadArgument(_) => IngesterError::DatabaseError(a.to_string()), + a @ IndexDbError::JoinError(_) => IngesterError::DatabaseError(a.to_string()), } } } diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index e48c6a426..e3fa8f286 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -1,20 +1,21 @@ -use entities::{ - enums::AssetType, - models::{AssetIndex, FungibleAssetIndex}, -}; +use entities::enums::AssetType; use metrics_utils::SynchronizerMetricsConfig; -use postgre_client::storage_traits::{AssetIndexStorage, TempClientProvider}; +use num_bigint::BigUint; +use 
postgre_client::storage_traits::{AssetIndexStorage, NFTSemaphores}; use rocks_db::{ key_encoders::{decode_u64x2_pubkey, encode_u64x2_pubkey}, storage_traits::{AssetIndexStorage as AssetIndexSourceStorage, AssetUpdatedKey}, }; use solana_sdk::pubkey::Pubkey; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, fs::File, sync::Arc}; use tokio::task::JoinSet; use tracing::warn; use crate::error::IngesterError; +const BUF_CAPACITY: usize = 1024 * 1024 * 32; +const NFT_SHARDS: u64 = 32; +const FUNGIBLE_SHARDS: u64 = 16; #[derive(Debug)] pub struct SyncState { last_indexed_key: Option, @@ -27,48 +28,40 @@ pub enum SyncStatus { NoSyncRequired, } -pub struct Synchronizer +pub struct Synchronizer where T: AssetIndexSourceStorage, U: AssetIndexStorage, - P: TempClientProvider + Send + Sync + 'static + Clone, { primary_storage: Arc, index_storage: Arc, - temp_client_provider: P, dump_synchronizer_batch_size: usize, dump_path: String, metrics: Arc, parallel_tasks: usize, - run_temp_sync_during_dump: bool, } -impl Synchronizer +impl Synchronizer where T: AssetIndexSourceStorage + Send + Sync + 'static, U: AssetIndexStorage + Clone + Send + Sync + 'static, - P: TempClientProvider + Send + Sync + 'static + Clone, { #[allow(clippy::too_many_arguments)] pub fn new( primary_storage: Arc, index_storage: Arc, - temp_client_provider: P, dump_synchronizer_batch_size: usize, dump_path: String, metrics: Arc, parallel_tasks: usize, - run_temp_sync_during_dump: bool, ) -> Self { Synchronizer { primary_storage, index_storage, - temp_client_provider, dump_synchronizer_batch_size, dump_path, metrics, parallel_tasks, - run_temp_sync_during_dump, } } @@ -123,11 +116,16 @@ where let last_indexed_key = last_indexed_key.map(decode_u64x2_pubkey).transpose()?; // Fetch the last known key from the primary storage - let last_key = match asset_type { - AssetType::NonFungible => self.primary_storage.last_known_nft_asset_updated_key()?, - AssetType::Fungible => self - .primary_storage - .last_known_fungible_asset_updated_key()?, + let (last_key, prefix) = match asset_type { + AssetType::NonFungible => ( + self.primary_storage.last_known_nft_asset_updated_key()?, + "nft", + ), + AssetType::Fungible => ( + self.primary_storage + .last_known_fungible_asset_updated_key()?, + "fungible", + ), }; let Some(last_key) = last_key else { return Ok(SyncStatus::NoSyncRequired); @@ -140,20 +138,24 @@ where })); } let last_known_seq = last_key.seq as i64; - self.metrics - .set_last_synchronized_slot("last_known_updated_seq", last_known_seq); - self.metrics - .set_last_synchronized_slot("last_known_updated_slot", last_key.slot as i64); + self.metrics.set_last_synchronized_slot( + &format!("last_known_updated_{}_seq", prefix), + last_known_seq, + ); + self.metrics.set_last_synchronized_slot( + &format!("last_known_updated_{}_slot", prefix), + last_key.slot as i64, + ); self.metrics.set_last_synchronized_slot( - "last_synchronized_slot", + &format!("last_synchronized_{}_slot", prefix), last_indexed_key .as_ref() .map(|k| k.slot) .unwrap_or_default() as i64, ); self.metrics.set_last_synchronized_slot( - "last_synchronized_seq", + &format!("last_synchronized_{}_seq", prefix), last_indexed_key.as_ref().map(|k| k.seq).unwrap_or_default() as i64, ); if let Some(last_indexed_key) = &last_indexed_key { @@ -244,92 +246,223 @@ where last_known_key.slot, last_known_key.pubkey, ); - if !self.run_temp_sync_during_dump { - return self - .dump_sync(last_included_rocks_key.as_slice(), rx, asset_type) - .await; - } - // start a regular 
synchronization into a temporary storage to catch up on it while the dump is being created and loaded, as it takes a loooong time - let (tx, local_rx) = tokio::sync::broadcast::channel::<()>(1); - let temp_storage = Arc::new(self.temp_client_provider.create_temp_client().await?); - temp_storage - .initialize(last_included_rocks_key.as_slice()) - .await?; - let temp_syncronizer = Arc::new(Synchronizer::new( - self.primary_storage.clone(), - temp_storage.clone(), - self.temp_client_provider.clone(), - self.dump_synchronizer_batch_size, - "not used".to_string(), - self.metrics.clone(), - 1, - false, - )); + self.dump_sync(last_included_rocks_key.as_slice(), rx, asset_type) + .await + } + + async fn dump_sync( + &self, + last_included_rocks_key: &[u8], + rx: &tokio::sync::broadcast::Receiver<()>, + asset_type: AssetType, + ) -> Result<(), IngesterError> { + tracing::info!( + "Dumping {:?} from the primary storage to {}", + asset_type, + self.dump_path + ); match asset_type { AssetType::NonFungible => { - temp_syncronizer - .nft_run(&local_rx, -1, tokio::time::Duration::from_millis(100)) - .await + self.dump_sync_nft(rx, last_included_rocks_key, NFT_SHARDS) + .await?; } AssetType::Fungible => { - temp_syncronizer - .fungible_run(&local_rx, -1, tokio::time::Duration::from_millis(100)) - .await + self.dump_sync_fungibles(rx, last_included_rocks_key, FUNGIBLE_SHARDS) + .await?; } } - self.dump_sync(last_included_rocks_key.as_slice(), rx, asset_type) - .await?; - - tx.send(()).map_err(|e| e.to_string())?; - - // now we can copy temp storage to the main storage - temp_storage.copy_to_main().await?; + tracing::info!("{:?} Dump is complete and loaded", asset_type); Ok(()) } - async fn dump_sync( + async fn dump_sync_nft( &self, - last_included_rocks_key: &[u8], rx: &tokio::sync::broadcast::Receiver<()>, - asset_type: AssetType, + last_included_rocks_key: &[u8], + num_shards: u64, ) -> Result<(), IngesterError> { - let path = std::path::Path::new(self.dump_path.as_str()); - tracing::info!("Dumping the primary storage to {}", self.dump_path); + let base_path = std::path::Path::new(self.dump_path.as_str()); + self.index_storage + .destructive_prep_to_batch_nft_load() + .await?; - match asset_type { - AssetType::NonFungible => { - self.primary_storage - .dump_nft_db( - path, - self.dump_synchronizer_batch_size, - rx, - self.metrics.clone(), - ) - .await? 
- } - AssetType::Fungible => { - self.primary_storage - .dump_fungible_db( - path, - self.dump_synchronizer_batch_size, - rx, - self.metrics.clone(), + let shards = shard_pubkeys(num_shards); + let mut tasks: JoinSet> = + JoinSet::new(); + for (start, end) in shards.iter() { + let name_postfix = if num_shards > 1 { + format!("_shard_{}_{}", start, end) + } else { + "".to_string() + }; + let creators_path = base_path + .join(format!("creators{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let assets_path = base_path + .join(format!("assets{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let authorities_path = base_path + .join(format!("assets_authorities{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + let metadata_path = base_path + .join(format!("metadata{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + tracing::info!( + "Dumping to creators: {:?}, assets: {:?}, authorities: {:?}, metadata: {:?}", + creators_path, + assets_path, + authorities_path, + metadata_path, + ); + + let assets_file = File::create(assets_path.clone()) + .map_err(|e| format!("Could not create file for assets dump: {}", e))?; + let creators_file = File::create(creators_path.clone()) + .map_err(|e| format!("Could not create file for creators dump: {}", e))?; + let authority_file = File::create(authorities_path.clone()) + .map_err(|e| format!("Could not create file for authority dump: {}", e))?; + let metadata_file = File::create(metadata_path.clone()) + .map_err(|e| format!("Could not create file for metadata dump: {}", e))?; + + let start = start.clone(); + let end = end.clone(); + let shutdown_rx = rx.resubscribe(); + let metrics = self.metrics.clone(); + let rocks_storage = self.primary_storage.clone(); + tasks.spawn_blocking(move || { + let res = rocks_storage.dump_nft_csv( + assets_file, + creators_file, + authority_file, + metadata_file, + BUF_CAPACITY, + None, + Some(start), + Some(end), + &shutdown_rx, + metrics, + )?; + Ok(( + res, + assets_path.clone(), + creators_path.clone(), + authorities_path.clone(), + metadata_path.clone(), + )) + }); + } + let mut index_tasks = JoinSet::new(); + let semaphore = Arc::new(NFTSemaphores::new()); + while let Some(task) = tasks.join_next().await { + let (_cnt, assets_path, creators_path, authorities_path, metadata_path) = + task.map_err(|e| e.to_string())??; + let index_storage = self.index_storage.clone(); + let semaphore = semaphore.clone(); + index_tasks.spawn(async move { + index_storage + .load_from_dump_nfts( + assets_path.as_str(), + creators_path.as_str(), + authorities_path.as_str(), + metadata_path.as_str(), + semaphore, ) - .await? - } + .await + }); + } + while let Some(task) = index_tasks.join_next().await { + task.map_err(|e| e.to_string())? + .map_err(|e| e.to_string())?; } + tracing::info!("All NFT assets loads complete. Finalizing the batch load"); + self.index_storage.finalize_batch_nft_load().await?; + tracing::info!("Batch load finalized for NFTs"); + self.index_storage + .update_last_synced_key(last_included_rocks_key, AssetType::NonFungible) + .await?; + Ok(()) + } - tracing::info!( - "{:?} Dump is complete. 
Loading the dump into the index storage", - asset_type - ); + async fn dump_sync_fungibles( + &self, + rx: &tokio::sync::broadcast::Receiver<()>, + last_included_rocks_key: &[u8], + num_shards: u64, + ) -> Result<(), IngesterError> { + let base_path = std::path::Path::new(self.dump_path.as_str()); + self.index_storage + .destructive_prep_to_batch_fungible_load() + .await?; + let shards = shard_pubkeys(num_shards); + let mut tasks: JoinSet> = JoinSet::new(); + for (start, end) in shards.iter() { + let name_postfix = if num_shards > 1 { + format!("_shard_{}_{}", start, end) + } else { + "".to_string() + }; + + let fungible_tokens_path = base_path + .join(format!("fungible_tokens{}.csv", name_postfix)) + .to_str() + .map(str::to_owned) + .unwrap(); + tracing::info!("Dumping to fungible_tokens: {:?}", fungible_tokens_path); + + let fungible_tokens_file = File::create(fungible_tokens_path.clone()) + .map_err(|e| format!("Could not create file for fungible tokens dump: {}", e))?; + + let start = start.clone(); + let end = end.clone(); + let shutdown_rx = rx.resubscribe(); + let metrics = self.metrics.clone(); + let rocks_storage = self.primary_storage.clone(); + + tasks.spawn_blocking(move || { + let res = rocks_storage.dump_fungible_csv( + (fungible_tokens_file, fungible_tokens_path.clone()), + BUF_CAPACITY, + Some(start), + Some(end), + &shutdown_rx, + metrics, + )?; + Ok((res, fungible_tokens_path)) + }); + } + let mut index_tasks = JoinSet::new(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(1)); + while let Some(task) = tasks.join_next().await { + let (_cnt, fungible_tokens_path) = task.map_err(|e| e.to_string())??; + let index_storage = self.index_storage.clone(); + let semaphore = semaphore.clone(); + index_tasks.spawn(async move { + index_storage + .load_from_dump_fungibles(fungible_tokens_path.as_str(), semaphore) + .await + }); + } + while let Some(task) = index_tasks.join_next().await { + task.map_err(|e| e.to_string())? + .map_err(|e| e.to_string())?; + } + tracing::info!("All token accounts/fungibles loads complete. 
Finalizing the batch load"); + self.index_storage.finalize_batch_fungible_load().await?; + tracing::info!("Batch load finalized for fungibles"); self.index_storage - .load_from_dump(path, last_included_rocks_key, asset_type) + .update_last_synced_key(last_included_rocks_key, AssetType::Fungible) .await?; - tracing::info!("{:?} Dump is loaded into the index storage", asset_type); Ok(()) } @@ -396,11 +529,11 @@ where } if let Some(last_included_rocks_key) = last_included_rocks_key { self.metrics.set_last_synchronized_slot( - "last_synchronized_slot", + "last_synchronized_fungible_slot", last_included_rocks_key.slot as i64, ); self.metrics.set_last_synchronized_slot( - "last_synchronized_seq", + "last_synchronized_fungible_seq", last_included_rocks_key.seq as i64, ); @@ -487,11 +620,11 @@ where } if let Some(last_included_rocks_key) = last_included_rocks_key { self.metrics.set_last_synchronized_slot( - "last_synchronized_slot", + "last_synchronized_nft_slot", last_included_rocks_key.slot as i64, ); self.metrics.set_last_synchronized_slot( - "last_synchronized_seq", + "last_synchronized_nft_seq", last_included_rocks_key.seq as i64, ); @@ -556,9 +689,7 @@ where } index_storage - .update_fungible_asset_indexes_batch( - asset_indexes.as_slice(), - ) + .update_fungible_asset_indexes_batch(asset_indexes.as_slice()) .await?; metrics.inc_number_of_records_synchronized( "synchronized_records", @@ -568,6 +699,84 @@ where } } +/// Generate the first and last Pubkey for each shard. +/// Returns a vector of tuples (start_pubkey, end_pubkey) for each shard. +pub fn shard_pubkeys(num_shards: u64) -> Vec<(Pubkey, Pubkey)> { + // Total keyspace as BigUint + let total_keyspace = BigUint::from_bytes_be(&[0xffu8; 32].as_slice()); + let shard_size = &total_keyspace / num_shards; + + let mut shards = Vec::new(); + for i in 0..num_shards { + // Calculate the start of the shard + let shard_start = &shard_size * i; + let shard_start_bytes = shard_start.to_bytes_be(); + + // Calculate the end of the shard + let shard_end = if i == num_shards - 1 { + total_keyspace.clone() // Last shard ends at the max value + } else { + &shard_size * (i + 1) - 1u64 + }; + let shard_end_bytes = shard_end.to_bytes_be(); + + // Pad the bytes to fit [u8; 32] + let start_pubkey = pad_to_32_bytes(&shard_start_bytes); + let end_pubkey = pad_to_32_bytes(&shard_end_bytes); + + shards.push(( + Pubkey::new_from_array(start_pubkey), + Pubkey::new_from_array(end_pubkey), + )); + } + + shards +} + +/// Pad a byte slice to fit into a [u8; 32] array. 
+fn pad_to_32_bytes(bytes: &[u8]) -> [u8; 32] { + let mut array = [0u8; 32]; + let offset = 32 - bytes.len(); + array[offset..].copy_from_slice(bytes); // Copy the bytes into the rightmost part of the array + array +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_shard_pubkeys_1() { + let shards = shard_pubkeys(1); + assert_eq!(shards.len(), 1); + assert_eq!( + shards[0], + ( + Pubkey::new_from_array([0; 32]), + Pubkey::new_from_array([0xff; 32]) + ) + ); + } + + #[test] + fn test_shard_pubkeys_2() { + let shards = shard_pubkeys(2); + assert_eq!(shards.len(), 2); + let first_key = [0x0; 32]; + let mut last_key = [0xff; 32]; + last_key[0] = 0x7f; + last_key[31] = 0xfe; + assert_eq!(shards[0].0.to_bytes(), first_key); + assert_eq!(shards[0].1.to_bytes(), last_key); + + let mut first_key = last_key; + first_key[31] = 0xff; + let last_key = [0xff; 32]; + assert_eq!(shards[1].0.to_bytes(), first_key); + assert_eq!(shards[1].1.to_bytes(), last_key); + } +} + #[cfg(test)] mod tests { use super::*; @@ -577,7 +786,7 @@ mod tests { }; use metrics_utils::{MetricState, MetricsTrait}; use mockall; - use postgre_client::storage_traits::{MockAssetIndexStorageMock, MockTempClientProviderMock}; + use postgre_client::storage_traits::MockAssetIndexStorageMock; use rocks_db::storage_traits::MockAssetIndexStorage as MockPrimaryStorage; use tokio; diff --git a/nft_ingester/src/init.rs b/nft_ingester/src/init.rs index 09c429fd7..87172fe38 100644 --- a/nft_ingester/src/init.rs +++ b/nft_ingester/src/init.rs @@ -10,6 +10,7 @@ use rocks_db::Storage; use std::fs::File; use std::io::Write; use std::ops::DerefMut; +use std::path::PathBuf; use std::sync::Arc; use tempfile::TempDir; use tokio::process::Command; @@ -27,11 +28,18 @@ pub async fn init_index_storage_with_migration( red_metrics: Arc, min_pg_connections: u32, pg_migrations_path: &str, + base_dump_path: Option, ) -> Result { - let pg_client = PgClient::new(url, min_pg_connections, max_pg_connections, red_metrics) - .await - .map_err(|e| e.to_string()) - .map_err(IngesterError::SqlxError)?; + let pg_client = PgClient::new( + url, + min_pg_connections, + max_pg_connections, + base_dump_path, + red_metrics, + ) + .await + .map_err(|e| e.to_string()) + .map_err(IngesterError::SqlxError)?; pg_client .run_migration(pg_migrations_path) diff --git a/nft_ingester/src/json_worker.rs b/nft_ingester/src/json_worker.rs index 953ebb886..3bd03e1a6 100644 --- a/nft_ingester/src/json_worker.rs +++ b/nft_ingester/src/json_worker.rs @@ -9,7 +9,7 @@ use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus}; use postgre_client::tasks::UpdatedTask; use postgre_client::PgClient; -use reqwest::{Client, ClientBuilder}; +use reqwest::ClientBuilder; use rocks_db::asset_previews::UrlToDownload; use rocks_db::Storage; use serde_json::Value; @@ -42,12 +42,11 @@ impl JsonWorker { rocks_db: Arc, metrics: Arc, red_metrics: Arc, + parallel_json_downloaders: i32, ) -> Self { - let config: IngesterConfig = setup_config(INGESTER_CONFIG_PREFIX); - Self { db_client, - num_of_parallel_workers: config.parallel_json_downloaders, + num_of_parallel_workers: parallel_json_downloaders, metrics, red_metrics, rocks_db, diff --git a/nft_ingester/src/processors/account_based/mpl_core_processor.rs b/nft_ingester/src/processors/account_based/mpl_core_processor.rs index a3f2729a8..12ecad240 100644 --- a/nft_ingester/src/processors/account_based/mpl_core_processor.rs +++ b/nft_ingester/src/processors/account_based/mpl_core_processor.rs 
@@ -113,10 +113,9 @@ impl MplCoreProcessor { let ownership_type = OwnerType::Single; let (owner, class) = match account_data.indexable_asset.clone() { MplCoreAccountData::Asset(_) => (asset.owner, SpecificationAssetClass::MplCoreAsset), - MplCoreAccountData::Collection(_) => ( - update_authority, - SpecificationAssetClass::MplCoreCollection, - ), + MplCoreAccountData::Collection(_) => { + (update_authority, SpecificationAssetClass::MplCoreCollection) + } _ => return Ok(None), }; @@ -137,7 +136,8 @@ impl MplCoreProcessor { // convert HashMap plugins into BTreeMap to have always same plugins order // for example without ordering 2 assets with same plugins can have different order saved in DB // it affects only API response and tests - let ordered_plugins: BTreeMap<_, _> = asset.plugins + let ordered_plugins: BTreeMap<_, _> = asset + .plugins .iter() .map(|(key, value)| (format!("{:?}", key), value)) .collect(); diff --git a/nft_ingester/src/raydium_price_fetcher.rs b/nft_ingester/src/raydium_price_fetcher.rs index 6dadf7363..005641c98 100644 --- a/nft_ingester/src/raydium_price_fetcher.rs +++ b/nft_ingester/src/raydium_price_fetcher.rs @@ -4,7 +4,6 @@ use interface::error::UsecaseError; use interface::price_fetcher::TokenPriceFetcher; use metrics_utils::red::RequestErrorDurationMetrics; use moka::future::Cache; -use solana_program::pubkey::Pubkey; use std::collections::HashMap; use std::sync::Arc; diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index dd9edf34b..7eb93ba11 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -7,8 +7,8 @@ mod tests { ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp, }; use blockbuster::programs::token_extensions::MintAccountExtensions; - use rocks_db::column::TypedColumn; use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs; + use rocks_db::column::TypedColumn; use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; diff --git a/postgre-client/Cargo.toml b/postgre-client/Cargo.toml index c749e528f..a7bf91ea6 100644 --- a/postgre-client/Cargo.toml +++ b/postgre-client/Cargo.toml @@ -20,6 +20,7 @@ chrono = { workspace = true } usecase = { path = "../usecase" } thiserror = { workspace = true } interface = { path = "../interface" } +uuid = { workspace = true } [dev-dependencies] setup = { path = "../tests/setup" } diff --git a/postgre-client/src/asset_index_client.rs b/postgre-client/src/asset_index_client.rs index 83ff5ffc2..f79b7abf9 100644 --- a/postgre-client/src/asset_index_client.rs +++ b/postgre-client/src/asset_index_client.rs @@ -1,24 +1,25 @@ use std::{ collections::{HashMap, HashSet}, ops::Deref, + sync::Arc, vec, }; use async_trait::async_trait; use solana_sdk::pubkey::Pubkey; -use sqlx::{Connection, Executor, Postgres, QueryBuilder, Transaction}; -use std::panic::{catch_unwind, AssertUnwindSafe}; +use sqlx::{Executor, Postgres, QueryBuilder, Transaction}; +use tokio::task::JoinSet; +use uuid::Uuid; use crate::{ error::IndexDbError, model::{OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions}, - storage_traits::AssetIndexStorage, + storage_traits::{AssetIndexStorage, NFTSemaphores}, PgClient, BATCH_DELETE_ACTION, BATCH_SELECT_ACTION, BATCH_UPSERT_ACTION, CREATE_ACTION, DROP_ACTION, INSERT_TASK_PARAMETERS_COUNT, POSTGRES_PARAMETERS_COUNT_LIMIT, SELECT_ACTION, SQL_COMPONENT, TRANSACTION_ACTION, UPDATE_ACTION, }; use entities::{ - enums::SpecificationAssetClass as AssetSpecClass, enums::AssetType, 
models::{AssetIndex, Creator, FungibleAssetIndex, FungibleToken, UrlWithStatus}, }; @@ -208,21 +209,7 @@ pub(crate) fn split_assets_into_components(asset_indexes: &[AssetIndex]) -> Asse let mut asset_indexes = asset_indexes.to_vec(); asset_indexes.sort_by(|a, b| a.pubkey.cmp(&b.pubkey)); - let fungible_tokens = asset_indexes - .iter() - .filter(|asset| asset.specification_asset_class == AssetSpecClass::FungibleToken) - .map(|asset| { - FungibleToken { - key: asset.pubkey, - slot_updated: asset.slot_updated, - // it's unlikely that rows below will not be filled for fungible token - // but even if that happens we will save asset with default values - owner: asset.owner.unwrap_or_default(), - asset: asset.fungible_asset_mint.unwrap_or_default(), - balance: asset.fungible_asset_balance.unwrap_or_default() as i64, - } - }) - .collect::>(); + let asset_indexes = asset_indexes .into_iter() .filter(|asset| asset.fungible_asset_mint.is_none()) @@ -383,129 +370,142 @@ impl AssetIndexStorage for PgClient { } } } + async fn load_from_dump_nfts( + &self, + assets_file_name: &str, + creators_file_name: &str, + authority_file_name: &str, + metadata_file_name: &str, + semaphore: Arc, + ) -> Result<(), IndexDbError> { + let Some(ref base_path) = self.base_dump_path else { + return Err(IndexDbError::BadArgument( + "base_dump_path is not set".to_string(), + )); + }; + let temp_postfix = Uuid::new_v4().to_string().replace("-", ""); + let mut copy_tasks: JoinSet> = JoinSet::new(); + for (file_path, table, columns, semaphore) in [ + ( base_path.join(creators_file_name), + "asset_creators_v3", + "asc_pubkey, asc_creator, asc_verified, asc_slot_updated", + semaphore.creators.clone(), + ), + ( + base_path.join(authority_file_name), + "assets_authorities", + "auth_pubkey, auth_authority, auth_slot_updated", + semaphore.authority.clone(), + ), + (base_path.join(assets_file_name), + "assets_v3", + "ast_pubkey, ast_specification_version, ast_specification_asset_class, ast_royalty_target_type, ast_royalty_amount, ast_slot_created, ast_owner_type, ast_owner, ast_delegate, ast_authority_fk, ast_collection, ast_is_collection_verified, ast_is_burnt, ast_is_compressible, ast_is_compressed, ast_is_frozen, ast_supply, ast_metadata_url_id, ast_slot_updated", + semaphore.assets.clone(), + ), + ]{ + let cl = self.clone(); + copy_tasks.spawn(async move { + cl.load_from(file_path.to_str().unwrap_or_default().to_string(), table, columns, semaphore).await + }); + } + + let file_path = base_path + .join(metadata_file_name) + .to_str() + .unwrap_or_default() + .to_string(); + let temp_postfix = temp_postfix.clone(); + let cl = self.clone(); + let semaphore = semaphore.metadata.clone(); + copy_tasks.spawn(async move { + cl.load_through_temp_table( + file_path, + "tasks", + temp_postfix.as_str(), + "tsk_id, tsk_metadata_url, tsk_status", + true, + Some(semaphore), + ) + .await + }); + while let Some(task) = copy_tasks.join_next().await { + task??; + } + Ok(()) + } - async fn load_from_dump( + async fn load_from_dump_fungibles( &self, - base_path: &std::path::Path, - last_key: &[u8], - asset_type: AssetType, + fungible_tokens_path: &str, + semaphore: Arc, ) -> Result<(), IndexDbError> { - let operation_start_time = chrono::Utc::now(); + self.load_from( + fungible_tokens_path.to_string(), + "fungible_tokens", + "fbt_pubkey, fbt_owner, fbt_asset, fbt_balance, fbt_slot_updated", + semaphore, + ) + .await + } + + async fn destructive_prep_to_batch_nft_load(&self) -> Result<(), IndexDbError> { let mut transaction = 
self.start_transaction().await?; - let dump_result = (async { - match asset_type { - AssetType::NonFungible => { - let Some(metadata_path) = - base_path.join("metadata.csv").to_str().map(str::to_owned) - else { - return Err(IndexDbError::BadArgument(format!( - "invalid path '{:?}'", - base_path - ))); - }; - let Some(creators_path) = - base_path.join("creators.csv").to_str().map(str::to_owned) - else { - return Err(IndexDbError::BadArgument(format!( - "invalid path '{:?}'", - base_path - ))); - }; - let Some(assets_authorities_path) = base_path - .join("assets_authorities.csv") - .to_str() - .map(str::to_owned) - else { - return Err(IndexDbError::BadArgument(format!( - "invalid path '{:?}'", - base_path - ))); - }; - let Some(assets_path) = - base_path.join("assets.csv").to_str().map(str::to_owned) - else { - return Err(IndexDbError::BadArgument(format!( - "invalid path '{:?}'", - base_path - ))); - }; - - let result_of_copy = self - .copy_nfts( - metadata_path, - creators_path, - assets_path, - assets_authorities_path, - &mut transaction, - ) - .await; - - if result_of_copy.is_ok() { - self.update_last_synced_key( - last_key, - &mut transaction, - "last_synced_key", - asset_type, - ) - .await - } else { - result_of_copy - } - } - AssetType::Fungible => { - let Some(fungible_tokens_path) = base_path - .join("fungible_tokens.csv") - .to_str() - .map(str::to_owned) - else { - return Err(IndexDbError::BadArgument(format!( - "invalid path '{:?}'", - base_path - ))); - }; - - let result_of_copy = self - .copy_fungibles(fungible_tokens_path, &mut transaction) - .await; - - if result_of_copy.is_ok() { - self.update_last_synced_key( - last_key, - &mut transaction, - "last_synced_key", - asset_type, - ) - .await - } else { - result_of_copy - } - } + match self + .destructive_prep_to_batch_nft_load_tx(&mut transaction) + .await + { + Ok(_) => { + self.commit_transaction(transaction).await?; } - }) - .await; + Err(e) => { + self.rollback_transaction(transaction).await?; + return Err(e); + } + } + Ok(()) + } + async fn finalize_batch_nft_load(&self) -> Result<(), IndexDbError> { + let mut wg = JoinSet::new(); + let v = self.clone(); + wg.spawn(async move { v.finalize_assets_load().await }); + let v = self.clone(); + wg.spawn(async move { v.finalize_creators_load().await }); + let v = self.clone(); + wg.spawn(async move { v.finalize_authorities_load().await }); + while let Some(task) = wg.join_next().await { + task??; + } + Ok(()) + } - match dump_result { + async fn destructive_prep_to_batch_fungible_load(&self) -> Result<(), IndexDbError> { + let mut transaction = self.start_transaction().await?; + match self + .destructive_prep_to_batch_fungible_load_tx(&mut transaction) + .await + { Ok(_) => { self.commit_transaction(transaction).await?; - self.metrics.observe_request( - SQL_COMPONENT, - TRANSACTION_ACTION, - "load_from_dump", - operation_start_time, - ); - Ok(()) } Err(e) => { self.rollback_transaction(transaction).await?; - self.metrics.observe_request( - SQL_COMPONENT, - TRANSACTION_ACTION, - "load_from_dump_failed", - operation_start_time, - ); - Err(e) + return Err(e); + } + } + Ok(()) + } + async fn finalize_batch_fungible_load(&self) -> Result<(), IndexDbError> { + let mut transaction = self.start_transaction().await?; + match self.finalize_batch_fungible_load_tx(&mut transaction).await { + Ok(_) => { + self.commit_transaction(transaction).await?; + } + Err(e) => { + self.rollback_transaction(transaction).await?; + return Err(e); } } + Ok(()) } async fn update_last_synced_key( diff --git 
a/postgre-client/src/error.rs b/postgre-client/src/error.rs index 0b82a4c8d..6fa26b370 100644 --- a/postgre-client/src/error.rs +++ b/postgre-client/src/error.rs @@ -15,6 +15,8 @@ pub enum IndexDbError { NotImplemented(String), #[error("Bad argument: {0}")] BadArgument(String), + #[error("Join Error: {0}")] + JoinError(#[from] tokio::task::JoinError), } impl From for String { diff --git a/postgre-client/src/lib.rs b/postgre-client/src/lib.rs index bf14b55f3..dcc2129b9 100644 --- a/postgre-client/src/lib.rs +++ b/postgre-client/src/lib.rs @@ -2,15 +2,16 @@ use entities::enums::TaskStatus; use entities::models::UrlWithStatus; use error::IndexDbError; use metrics_utils::red::RequestErrorDurationMetrics; +use sqlx::Executor; use sqlx::Row; use sqlx::{ migrate::Migrator, postgres::{PgConnectOptions, PgPoolOptions}, ConnectOptions, Error, PgPool, Postgres, QueryBuilder, Transaction, }; +use std::path::PathBuf; use std::{sync::Arc, time::Duration}; use tracing::log::LevelFilter; -use sqlx::Executor; pub mod asset_filter_client; pub mod asset_index_client; @@ -23,7 +24,6 @@ pub mod load_client; pub mod model; pub mod storage_traits; pub mod tasks; -pub mod temp_index_client; pub const SQL_COMPONENT: &str = "sql"; pub const SELECT_ACTION: &str = "select"; @@ -48,6 +48,7 @@ pub const PG_MIGRATIONS_PATH: &str = "./migrations"; #[derive(Clone)] pub struct PgClient { pub pool: PgPool, + pub base_dump_path: Option, pub metrics: Arc, } @@ -56,6 +57,7 @@ impl PgClient { url: &str, min_connections: u32, max_connections: u32, + base_dump_path: Option, metrics: Arc, ) -> Result { let mut options: PgConnectOptions = url.parse().unwrap(); @@ -65,14 +67,28 @@ impl PgClient { let pool = PgPoolOptions::new() .min_connections(min_connections) .max_connections(max_connections) + // 1 hour of a timeout, this is set specifically due to synchronizer needing up to 200 connections to do a full sync load + .acquire_timeout(Duration::from_secs(3600)) .connect_with(options) .await?; - Ok(Self { pool, metrics }) + Ok(Self { + pool, + base_dump_path, + metrics, + }) } - pub fn new_with_pool(pool: PgPool, metrics: Arc) -> Self { - Self { pool, metrics } + pub fn new_with_pool( + pool: PgPool, + base_dump_path: Option, + metrics: Arc, + ) -> Self { + Self { + pool, + base_dump_path, + metrics, + } } pub async fn check_health(&self) -> Result<(), String> { @@ -207,11 +223,21 @@ impl PgClient { self.truncate_table(&mut transaction, table).await?; } - transaction.execute(sqlx::query("update last_synced_key set last_synced_asset_update_key = null where id = 1;")).await?; + transaction + .execute(sqlx::query( + "update last_synced_key set last_synced_asset_update_key = null where id = 1 or id = 2;", + )) + .await?; self.recreate_fungible_indexes(&mut transaction).await?; - self.recreate_nft_indexes(&mut transaction).await?; - self.recreate_constraints(&mut transaction).await?; + self.recreate_asset_indexes(&mut transaction).await?; + self.recreate_authorities_indexes(&mut transaction).await?; + self.recreate_creators_indexes(&mut transaction).await?; + self.recreate_asset_authorities_constraints(&mut transaction) + .await?; + self.recreate_asset_creators_constraints(&mut transaction) + .await?; + self.recreate_asset_constraints(&mut transaction).await?; transaction.commit().await.map_err(|e| e)?; // those await above will not always rollback the tx diff --git a/postgre-client/src/load_client.rs b/postgre-client/src/load_client.rs index bf8d77db7..4d2d345e7 100644 --- a/postgre-client/src/load_client.rs +++ 
b/postgre-client/src/load_client.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use sqlx::{Execute, Postgres, QueryBuilder, Transaction}; use crate::{ error::IndexDbError, PgClient, ALTER_ACTION, COPY_ACTION, CREATE_ACTION, DROP_ACTION, - INSERT_ACTION, SQL_COMPONENT, TEMP_TABLE_PREFIX, TRUNCATE_ACTION, + INSERT_ACTION, SQL_COMPONENT, TRUNCATE_ACTION, }; impl PgClient { @@ -26,10 +28,17 @@ impl PgClient { &self, transaction: &mut Transaction<'_, Postgres>, table: &str, + temp_table: &str, + on_conflict_do_nothing: bool, ) -> Result<(), IndexDbError> { + let conflict_clause = if on_conflict_do_nothing { + " ON CONFLICT DO NOTHING" + } else { + "" + }; let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( - "INSERT INTO {} SELECT * FROM {}{} ON CONFLICT DO NOTHING;", - table, TEMP_TABLE_PREFIX, table + "INSERT INTO {} SELECT * FROM {} {};", + table, temp_table, conflict_clause )); self.execute_query_with_metrics(transaction, &mut query_builder, INSERT_ACTION, table) .await?; @@ -37,6 +46,54 @@ impl PgClient { Ok(()) } + pub(crate) async fn set_unlogged_on( + &self, + transaction: &mut Transaction<'_, Postgres>, + table: &str, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = + QueryBuilder::new(format!("ALTER TABLE {} SET UNLOGGED;", table)); + self.execute_query_with_metrics(transaction, &mut query_builder, "set_unlogged", table) + .await + } + + pub(crate) async fn set_logged_on( + &self, + transaction: &mut Transaction<'_, Postgres>, + table: &str, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = + QueryBuilder::new(format!("ALTER TABLE {} SET LOGGED;", table)); + self.execute_query_with_metrics(transaction, &mut query_builder, "set_logged", table) + .await + } + + pub(crate) async fn set_autovacuum_off_on( + &self, + transaction: &mut Transaction<'_, Postgres>, + table: &str, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( + "ALTER TABLE {} SET (autovacuum_enabled = false);", + table + )); + self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, table) + .await + } + + pub(crate) async fn reset_autovacuum_on( + &self, + transaction: &mut Transaction<'_, Postgres>, + table: &str, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( + "ALTER TABLE {} RESET (autovacuum_enabled); ", + table + )); + self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, table) + .await + } + pub(crate) async fn truncate_table( &self, transaction: &mut Transaction<'_, Postgres>, @@ -77,7 +134,7 @@ impl PgClient { index: &str, ) -> Result<(), IndexDbError> { let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new(format!("DROP INDEX {};", index)); + QueryBuilder::new(format!("DROP INDEX IF EXISTS {};", index)); self.execute_query_with_metrics(transaction, &mut query_builder, DROP_ACTION, index) .await } @@ -87,13 +144,18 @@ impl PgClient { transaction: &mut Transaction<'_, Postgres>, ) -> Result<(), IndexDbError> { let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("ALTER TABLE assets_v3 DISABLE TRIGGER ALL;"); - self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3") - .await?; + QueryBuilder::new("ALTER TABLE fungible_tokens DISABLE TRIGGER ALL;"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "fungible_tokens", + ) + 
.await?; for index in [ - "fungible_tokens_fbt_owner_balance_idx", "fungible_tokens_fbt_asset_idx", + "fungible_tokens_fbt_owner_balance_idx", ] { self.drop_index(transaction, index).await?; } @@ -109,47 +171,215 @@ impl PgClient { self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3") .await?; + //alternative would be to use ALTER INDEX index_name ALTER INDEX SET DISABLED; but that might not work on all versions of postgres and might be slower + for index in [ - "assets_authority", "asset_creators_v3_creator", - "assets_v3_specification_version", - "assets_v3_specification_asset_class", - "assets_v3_royalty_target_type", - "assets_v3_royalty_amount", - "assets_v3_slot_created", - "assets_v3_owner_type", - "assets_v3_metadata_url", - "assets_v3_owner", - "assets_v3_delegate", + "assets_authority", "assets_v3_authority_fk", "assets_v3_collection_is_collection_verified", + "assets_v3_delegate", "assets_v3_is_burnt", - "assets_v3_is_compressible", "assets_v3_is_compressed", + "assets_v3_is_compressible", "assets_v3_is_frozen", - "assets_v3_supply", + "assets_v3_metadata_url", + "assets_v3_owner", + "assets_v3_owner_type", + "assets_v3_royalty_amount", + "assets_v3_royalty_target_type", + "assets_v3_slot_created", "assets_v3_slot_updated", + "assets_v3_specification_asset_class", + "assets_v3_specification_version", + "assets_v3_supply", ] { self.drop_index(transaction, index).await?; } Ok(()) } + pub(crate) async fn recreate_asset_indexes( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = + QueryBuilder::new("ALTER TABLE assets_v3 ENABLE TRIGGER ALL;"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + "trigger_enable", + "assets_v3", + ) + .await?; + + for (index, on_query_string) in [ + ("assets_v3_authority_fk", "assets_v3(ast_authority_fk) WHERE ast_authority_fk IS NOT NULL"), + ("assets_v3_collection_is_collection_verified", "assets_v3(ast_collection, ast_is_collection_verified) WHERE ast_collection IS NOT NULL"), + ("assets_v3_delegate", "assets_v3(ast_delegate) WHERE ast_delegate IS NOT NULL"), + ("assets_v3_is_burnt", "assets_v3(ast_is_burnt) WHERE ast_is_burnt IS TRUE"), + ("assets_v3_is_compressed", "assets_v3(ast_is_compressed)"), + ("assets_v3_is_compressible", "assets_v3(ast_is_compressible) WHERE ast_is_compressible IS TRUE"), + ("assets_v3_is_frozen", "assets_v3(ast_is_frozen) WHERE ast_is_frozen IS TRUE"), + ("assets_v3_metadata_url", "assets_v3 (ast_metadata_url_id) WHERE ast_metadata_url_id IS NOT NULL"), + ("assets_v3_owner", "assets_v3(ast_owner) WHERE ast_owner IS NOT NULL"), + ("assets_v3_owner_type", "assets_v3 (ast_owner_type) WHERE ast_owner_type IS NOT NULL AND ast_owner_type <> 'unknown'::owner_type"), + ("assets_v3_royalty_amount", "assets_v3 (ast_royalty_amount)"), + ("assets_v3_royalty_target_type", "assets_v3 (ast_royalty_target_type) WHERE ast_royalty_target_type <> 'creators'::royalty_target_type"), + ("assets_v3_slot_created", "assets_v3 (ast_slot_created)"), + ("assets_v3_slot_updated", "assets_v3(ast_slot_updated)"), + ("assets_v3_specification_asset_class", "assets_v3 (ast_specification_asset_class) WHERE ast_specification_asset_class IS NOT NULL AND ast_specification_asset_class <> 'unknown'::specification_asset_class"), + ("assets_v3_specification_version", "assets_v3 (ast_specification_version) WHERE ast_specification_version <> 'v1'::specification_versions"), + ("assets_v3_supply", 
"assets_v3(ast_supply) WHERE ast_supply IS NOT NULL"), + ]{ + self.create_index(transaction, index, on_query_string).await?; + } + Ok(()) + } + + pub(crate) async fn recreate_creators_indexes( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + self.create_index( + transaction, + "asset_creators_v3_creator", + "asset_creators_v3(asc_creator, asc_verified)", + ) + .await + } + + pub(crate) async fn recreate_authorities_indexes( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + self.create_index( + transaction, + "assets_authority", + "assets_authorities(auth_authority) WHERE auth_authority IS NOT NULL", + ) + .await + } + + pub async fn drop_fungible_constraints( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + "ALTER TABLE fungible_tokens DROP CONSTRAINT IF EXISTS fungible_tokens_pkey;", + ); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "fungible_tokens_pkey", + ) + .await?; + Ok(()) + } + + pub async fn recreate_fungible_constraints( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE fungible_tokens ADD CONSTRAINT fungible_tokens_pkey PRIMARY KEY (fbt_pubkey);"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "fungible_tokens_pkey", + ) + .await?; + Ok(()) + } + pub async fn drop_constraints( &self, transaction: &mut Transaction<'_, Postgres>, ) -> Result<(), IndexDbError> { - for (table, constraint) in [("assets_v3", "assets_v3_authority_fk_constraint")] { + for (table, constraint) in [ + ("assets_v3", "assets_v3_authority_fk_constraint"), + ("assets_v3", "assets_v3_ast_metadata_url_id_fkey"), + ("asset_creators_v3", "asset_creators_v3_pkey"), + ("assets_authorities", "assets_authorities_pkey"), + ("assets_v3", "assets_pkey"), + ] { let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( - "ALTER TABLE {} DROP CONSTRAINT {};", + "ALTER TABLE {} DROP CONSTRAINT IF EXISTS {};", table, constraint )); - self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, table) - .await?; + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + constraint, + ) + .await?; } Ok(()) } + pub(crate) async fn recreate_asset_creators_constraints( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE asset_creators_v3 ADD CONSTRAINT asset_creators_v3_pkey PRIMARY KEY (asc_pubkey, asc_creator);"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "asset_creators_v3_pkey", + ) + .await + } + + pub(crate) async fn recreate_asset_authorities_constraints( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE assets_authorities ADD CONSTRAINT assets_authorities_pkey PRIMARY KEY (auth_pubkey);"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "assets_authorities_pkey", + ) + .await + } + + pub(crate) async fn recreate_asset_constraints( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), 
IndexDbError> { + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + "ALTER TABLE assets_v3 ADD CONSTRAINT assets_pkey PRIMARY KEY (ast_pubkey);", + ); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "assets_pkey", + ) + .await?; + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE assets_v3 ADD CONSTRAINT assets_v3_authority_fk_constraint FOREIGN KEY (ast_authority_fk) REFERENCES assets_authorities(auth_pubkey) ON DELETE SET NULL ON UPDATE CASCADE NOT VALID;"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "assets_v3_authority_fk_constraint", + ) + .await?; + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE assets_v3 ADD CONSTRAINT assets_v3_ast_metadata_url_id_fkey FOREIGN KEY (ast_metadata_url_id) REFERENCES tasks(tsk_id) ON DELETE RESTRICT ON UPDATE CASCADE NOT VALID;"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + ALTER_ACTION, + "assets_v3_ast_metadata_url_id_fkey", + ) + .await?; + Ok(()) + } + async fn create_index( &self, transaction: &mut Transaction<'_, Postgres>, @@ -170,9 +400,14 @@ impl PgClient { transaction: &mut Transaction<'_, Postgres>, ) -> Result<(), IndexDbError> { let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("ALTER TABLE assets_v3 ENABLE TRIGGER ALL;"); - self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3") - .await?; + QueryBuilder::new("ALTER TABLE fungible_tokens ENABLE TRIGGER ALL;"); + self.execute_query_with_metrics( + transaction, + &mut query_builder, + "trigger_enable", + "fungible_tokens", + ) + .await?; for (index, on_query_string) in [ ( @@ -190,141 +425,230 @@ impl PgClient { Ok(()) } - pub async fn recreate_nft_indexes( + pub async fn create_temp_tables( &self, + main_table: &str, + temp_table: &str, transaction: &mut Transaction<'_, Postgres>, + drop_on_commit: bool, ) -> Result<(), IndexDbError> { - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("ALTER TABLE assets_v3 ENABLE TRIGGER ALL;"); - self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3") + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( + "CREATE TEMP TABLE {} (LIKE {} INCLUDING ALL)", + temp_table, main_table + )); + if drop_on_commit { + query_builder.push(" ON COMMIT DROP"); + } + query_builder.push(";"); + + self.execute_query_with_metrics(transaction, &mut query_builder, CREATE_ACTION, main_table) .await?; - for (index, on_query_string) in [ - ("assets_authority", "assets_authorities(auth_authority) WHERE auth_authority IS NOT NULL"), - ("asset_creators_v3_creator", "asset_creators_v3(asc_creator, asc_verified)"), - ("assets_v3_specification_version", "assets_v3 (ast_specification_version) WHERE ast_specification_version <> 'v1'::specification_versions"), - ("assets_v3_specification_asset_class", "assets_v3 (ast_specification_asset_class) WHERE ast_specification_asset_class IS NOT NULL AND ast_specification_asset_class <> 'unknown'::specification_asset_class"), - ("assets_v3_royalty_target_type", "assets_v3 (ast_royalty_target_type) WHERE ast_royalty_target_type <> 'creators'::royalty_target_type"), - ("assets_v3_royalty_amount", "assets_v3 (ast_royalty_amount)"), - ("assets_v3_slot_created", "assets_v3 (ast_slot_created)"), - ("assets_v3_owner_type", "assets_v3 (ast_owner_type) WHERE ast_owner_type IS NOT NULL AND ast_owner_type <> 
'unknown'::owner_type"), - ("assets_v3_metadata_url", "assets_v3 (ast_metadata_url_id) WHERE ast_metadata_url_id IS NOT NULL"), - ("assets_v3_owner", "assets_v3(ast_owner) WHERE ast_owner IS NOT NULL"), - ("assets_v3_delegate", "assets_v3(ast_delegate) WHERE ast_delegate IS NOT NULL"), - ("assets_v3_authority_fk", "assets_v3(ast_authority_fk) WHERE ast_authority_fk IS NOT NULL"), - ("assets_v3_collection_is_collection_verified", "assets_v3(ast_collection, ast_is_collection_verified) WHERE ast_collection IS NOT NULL"), - ("assets_v3_is_burnt", "assets_v3(ast_is_burnt) WHERE ast_is_burnt IS TRUE"), - ("assets_v3_is_compressible", "assets_v3(ast_is_compressible) WHERE ast_is_compressible IS TRUE"), - ("assets_v3_is_compressed", "assets_v3(ast_is_compressed)"), - ("assets_v3_is_frozen", "assets_v3(ast_is_frozen) WHERE ast_is_frozen IS TRUE"), - ("assets_v3_supply", "assets_v3(ast_supply) WHERE ast_supply IS NOT NULL"), - ("assets_v3_slot_updated", "assets_v3(ast_slot_updated)"), - ]{ - self.create_index(transaction, index, on_query_string).await?; - } Ok(()) } - pub async fn recreate_constraints( + pub(crate) async fn load_from( + &self, + file_path: String, + table: &str, + columns: &str, + semaphore: Arc, + ) -> Result<(), IndexDbError> { + let guard = semaphore.acquire().await.map_err(|e| { + IndexDbError::BadArgument(format!("Failed to acquire semaphore: {}", e)) + })?; + let mut transaction = self.start_transaction().await?; + match self + .copy_table_from(&mut transaction, file_path, table, columns) + .await + { + Ok(_) => { + transaction.commit().await?; + drop(guard); + Ok(()) + } + Err(e) => { + transaction.rollback().await?; + drop(guard); + Err(e) + } + } + } + pub(crate) async fn load_through_temp_table( + &self, + file_path: String, + table: &str, + temp_postfix: &str, + columns: &str, + on_conflict_do_nothing: bool, + semaphore: Option>, + ) -> Result<(), IndexDbError> { + let guard = if let Some(ref s) = semaphore { + Some(s.acquire().await.map_err(|e| { + IndexDbError::BadArgument(format!("Failed to acquire semaphore: {}", e)) + })?) 
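// [editor's note, not part of the original diff] The permit acquired just above provides
// back-pressure on concurrent loads: it is held for the whole create-temp-table / COPY / INSERT
// transaction that follows and is only released (dropped) after the commit or rollback completes.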
+ } else { + None + }; + let mut transaction = self.start_transaction().await?; + let temp_table = format!("{}_{}", table, temp_postfix); + match self + .copy_through_temp_table_tx( + &mut transaction, + table, + temp_table.as_str(), + file_path, + columns, + on_conflict_do_nothing, + ) + .await + { + Ok(_) => { + transaction.commit().await?; + guard.map(|g| drop(g)); + Ok(()) + } + Err(e) => { + transaction.rollback().await?; + guard.map(|g| drop(g)); + Err(e) + } + } + } + + async fn copy_through_temp_table_tx( &self, transaction: &mut Transaction<'_, Postgres>, + table: &str, + temp_table: &str, + file_path: String, + columns: &str, + on_conflict_do_nothing: bool, ) -> Result<(), IndexDbError> { - let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("ALTER TABLE assets_v3 ADD CONSTRAINT assets_v3_authority_fk_constraint FOREIGN KEY (ast_authority_fk) REFERENCES assets_authorities(auth_pubkey) ON DELETE SET NULL ON UPDATE CASCADE;"); - self.execute_query_with_metrics(transaction, &mut query_builder, DROP_ACTION, "assets_v3") + self.create_temp_tables(table, temp_table, transaction, true) + .await?; + self.copy_table_from(transaction, file_path, temp_table, columns) + .await?; + self.insert_from_temp_table(transaction, table, temp_table, on_conflict_do_nothing) .await } - pub async fn create_temp_tables( + pub(crate) async fn destructive_prep_to_batch_nft_load_tx( &self, - main_table: &str, transaction: &mut Transaction<'_, Postgres>, - drop_on_commit: bool, - prefix: &str, - ) -> Result<(), IndexDbError> { - let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new("CREATE TEMP TABLE "); - query_builder.push(prefix); - query_builder.push(main_table); - query_builder.push(" (LIKE "); - query_builder.push(main_table); - query_builder.push(" INCLUDING ALL)"); - if drop_on_commit { - query_builder.push(" ON COMMIT DROP"); + ) -> Result<(), IndexDbError> { + self.drop_nft_indexes(transaction).await?; + self.drop_constraints(transaction).await?; + // the only constraints left are the NOT NULL constraints, which are not dropped due to the performance impact + for table in ["assets_v3", "asset_creators_v3", "assets_authorities"] { + self.truncate_table(transaction, table).await?; + // self.set_unlogged_on(transaction, table).await?; + self.set_autovacuum_off_on(transaction, table).await?; } - query_builder.push(";"); + Ok(()) + } - self.execute_query_with_metrics(transaction, &mut query_builder, CREATE_ACTION, main_table) - .await?; + pub(crate) async fn finalize_assets_load(&self) -> Result<(), IndexDbError> { + let mut transaction = self.start_transaction().await?; + match self.finalize_assets_load_tx(&mut transaction).await { + Ok(_) => { + transaction.commit().await?; + Ok(()) + } + Err(e) => { + transaction.rollback().await?; + Err(e) + } + } + } + + pub(crate) async fn finalize_creators_load(&self) -> Result<(), IndexDbError> { + let mut transaction = self.start_transaction().await?; + match self.finalize_creators_load_tx(&mut transaction).await { + Ok(_) => { + transaction.commit().await?; + Ok(()) + } + Err(e) => { + transaction.rollback().await?; + Err(e) + } + } + } + pub(crate) async fn finalize_authorities_load(&self) -> Result<(), IndexDbError> { + let mut transaction = self.start_transaction().await?; + match self.finalize_authorities_load_tx(&mut transaction).await { + Ok(_) => { + transaction.commit().await?; + Ok(()) + } + Err(e) => { + transaction.rollback().await?; + Err(e) + } + } + } + + async fn finalize_assets_load_tx( + &self, + transaction: 
&mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + self.recreate_asset_constraints(transaction).await?; + self.recreate_asset_indexes(transaction).await?; + self.reset_autovacuum_on(transaction, "assets_v3").await?; + // self.set_logged_on(transaction, "assets_v3").await?; Ok(()) } - pub(crate) async fn copy_fungibles( + async fn finalize_creators_load_tx( &self, - fungible_tokens_copy_path: String, transaction: &mut Transaction<'_, Postgres>, ) -> Result<(), IndexDbError> { - self.drop_fungible_indexes(transaction).await?; - self.drop_constraints(transaction).await?; - self.truncate_table(transaction, "fungible_tokens").await?; + self.recreate_asset_creators_constraints(transaction) + .await?; + self.recreate_creators_indexes(transaction).await?; + self.reset_autovacuum_on(transaction, "asset_creators_v3") + .await?; + // self.set_logged_on(transaction, "asset_creators_v3").await?; + Ok(()) + } - self.copy_table_from( - transaction, - fungible_tokens_copy_path, - "fungible_tokens", - "fbt_pubkey, fbt_owner, fbt_asset, fbt_balance, fbt_slot_updated", - ) - .await?; - self.recreate_fungible_indexes(transaction).await?; - self.recreate_constraints(transaction).await?; + async fn finalize_authorities_load_tx( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + self.recreate_asset_authorities_constraints(transaction) + .await?; + self.recreate_authorities_indexes(transaction).await?; + self.reset_autovacuum_on(transaction, "assets_authorities") + .await?; + // self.set_logged_on(transaction, "assets_authorities").await?; Ok(()) } - pub(crate) async fn copy_nfts( + pub(crate) async fn destructive_prep_to_batch_fungible_load_tx( &self, - matadata_copy_path: String, - asset_creators_copy_path: String, - assets_copy_path: String, - assets_authorities_copy_path: String, transaction: &mut Transaction<'_, Postgres>, ) -> Result<(), IndexDbError> { - self.drop_nft_indexes(transaction).await?; - self.drop_constraints(transaction).await?; - for table in ["assets_v3", "asset_creators_v3", "assets_authorities"] { - self.truncate_table(transaction, table).await?; - } + self.drop_fungible_indexes(transaction).await?; + self.drop_fungible_constraints(transaction).await?; + let table = "fungible_tokens"; + self.truncate_table(transaction, table).await?; + // self.set_unlogged_on(transaction, table).await?; + self.set_autovacuum_off_on(transaction, table).await?; + Ok(()) + } - let table = "tasks"; - self.create_temp_tables(table, transaction, true, TEMP_TABLE_PREFIX) - .await?; - self.copy_table_from( - transaction, - matadata_copy_path, - format!("{}{}", TEMP_TABLE_PREFIX, table).as_ref(), - "tsk_id, tsk_metadata_url, tsk_status", - ) - .await?; - self.insert_from_temp_table(transaction, table).await?; - for (table, path, columns) in [ - ( - "asset_creators_v3", - asset_creators_copy_path, - "asc_pubkey, asc_creator, asc_verified, asc_slot_updated", - ), - ( - "assets_authorities", - assets_authorities_copy_path, - "auth_pubkey, auth_authority, auth_slot_updated", - ), - ( - "assets_v3", - assets_copy_path, - "ast_pubkey, ast_specification_version, ast_specification_asset_class, ast_royalty_target_type, ast_royalty_amount, ast_slot_created, ast_owner_type, ast_owner, ast_delegate, ast_authority_fk, ast_collection, ast_is_collection_verified, ast_is_burnt, ast_is_compressible, ast_is_compressed, ast_is_frozen, ast_supply, ast_metadata_url_id, ast_slot_updated", - ), - ] { - self.copy_table_from(transaction, path, table, columns).await?; - } - 
self.recreate_nft_indexes(transaction).await?; - self.recreate_constraints(transaction).await?; + pub(crate) async fn finalize_batch_fungible_load_tx( + &self, + transaction: &mut Transaction<'_, Postgres>, + ) -> Result<(), IndexDbError> { + let table = "fungible_tokens"; + self.recreate_fungible_constraints(transaction).await?; + self.recreate_fungible_indexes(transaction).await?; + self.reset_autovacuum_on(transaction, table).await?; + // self.set_logged_on(transaction, table).await?; Ok(()) } } diff --git a/postgre-client/src/storage_traits.rs b/postgre-client/src/storage_traits.rs index c8827cd07..bd338d3ea 100644 --- a/postgre-client/src/storage_traits.rs +++ b/postgre-client/src/storage_traits.rs @@ -1,12 +1,30 @@ +use std::sync::Arc; + use crate::error::IndexDbError; use crate::model::{AssetSortedIndex, AssetSorting, SearchAssetsFilter}; -use crate::temp_index_client::TempClient; use async_trait::async_trait; use entities::api_req_params::GetByMethodsOptions; use entities::enums::AssetType; use entities::models::{AssetIndex, FungibleAssetIndex}; use mockall::{automock, mock}; +pub struct NFTSemaphores { + pub assets: Arc, + pub creators: Arc, + pub authority: Arc, + pub metadata: Arc, +} +impl NFTSemaphores { + pub fn new() -> Self { + Self { + assets: Arc::new(tokio::sync::Semaphore::new(1)), + creators: Arc::new(tokio::sync::Semaphore::new(1)), + authority: Arc::new(tokio::sync::Semaphore::new(1)), + metadata: Arc::new(tokio::sync::Semaphore::new(1)), + } + } +} + #[async_trait] pub trait AssetIndexStorage { async fn fetch_last_synced_id( @@ -26,12 +44,25 @@ pub trait AssetIndexStorage { last_key: &[u8], asset_type: AssetType, ) -> Result<(), IndexDbError>; - async fn load_from_dump( + + async fn load_from_dump_nfts( + &self, + assets_file_name: &str, + creators_file_name: &str, + authority_file_name: &str, + metadata_file_name: &str, + semaphores: Arc, + ) -> Result<(), IndexDbError>; + async fn load_from_dump_fungibles( &self, - base_path: &std::path::Path, - last_key: &[u8], - asset_type: AssetType, + fungible_tokens_path: &str, + semaphore: Arc, ) -> Result<(), IndexDbError>; + async fn destructive_prep_to_batch_nft_load(&self) -> Result<(), IndexDbError>; + async fn finalize_batch_nft_load(&self) -> Result<(), IndexDbError>; + + async fn destructive_prep_to_batch_fungible_load(&self) -> Result<(), IndexDbError>; + async fn finalize_batch_fungible_load(&self) -> Result<(), IndexDbError>; } mock!( @@ -47,13 +78,25 @@ mock!( &self, asset_indexes: &[FungibleAssetIndex], ) -> Result<(), IndexDbError>; - async fn load_from_dump( + async fn update_last_synced_key(&self, last_key: &[u8], assset_type: AssetType) -> Result<(), IndexDbError>; + + async fn load_from_dump_nfts( + &self, + assets_file_name: &str, + creators_file_name: &str, + authority_file_name: &str, + metadata_file_name: &str, + semaphores: Arc, + ) -> Result<(), IndexDbError>; + async fn load_from_dump_fungibles( &self, - base_path: &std::path::Path, - last_key: &[u8], - asset_type: AssetType, + fungible_tokens_path: &str, + semaphore: Arc, ) -> Result<(), IndexDbError>; - async fn update_last_synced_key(&self, last_key: &[u8], assset_type: AssetType) -> Result<(), IndexDbError>; + async fn destructive_prep_to_batch_nft_load(&self) -> Result<(), IndexDbError>; + async fn finalize_batch_nft_load(&self) -> Result<(), IndexDbError>; + async fn destructive_prep_to_batch_fungible_load(&self) -> Result<(), IndexDbError>; + async fn finalize_batch_fungible_load(&self) -> Result<(), IndexDbError>; } impl Clone for 
AssetIndexStorageMock { @@ -95,19 +138,3 @@ pub trait IntegrityVerificationKeysFetcher { &self, ) -> Result, IndexDbError>; } - -#[async_trait] -pub trait TempClientProvider { - async fn create_temp_client(&self) -> Result; -} - -mockall::mock!( -pub TempClientProviderMock {} -impl Clone for TempClientProviderMock { - fn clone(&self) -> Self; -} -#[async_trait] -impl TempClientProvider for TempClientProviderMock { - async fn create_temp_client(&self) -> Result; -} -); diff --git a/postgre-client/src/temp_index_client.rs b/postgre-client/src/temp_index_client.rs deleted file mode 100644 index 6d81a904b..000000000 --- a/postgre-client/src/temp_index_client.rs +++ /dev/null @@ -1,335 +0,0 @@ -use std::{sync::Arc, vec}; - -use async_trait::async_trait; -use sqlx::{pool::PoolConnection, Connection, Postgres, QueryBuilder}; -use tokio::sync::Mutex; - -use crate::{ - asset_index_client::{ - split_assets_into_components, split_into_fungible_tokens, FungibleTokenTable, TableNames, - }, - error::IndexDbError, - storage_traits::{AssetIndexStorage, TempClientProvider}, - PgClient, BATCH_UPSERT_ACTION, CREATE_ACTION, DROP_ACTION, INSERT_ACTION, UPDATE_ACTION, -}; -use entities::{ - enums::AssetType, - models::{AssetIndex, FungibleAssetIndex}, -}; - -pub const TEMP_INDEXING_TABLE_PREFIX: &str = "indexing_temp_"; -#[derive(Clone)] -pub struct TempClient { - pooled_connection: Arc>>, - pg_client: Arc, -} - -#[derive(sqlx::FromRow, Debug)] -struct AssetPubkeyRawResponse { - pub(crate) ast_pubkey: Vec, -} - -impl TempClient { - pub async fn create_new(pg_client: Arc) -> Result { - let pooled_connection = Arc::new(Mutex::new(pg_client.pool.acquire().await?)); - Ok(Self { - pooled_connection, - pg_client, - }) - } - - pub async fn initialize(&self, initial_key: &[u8]) -> Result<(), String> { - let mut c = self.pooled_connection.lock().await; - // todo: ensure the transactions are rolled back on error - let mut tx = c.begin().await.map_err(|e| e.to_string())?; - for table in [ - "tasks", - "asset_creators_v3", - "assets_authorities", - "assets_v3", - "fungible_tokens", - "last_synced_key", - ] { - self.pg_client - .create_temp_tables(table, &mut tx, false, TEMP_INDEXING_TABLE_PREFIX) - .await?; - } - - let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!( - "INSERT INTO {}last_synced_key (id, last_synced_asset_update_key) VALUES (1, null);", - TEMP_INDEXING_TABLE_PREFIX - )); - let last_key_table_name = format!("{}last_synced_key", TEMP_INDEXING_TABLE_PREFIX); - self.pg_client - .execute_query_with_metrics( - &mut tx, - &mut query_builder, - INSERT_ACTION, - last_key_table_name.as_str(), - ) - .await?; - self.pg_client - .update_last_synced_key( - initial_key, - &mut tx, - last_key_table_name.as_str(), - AssetType::Fungible, - ) - .await?; - self.pg_client - .update_last_synced_key( - initial_key, - &mut tx, - last_key_table_name.as_str(), - AssetType::NonFungible, - ) - .await?; - self.pg_client - .commit_transaction(tx) - .await - .map_err(|e| e.to_string())?; - Ok(()) - } - - pub async fn copy_to_main(&self) -> Result<(), IndexDbError> { - let mut c = self.pooled_connection.lock().await; - let mut tx = c.begin().await?; - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO tasks SELECT * FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("tasks ON CONFLICT DO NOTHING;"); - self.pg_client - .execute_query_with_metrics(&mut tx, &mut query_builder, BATCH_UPSERT_ACTION, "tasks") - .await?; - - let mut query_builder: 
QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO assets_authorities SELECT * FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("assets_authorities ON CONFLICT (auth_pubkey) - DO UPDATE SET - auth_authority = EXCLUDED.auth_authority, - auth_slot_updated = EXCLUDED.auth_slot_updated, - WHERE assets_authorities.auth_slot_updated <= EXCLUDED.auth_slot_updated OR assets_authorities.auth_slot_updated IS NULL;"); - - self.pg_client - .execute_query_with_metrics( - &mut tx, - &mut query_builder, - BATCH_UPSERT_ACTION, - "assets_authorities", - ) - .await?; - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO fungible_tokens SELECT * FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("fungible_tokens ON CONFLICT (fbt_owner, fbt_asset) DO NOTHING;"); - - self.pg_client - .execute_query_with_metrics( - &mut tx, - &mut query_builder, - BATCH_UPSERT_ACTION, - "fungible_tokens", - ) - .await?; - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO assets_v3 SELECT * FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("assets_v3 ON CONFLICT (ast_pubkey) - DO UPDATE SET - ast_specification_version = EXCLUDED.ast_specification_version, - ast_specification_asset_class = EXCLUDED.ast_specification_asset_class, - ast_royalty_target_type = EXCLUDED.ast_royalty_target_type, - ast_royalty_amount = EXCLUDED.ast_royalty_amount, - ast_slot_created = EXCLUDED.ast_slot_created, - ast_owner_type = EXCLUDED.ast_owner_type, - ast_owner = EXCLUDED.ast_owner, - ast_delegate = EXCLUDED.ast_delegate, - ast_authority_fk = EXCLUDED.ast_authority_fk, - ast_collection = EXCLUDED.ast_collection, - ast_is_collection_verified = EXCLUDED.ast_is_collection_verified, - ast_is_burnt = EXCLUDED.ast_is_burnt, - ast_is_compressible = EXCLUDED.ast_is_compressible, - ast_is_compressed = EXCLUDED.ast_is_compressed, - ast_is_frozen = EXCLUDED.ast_is_frozen, - ast_supply = EXCLUDED.ast_supply, - ast_metadata_url_id = EXCLUDED.ast_metadata_url_id, - ast_slot_updated = EXCLUDED.ast_slot_updated - WHERE assets_v3.ast_slot_updated <= EXCLUDED.ast_slot_updated OR assets_v3.ast_slot_updated IS NULL;"); - - self.pg_client - .execute_query_with_metrics( - &mut tx, - &mut query_builder, - BATCH_UPSERT_ACTION, - "assets_v3", - ) - .await?; - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("DECLARE all_updated_assets CURSOR FOR SELECT ast_pubkey FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("assets_v3"); - self.pg_client - .execute_query_with_metrics(&mut tx, &mut query_builder, CREATE_ACTION, "cursor") - .await?; - let mut new_creators = vec![]; - let mut old_creators = vec![]; - loop { - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("FETCH 10000 FROM all_updated_assets"); - // Fetch a batch of rows from the cursor - let query: sqlx::query::QueryAs<'_, Postgres, _, sqlx::postgres::PgArguments> = - query_builder.build_query_as::(); - let rows = query.fetch_all(&mut tx).await?; - - // If no rows were fetched, we are done - if rows.is_empty() { - break; - } - let keys = rows - .iter() - .map(|r| r.ast_pubkey.clone()) - .collect::>(); - new_creators.extend( - self.pg_client - .batch_get_creators( - &mut tx, - &keys, - format!("{}asset_creators_v3", TEMP_INDEXING_TABLE_PREFIX).as_str(), - ) - .await?, - ); - old_creators.extend( - self.pg_client - .batch_get_creators(&mut tx, &keys, "asset_creators_v3") - 
.await? - .iter() - .map(|(pk, c, _)| (*pk, c.clone())), - ); - } - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("CLOSE all_updated_assets"); - self.pg_client - .execute_query_with_metrics(&mut tx, &mut query_builder, DROP_ACTION, "cursor") - .await?; - - let creator_updates = PgClient::diff(new_creators, old_creators); - self.pg_client - .update_creators(&mut tx, creator_updates, "asset_creators_v3") - .await?; - - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("UPDATE last_synced_key SET last_synced_asset_update_key = (SELECT last_synced_asset_update_key FROM "); - query_builder.push(TEMP_INDEXING_TABLE_PREFIX); - query_builder.push("last_synced_key WHERE id = 1) WHERE id = 1;"); - self.pg_client - .execute_query_with_metrics( - &mut tx, - &mut query_builder, - UPDATE_ACTION, - "last_synced_key", - ) - .await?; - self.pg_client.commit_transaction(tx).await - } -} - -#[async_trait] -impl AssetIndexStorage for TempClient { - async fn fetch_last_synced_id( - &self, - asset_type: AssetType, - ) -> Result>, IndexDbError> { - let mut c = self.pooled_connection.lock().await; - let mut tx = c.begin().await?; - self.pg_client - .fetch_last_synced_id_impl( - format!("{}last_synced_key", TEMP_INDEXING_TABLE_PREFIX).as_str(), - &mut tx, - asset_type, - ) - .await - } - - async fn update_nft_asset_indexes_batch( - &self, - asset_indexes: &[AssetIndex], - ) -> Result<(), IndexDbError> { - let updated_components = split_assets_into_components(asset_indexes); - let mut c = self.pooled_connection.lock().await; - let mut transaction = c.begin().await?; - let table_names = TableNames { - metadata_table: format!("{}tasks", TEMP_INDEXING_TABLE_PREFIX), - assets_table: format!("{}assets_v3", TEMP_INDEXING_TABLE_PREFIX), - creators_table: format!("{}asset_creators_v3", TEMP_INDEXING_TABLE_PREFIX), - authorities_table: format!("{}assets_authorities", TEMP_INDEXING_TABLE_PREFIX), - }; - self.pg_client - .upsert_batched_nft(&mut transaction, table_names, updated_components) - .await?; - self.pg_client.commit_transaction(transaction).await - } - - async fn update_fungible_asset_indexes_batch( - &self, - fungible_asset_indexes: &[FungibleAssetIndex], - ) -> Result<(), IndexDbError> { - let mut c = self.pooled_connection.lock().await; - let mut transaction = c.begin().await?; - let table_name = FungibleTokenTable::new( - format!("{}fungible_tokens", TEMP_INDEXING_TABLE_PREFIX).as_str(), - ); - - self.pg_client - .upsert_batched_fungible( - &mut transaction, - table_name, - split_into_fungible_tokens(fungible_asset_indexes), - ) - .await?; - self.pg_client.commit_transaction(transaction).await - } - - async fn update_last_synced_key( - &self, - last_key: &[u8], - asset_type: AssetType, - ) -> Result<(), IndexDbError> { - let mut c = self.pooled_connection.lock().await; - let mut transaction = c.begin().await?; - self.pg_client - .update_last_synced_key( - last_key, - &mut transaction, - format!("{}last_synced_key", TEMP_INDEXING_TABLE_PREFIX).as_str(), - asset_type, - ) - .await?; - self.pg_client.commit_transaction(transaction).await - } - - async fn load_from_dump( - &self, - _base_path: &std::path::Path, - _last_key: &[u8], - _asset_type: AssetType, - ) -> Result<(), IndexDbError> { - Err(IndexDbError::NotImplemented( - "Temporary client does not support batch load from file".to_string(), - )) - } -} - -#[async_trait] -impl TempClientProvider for Arc { - async fn create_temp_client(&self) -> Result { - TempClient::create_new(self.clone()) - .await - 
.map_err(|e| e.into()) - } -} diff --git a/rocks-db/src/asset.rs b/rocks-db/src/asset.rs index e2dc6a83e..15755f680 100644 --- a/rocks-db/src/asset.rs +++ b/rocks-db/src/asset.rs @@ -2160,9 +2160,7 @@ pub fn merge_complete_details_fb_simple_raw<'a>( .iter() .filter_map(|(k, v)| { v.is_current_owner.as_ref().filter(|u| u.value()).map( - |is_owner| { - (is_owner.slot_updated(), k.clone(), v.clone()) - }, + |is_owner| (is_owner.slot_updated(), *k, v.clone()), ) }) .max_by_key(|(slot, _, _)| *slot) diff --git a/rocks-db/src/asset_client.rs b/rocks-db/src/asset_client.rs index b2793e983..991db2b04 100644 --- a/rocks-db/src/asset_client.rs +++ b/rocks-db/src/asset_client.rs @@ -3,8 +3,8 @@ use solana_sdk::pubkey::Pubkey; use std::sync::atomic::Ordering; use crate::asset::{ - AssetCollection, AssetCompleteDetails, AssetSelectedMaps, AssetsUpdateIdx, SlotAssetIdx, - SlotAssetIdxKey,FungibleAssetsUpdateIdx, + AssetCollection, AssetCompleteDetails, AssetSelectedMaps, AssetsUpdateIdx, + FungibleAssetsUpdateIdx, SlotAssetIdx, SlotAssetIdxKey, }; use crate::asset_generated::asset as fb; use crate::column::{Column, TypedColumn}; diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index a3fdae775..f34354cdc 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -1,8 +1,8 @@ use std::collections::{HashMap, HashSet}; use crate::asset::{ - AssetCollection, AssetCompleteDetails, AssetLeaf, AssetsUpdateIdx, MplCoreCollectionAuthority, - SlotAssetIdx, SlotAssetIdxKey, FungibleAssetsUpdateIdx, + AssetCollection, AssetCompleteDetails, AssetLeaf, AssetsUpdateIdx, FungibleAssetsUpdateIdx, + MplCoreCollectionAuthority, SlotAssetIdx, SlotAssetIdxKey, }; use crate::asset_generated::asset as fb; use crate::cl_items::{ClItem, ClItemKey, ClLeaf, ClLeafKey}; @@ -17,10 +17,13 @@ use crate::{ BATCH_GET_ACTION, BATCH_ITERATION_ACTION, ITERATOR_TOP_ACTION, ROCKS_COMPONENT, }; use async_trait::async_trait; -use entities::enums::{SpecificationAssetClass, SpecificationVersions, TokenMetadataEdition}; -use entities::models::{AssetIndex, CompleteAssetDetails, UpdateVersion, Updated, FungibleAssetIndex, UrlWithStatus}; +use entities::enums::{SpecificationAssetClass, TokenMetadataEdition}; +use entities::models::{ + AssetIndex, CompleteAssetDetails, FungibleAssetIndex, UpdateVersion, Updated, +}; use serde_json::json; use solana_sdk::pubkey::Pubkey; +use tracing::info; impl AssetUpdateIndexStorage for Storage { fn last_known_fungible_asset_updated_key(&self) -> Result> { _ = self.db.try_catch_up_with_primary(); @@ -97,6 +100,12 @@ impl AssetUpdateIndexStorage for Storage { break; } let decoded_key = decode_u64x2_pubkey(key.clone()).unwrap(); + if let Some(ref last_key) = last_key { + if decoded_key.seq != last_key.seq + 1 { + info!("Breaking the fungibles sync loop at seq {} as the sequence is not consecutive to the previously handled {}", decoded_key.seq, last_key.seq); + break; + } + } last_key = Some(decoded_key.clone()); // Skip keys that are in the skip_keys set if skip_keys @@ -157,6 +166,12 @@ impl AssetUpdateIndexStorage for Storage { break; } let decoded_key = decode_u64x2_pubkey(key.clone()).unwrap(); + if let Some(ref last_key) = last_key { + if decoded_key.seq != last_key.seq + 1 { + info!("Breaking the NFT sync loop at seq {} as the sequence is not consecutive to the previously handled {}", decoded_key.seq, last_key.seq); + break; + } + } last_key = Some(decoded_key.clone()); // Skip keys that are in the skip_keys set if skip_keys @@ -319,16 +334,13 @@ impl AssetIndexReader 
for Storage { Ok(fungible_assets_indexes) } - async fn get_nft_asset_indexes<'a>( - &self, - keys: &[Pubkey], - ) -> Result> { + async fn get_nft_asset_indexes<'a>(&self, keys: &[Pubkey]) -> Result> { let start_time = chrono::Utc::now(); - + let asset_index_collection_url_fut = self.get_asset_indexes_with_collections_and_urls(keys.to_vec()); let spl_mints_fut = self.spl_mints.batch_get(keys.to_vec()); - + let (mut asset_indexes, assets_collection_pks, urls) = asset_index_collection_url_fut.await?; @@ -596,6 +608,7 @@ impl Storage { batch, data.pubkey, &MplCoreCollectionAuthority { + // total BS authority: data.collection.as_ref().unwrap().authority.clone(), }, )?; //this will never error in fact diff --git a/rocks-db/src/dump_client.rs b/rocks-db/src/dump_client.rs index f76841e04..eb2f7f4e6 100644 --- a/rocks-db/src/dump_client.rs +++ b/rocks-db/src/dump_client.rs @@ -1,9 +1,12 @@ use crate::asset::MplCoreCollectionAuthority; use crate::asset_generated::asset as fb; use crate::column::TypedColumn; +use crate::key_encoders::encode_u64x2_pubkey; +use crate::storage_traits::AssetUpdatedKey; use crate::{column::Column, storage_traits::Dumper, Storage}; -use async_trait::async_trait; +use bincode::deserialize; use csv::WriterBuilder; +use entities::enums::AssetType; use entities::models::SplMint; use entities::{ enums::{OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions}, @@ -13,17 +16,15 @@ use hex; use inflector::Inflector; use metrics_utils::SynchronizerMetricsConfig; use serde::{Serialize, Serializer}; +use solana_sdk::pubkey::Pubkey; use std::{ collections::{HashMap, HashSet}, fs::File, io::BufWriter, sync::Arc, }; -use tokio::time::Instant; use tracing::{error, info}; -const BUF_CAPACITY: usize = 1024 * 1024 * 32; - fn serialize_as_snake_case(value: &T, serializer: S) -> Result where S: Serializer, @@ -75,7 +76,7 @@ struct AssetRecord { ast_slot_updated: i64, } -impl Storage { +impl Dumper for Storage { /// Concurrently dumps data into several `CSV files`, /// where each file corresponds to a separate table in the index database (`Postgres`). /// @@ -87,17 +88,22 @@ impl Storage { /// `batch_size` - Batch size. /// `rx` - Channel for graceful shutdown. 
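/// [editor's note: documentation added for the new signature below; the descriptions are inferred
/// from the parameter types and the function body, not taken from the original diff.]
/// `assets_file`, `creators_file`, `authority_file`, `metadata_file` - open file handles the CSV rows are written to.
/// `buf_capacity` - capacity of the `BufWriter` wrapped around each file.
/// `asset_limit` - optional cap on how many assets are dumped before the iteration stops.
/// `start_pubkey` / `end_pubkey` - optional key range, so the dump can be sharded across workers.
/// Returns the number of assets processed.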
#[allow(clippy::too_many_arguments)] - pub async fn dump_nft_csv( + fn dump_nft_csv( &self, - metadata_file_and_path: (File, String), - assets_file_and_path: (File, String), - creators_file_and_path: (File, String), - authority_file_and_path: (File, String), - batch_size: usize, + assets_file: File, + creators_file: File, + authority_file: File, + metadata_file: File, + buf_capacity: usize, + asset_limit: Option, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: Arc, - ) -> Result<(), String> { - let mut core_collections: HashMap, Vec> = HashMap::new(); + ) -> Result { + let mut metadata_key_set = HashSet::new(); + + let mut core_collection_authorities: HashMap, Vec> = HashMap::new(); let mut core_collections_iter = self .db .raw_iterator_cf(&self.mpl_core_collection_authorities.handle()); @@ -107,39 +113,46 @@ impl Storage { let value = core_collections_iter.value().unwrap(); if let Ok(value) = bincode::deserialize::(value) { if let Some(authority) = value.authority.value { - core_collections.insert(key.to_vec(), authority.to_bytes().to_vec()); + core_collection_authorities.insert(key.to_vec(), authority.to_bytes().to_vec()); } } core_collections_iter.next(); } - let mut metadata_key_set = HashSet::new(); - let mut authorities_key_set = HashSet::new(); - - let buf_writer = BufWriter::with_capacity(BUF_CAPACITY, assets_file_and_path.0); + let buf_writer = BufWriter::with_capacity(buf_capacity, assets_file); let mut asset_writer = WriterBuilder::new() .has_headers(false) .from_writer(buf_writer); - let buf_writer = BufWriter::with_capacity(BUF_CAPACITY, authority_file_and_path.0); + let buf_writer = BufWriter::with_capacity(buf_capacity, authority_file); let mut authority_writer = WriterBuilder::new() .has_headers(false) .from_writer(buf_writer); - let buf_writer = BufWriter::with_capacity(BUF_CAPACITY, creators_file_and_path.0); + let buf_writer = BufWriter::with_capacity(buf_capacity, creators_file); let mut creators_writer = WriterBuilder::new() .has_headers(false) .from_writer(buf_writer); - - let buf_writer = BufWriter::with_capacity(BUF_CAPACITY, metadata_file_and_path.0); + let buf_writer = BufWriter::with_capacity(buf_capacity, metadata_file); let mut metadata_writer = WriterBuilder::new() .has_headers(false) .from_writer(buf_writer); // Iteration over `asset_data` column via CUSTOM iterator. 
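// [editor's note, not part of the original diff] The iteration below is no longer a full scan:
// it seeks to `start_pubkey` when one is given, stops once the key passes `end_pubkey` or once
// `asset_limit` assets have been processed, and the processed count is returned to the caller,
// which is what makes it possible to split the dump into independent pubkey ranges.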
let mut iter = self.db.raw_iterator_cf(&self.asset_data.handle()); - iter.seek_to_first(); + if let Some(start_pubkey) = start_pubkey { + iter.seek(&start_pubkey.to_bytes()); + } else { + iter.seek_to_first(); + } + let end_pubkey = end_pubkey.map(|pk| pk.to_bytes()); + let mut cnt = 0; while iter.valid() { + if let Some(end_pubkey) = end_pubkey { + if iter.key().unwrap() > end_pubkey.as_slice() { + break; + } + } let key = iter.key().unwrap(); let encoded_key = Self::encode(key); let value = iter.value().unwrap(); @@ -174,27 +187,28 @@ impl Storage { } } } - let metadata_url = asset + let metadata_url_id = asset .dynamic_details() .and_then(|dd| dd.url()) .and_then(|url| url.value()) .filter(|s| !s.is_empty()) - .map(|s| (UrlWithStatus::get_metadata_id_for(s), s)); - if let Some((ref metadata_key, ref url)) = metadata_url { - { - if !metadata_key_set.contains(metadata_key) { + .map(|s| UrlWithStatus::new(s, false)) + .filter(|uws| !uws.metadata_url.is_empty()) + .map(|uws| { + let metadata_key = uws.get_metadata_id(); + if !metadata_key_set.contains(&metadata_key) { metadata_key_set.insert(metadata_key.clone()); if let Err(e) = metadata_writer.serialize(( - Self::encode(metadata_key), - url.to_string(), + Self::encode(&metadata_key), + uws.metadata_url.to_string(), "pending".to_string(), )) { error!("Error writing metadata to csv: {:?}", e); } synchronizer_metrics.inc_num_of_records_written("metadata", 1); } - } - } + metadata_key + }); let slot_updated = asset.get_slot_updated() as i64; if let Some(cc) = asset @@ -215,11 +229,11 @@ impl Storage { synchronizer_metrics.inc_num_of_records_written("creators", 1); } } - let update_authority = asset + let core_collection_update_authority = asset .collection() .and_then(|c| c.collection()) .and_then(|c| c.value()) - .and_then(|c| core_collections.get(c.bytes())) + .and_then(|c| core_collection_authorities.get(c.bytes())) .map(|b| b.to_owned()); let authority = asset .authority() @@ -263,7 +277,7 @@ impl Storage { .map(|v| v.bytes()) .map(Self::encode), ast_authority_fk: if let Some(collection) = collection.as_ref() { - if update_authority.is_some() { + if core_collection_update_authority.is_some() { Some(collection.to_owned()) } else if authority.is_some() { Some(encoded_key.clone()) @@ -304,7 +318,7 @@ impl Storage { .dynamic_details() .and_then(|d| d.supply()) .map(|v| v.value() as i64), - ast_metadata_url_id: metadata_url.map(|(k, _)| k).map(Self::encode), + ast_metadata_url_id: metadata_url_id.map(Self::encode), ast_slot_updated: slot_updated, }; @@ -312,44 +326,46 @@ impl Storage { error!("Error writing asset to csv: {:?}", e); } synchronizer_metrics.inc_num_of_records_written("asset", 1); - let authority_key = if update_authority.is_some() { - collection - } else { - Some(encoded_key) - }; - let authority = update_authority.or(authority); - if let (Some(authority_key), Some(authority)) = (authority_key, authority) { - { - if !authorities_key_set.contains(&authority_key) { - authorities_key_set.insert(authority_key.clone()); - if let Err(e) = authority_writer.serialize(( - authority_key, - Self::encode(authority), - slot_updated, - )) { - error!("Error writing authority to csv: {:?}", e); - } - synchronizer_metrics.inc_num_of_records_written("authority", 1); + // the authority key is the collection key if the core collection update authority is set, or the asset key itself + // for the asset keys, we could write those without the need to check if they are already written, + // and for the collection keys, we just skip as those will be 
written as part of the core collection account dump + // todo: this was refactored, need to be verified as part of https://linear.app/mplx/issue/MTG-979/fix-test-mpl-core-get-assets-by-authority + if core_collection_update_authority.is_none() { + let authority_key = if core_collection_update_authority.is_some() { + collection + } else { + Some(encoded_key) + }; + let authority = core_collection_update_authority.or(authority); + if let (Some(authority_key), Some(authority)) = (authority_key, authority) { + if let Err(e) = authority_writer.serialize(( + authority_key, + Self::encode(authority), + slot_updated, + )) { + error!("Error writing authority to csv: {:?}", e); } + synchronizer_metrics.inc_num_of_records_written("authority", 1); } } if !rx.is_empty() { return Err("dump cancelled".to_string()); } iter.next(); + cnt += 1; + if let Some(limit) = asset_limit { + if cnt >= limit { + break; + } + } synchronizer_metrics.inc_num_of_assets_iter("asset", 1); } - _ = tokio::try_join!( - tokio::task::spawn_blocking(move || asset_writer.flush()), - tokio::task::spawn_blocking(move || authority_writer.flush()), - tokio::task::spawn_blocking(move || creators_writer.flush()), - tokio::task::spawn_blocking(move || metadata_writer.flush()) - ) - .map_err(|e| e.to_string())?; - + asset_writer.flush().map_err(|e| e.to_string())?; + authority_writer.flush().map_err(|e| e.to_string())?; + creators_writer.flush().map_err(|e| e.to_string())?; + metadata_writer.flush().map_err(|e| e.to_string())?; info!("asset writers are flushed."); - - Ok(()) + Ok(cnt) } /// Dumps data into several into `CSV file` dedicated for fungible tokens, @@ -360,65 +376,56 @@ impl Storage { /// `batch_size` - Batch size. /// `rx` - Channel for graceful shutdown. #[allow(clippy::too_many_arguments)] - pub async fn dump_fungible_csv( + fn dump_fungible_csv( &self, fungible_tokens_file_and_path: (File, String), - batch_size: usize, + buf_capacity: usize, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: Arc, - ) -> Result<(), String> { + ) -> Result { let column: Column = Self::column(self.db.clone(), self.red_metrics.clone()); - let buf_writer = BufWriter::with_capacity(BUF_CAPACITY, fungible_tokens_file_and_path.0); + let buf_writer = BufWriter::with_capacity(buf_capacity, fungible_tokens_file_and_path.0); + let mut iter = self.db.raw_iterator_cf(&column.handle()); let mut writer = WriterBuilder::new() .has_headers(false) .from_writer(buf_writer); - // asset, owner, balance, slot updated - let mut batch: Vec<(String, String, String, i64, i64)> = Vec::new(); - - for (_, token) in column.pairs_iterator(column.iter_start()) { + if let Some(start_pubkey) = start_pubkey { + iter.seek(&start_pubkey.to_bytes()); + } else { + iter.seek_to_first(); + } + let end_pubkey = end_pubkey.map(|pk| pk.to_bytes()); + let mut cnt = 0; + while iter.valid() { + if let Some(end_pubkey) = end_pubkey { + if iter.key().unwrap() > end_pubkey.as_slice() { + break; + } + } if !rx.is_empty() { info!("Shutdown signal received..."); - return Ok(()); + return Ok(cnt); } - batch.push(( - Self::encode(token.pubkey), - Self::encode(token.owner), - Self::encode(token.mint), - token.amount, - token.slot_updated, - )); - synchronizer_metrics.inc_num_of_assets_iter("token_account", 1); - - if batch.len() >= batch_size { - let start = Instant::now(); - for rec in &batch { - if let Err(e) = writer.serialize(rec).map_err(|e| e.to_string()) { - let msg = format!( - "Error while writing data into {:?}. 
Err: {:?}", - fungible_tokens_file_and_path.1, e - ); - error!("{}", msg); - return Err(msg); - } - } - batch.clear(); - synchronizer_metrics.set_file_write_time( - fungible_tokens_file_and_path.1.as_ref(), - start.elapsed().as_millis() as f64, - ); - } - } - - if !batch.is_empty() { - let start = Instant::now(); - for rec in &batch { - if !rx.is_empty() { - info!("Shutdown signal received..."); - return Ok(()); - } - if let Err(e) = writer.serialize(rec).map_err(|e| e.to_string()) { + if let Some(token) = iter + .value() + .map(deserialize::) + .map(|v| v.ok()) + .flatten() + { + if let Err(e) = writer + .serialize(( + Self::encode(token.pubkey), + Self::encode(token.owner), + Self::encode(token.mint), + token.amount, + token.slot_updated, + )) + .map_err(|e| e.to_string()) + { let msg = format!( "Error while writing data into {:?}. Err: {:?}", fungible_tokens_file_and_path.1, e @@ -426,15 +433,10 @@ impl Storage { error!("{}", msg); return Err(msg); } + synchronizer_metrics.inc_num_of_assets_iter("token_account", 1); } - - synchronizer_metrics.set_file_write_time( - fungible_tokens_file_and_path.1.as_ref(), - start.elapsed().as_millis() as f64, - ); - synchronizer_metrics - .inc_num_of_records_written(&fungible_tokens_file_and_path.1, batch.len() as u64); - batch.clear(); + iter.next(); + cnt += 1; } if let Err(e) = writer.flush().map_err(|e| e.to_string()) { @@ -447,115 +449,62 @@ impl Storage { } info!("Finish dumping fungible assets."); - Ok(()) + Ok(cnt) } - +} +impl Storage { fn encode>(v: T) -> String { format!("\\x{}", hex::encode(v)) } -} -#[async_trait] -impl Dumper for Storage { - /// The `dump_db` function is an asynchronous method responsible for dumping fungible database content into a dedicated `CSV file`. - /// The function supports batch processing and listens to a signal using a `tokio::sync::broadcast::Receiver` to handle cancellation - /// or control flow. - /// # Args: - /// * `base_path` - A reference to a Path that specifies the base directory where the `CSV files` will be created. - /// The function will append filenames (`metadata.csv, creators.csv, assets.csv, assets_authorities.csv`) to this path. - /// * `batch_size` - The size of the data batches to be processed and written to the files. - /// * `rx` - A receiver that listens for cancellation signals. 
- async fn dump_fungible_db( - &self, - base_path: &std::path::Path, - batch_size: usize, - rx: &tokio::sync::broadcast::Receiver<()>, + pub fn dump_metadata( + file: File, + buf_capacity: usize, + metadata: HashSet, synchronizer_metrics: Arc, ) -> Result<(), String> { - let fungible_tokens_path = base_path - .join("fungible_tokens.csv") - .to_str() - .map(str::to_owned); - tracing::info!("Dumping to fungible_tokens: {:?}", fungible_tokens_path); - - let fungible_tokens_file = File::create(fungible_tokens_path.clone().unwrap()) - .map_err(|e| format!("Could not create file for fungible tokens dump: {}", e))?; + let buf_writer = BufWriter::with_capacity(buf_capacity, file); + let mut metadata_writer = WriterBuilder::new() + .has_headers(false) + .from_writer(buf_writer); - self.dump_fungible_csv( - (fungible_tokens_file, fungible_tokens_path.unwrap()), - batch_size, - rx, - synchronizer_metrics, - ) - .await?; - Ok(()) + metadata.into_iter().for_each(|s| { + let url = s; + let metadata_key = UrlWithStatus::get_metadata_id_for(&url); + if let Err(e) = metadata_writer.serialize(( + Self::encode(metadata_key), + url.to_string(), + "pending".to_string(), + )) { + error!("Error writing metadata to csv: {:?}", e); + } + synchronizer_metrics.inc_num_of_records_written("metadata", 1); + }); + metadata_writer.flush().map_err(|e| e.to_string()) } - /// The `dump_db` function is an asynchronous method responsible for dumping database content into multiple `CSV files`. - /// It writes metadata, asset information, creator details, and asset authorities to separate `CSV files` in the provided directory. - /// The function supports batch processing and listens to a signal using a `tokio::sync::broadcast::Receiver` to handle cancellation - /// or control flow. - /// # Args: - /// * `base_path` - A reference to a Path that specifies the base directory where the `CSV files` will be created. - /// The function will append filenames (`metadata.csv, creators.csv, assets.csv, assets_authorities.csv`) to this path. - /// * `batch_size` - The size of the data batches to be processed and written to the files. - /// * `rx` - A receiver that listens for cancellation signals. 
- async fn dump_nft_db( - &self, - base_path: &std::path::Path, - batch_size: usize, - rx: &tokio::sync::broadcast::Receiver<()>, - synchronizer_metrics: Arc, + pub fn dump_last_keys( + file: File, + last_known_nft_key: AssetUpdatedKey, + last_known_fungible_key: AssetUpdatedKey, ) -> Result<(), String> { - let metadata_path = base_path.join("metadata.csv").to_str().map(str::to_owned); - if metadata_path.is_none() { - return Err("invalid path".to_string()); - } - let creators_path = base_path.join("creators.csv").to_str().map(str::to_owned); - if creators_path.is_none() { - return Err("invalid path".to_string()); - } - let assets_path = base_path.join("assets.csv").to_str().map(str::to_owned); - if assets_path.is_none() { - return Err("invalid path".to_string()); - } - let authorities_path = base_path - .join("assets_authorities.csv") - .to_str() - .map(str::to_owned); - if authorities_path.is_none() { - return Err("invalid path".to_string()); - } - if authorities_path.is_none() { - return Err("invalid path".to_string()); - } - tracing::info!( - "Dumping to metadata: {:?}, creators: {:?}, assets: {:?}, authorities: {:?}", - metadata_path, - creators_path, - assets_path, - authorities_path, + let mut writer = WriterBuilder::new().has_headers(false).from_writer(file); + let nft_key = encode_u64x2_pubkey( + last_known_nft_key.seq, + last_known_nft_key.slot, + last_known_nft_key.pubkey, ); - - let metadata_file = File::create(metadata_path.clone().unwrap()) - .map_err(|e| format!("Could not create file for metadata dump: {}", e))?; - let assets_file = File::create(assets_path.clone().unwrap()) - .map_err(|e| format!("Could not create file for assets dump: {}", e))?; - let creators_file = File::create(creators_path.clone().unwrap()) - .map_err(|e| format!("Could not create file for creators dump: {}", e))?; - let authority_file = File::create(authorities_path.clone().unwrap()) - .map_err(|e| format!("Could not create file for authority dump: {}", e))?; - - self.dump_nft_csv( - (metadata_file, metadata_path.unwrap()), - (assets_file, assets_path.unwrap()), - (creators_file, creators_path.unwrap()), - (authority_file, authorities_path.unwrap()), - batch_size, - rx, - synchronizer_metrics, - ) - .await?; - Ok(()) + let fungible_key = encode_u64x2_pubkey( + last_known_fungible_key.seq, + last_known_fungible_key.slot, + last_known_fungible_key.pubkey, + ); + writer + .serialize((AssetType::NonFungible as i32, Self::encode(nft_key))) + .map_err(|e| e.to_string())?; + writer + .serialize((AssetType::Fungible as i32, Self::encode(fungible_key))) + .map_err(|e| e.to_string())?; + writer.flush().map_err(|e| e.to_string()) } } diff --git a/rocks-db/src/storage_traits.rs b/rocks-db/src/storage_traits.rs index 223be2c83..b27702a33 100644 --- a/rocks-db/src/storage_traits.rs +++ b/rocks-db/src/storage_traits.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::{collections::HashSet, fs::File}; use async_trait::async_trait; use mockall::automock; @@ -47,35 +47,37 @@ pub trait AssetUpdateIndexStorage { #[automock] #[async_trait] pub trait AssetIndexReader { - async fn get_fungible_assets_indexes( - &self, - keys: &[Pubkey], - ) -> Result>; + async fn get_fungible_assets_indexes(&self, keys: &[Pubkey]) + -> Result>; - async fn get_nft_asset_indexes<'a>( - &self, - keys: &[Pubkey], - ) -> Result>; + async fn get_nft_asset_indexes<'a>(&self, keys: &[Pubkey]) -> Result>; } #[automock] -#[async_trait] pub trait Dumper { - async fn dump_nft_db( + fn dump_nft_csv( &self, - base_path: &std::path::Path, - 
batch_size: usize, + assets_file: File, + creators_file: File, + authority_file: File, + metadata_file: File, + buf_capacity: usize, + asset_limit: Option, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: std::sync::Arc, - ) -> core::result::Result<(), String>; + ) -> core::result::Result; - async fn dump_fungible_db( + fn dump_fungible_csv( &self, - base_path: &std::path::Path, - batch_size: usize, + fungible_tokens_file_and_path: (File, String), + buf_capacity: usize, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: std::sync::Arc, - ) -> core::result::Result<(), String>; + ) -> core::result::Result; } pub trait AssetIndexStorage: AssetIndexReader + AssetUpdateIndexStorage + Dumper {} @@ -142,40 +144,57 @@ impl AssetIndexReader for MockAssetIndexStorage { .await } - async fn get_nft_asset_indexes<'a>( - &self, - keys: &[Pubkey], - ) -> Result> { + async fn get_nft_asset_indexes<'a>(&self, keys: &[Pubkey]) -> Result> { self.mock_asset_index_reader .get_nft_asset_indexes(keys) .await } } -#[async_trait] impl Dumper for MockAssetIndexStorage { - async fn dump_nft_db( + fn dump_nft_csv( &self, - base_path: &std::path::Path, - batch_size: usize, + assets_file: File, + creators_file: File, + authority_file: File, + metadata_file: File, + buf_capacity: usize, + asset_limit: Option, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: std::sync::Arc, - ) -> core::result::Result<(), String> { - self.mock_dumper - .dump_nft_db(base_path, batch_size, rx, synchronizer_metrics) - .await + ) -> core::result::Result { + self.mock_dumper.dump_nft_csv( + assets_file, + creators_file, + authority_file, + metadata_file, + buf_capacity, + asset_limit, + start_pubkey, + end_pubkey, + rx, + synchronizer_metrics, + ) } - - async fn dump_fungible_db( + fn dump_fungible_csv( &self, - base_path: &std::path::Path, - batch_size: usize, + fungible_tokens_file_and_path: (File, String), + buf_capacity: usize, + start_pubkey: Option, + end_pubkey: Option, rx: &tokio::sync::broadcast::Receiver<()>, synchronizer_metrics: std::sync::Arc, - ) -> core::result::Result<(), String> { - self.mock_dumper - .dump_fungible_db(base_path, batch_size, rx, synchronizer_metrics) - .await + ) -> core::result::Result { + self.mock_dumper.dump_fungible_csv( + fungible_tokens_file_and_path, + buf_capacity, + start_pubkey, + end_pubkey, + rx, + synchronizer_metrics, + ) } }
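A note on the reworked Dumper API above: the trait methods are now synchronous, take already-open `File` handles plus an optional `start_pubkey`/`end_pubkey` range, and return the number of rows written instead of `()`. The sketch below shows how a caller might drive the new `dump_fungible_csv` from async code by moving the blocking work onto `tokio::task::spawn_blocking`. It is illustrative only: the function name `dump_fungible_shard`, the output path, the 1 MiB buffer size, and the concrete generic parameters (which are elided in this diff, so `Pubkey` and `SynchronizerMetricsConfig` are inferred/assumed here) are assumptions, not part of the patch.

    // Sketch under the assumptions stated above; module paths for `Storage` and the
    // metrics type are omitted because they live elsewhere in this repository.
    use std::{fs::File, sync::Arc};

    use solana_sdk::pubkey::Pubkey;

    async fn dump_fungible_shard(
        storage: Arc<Storage>,                   // assumed to implement the reworked Dumper trait
        start_pubkey: Option<Pubkey>,            // lower bound of the shard (seek target)
        end_pubkey: Option<Pubkey>,              // upper bound of the shard
        shutdown_rx: tokio::sync::broadcast::Receiver<()>,
        metrics: Arc<SynchronizerMetricsConfig>, // placeholder name for the elided metrics type
    ) -> Result<usize, String> {
        let path = "/tmp/fungible_tokens_shard.csv".to_string(); // hypothetical output location
        let file = File::create(&path).map_err(|e| e.to_string())?;

        // The dump is now a blocking RocksDB iteration plus CSV write, so keep it off the async runtime.
        tokio::task::spawn_blocking(move || {
            storage.dump_fungible_csv(
                (file, path),
                1024 * 1024, // buf_capacity for the BufWriter; the real BUF_CAPACITY constant may differ
                start_pubkey,
                end_pubkey,
                &shutdown_rx,
                metrics,
            )
        })
        .await
        .map_err(|e| format!("dump task panicked: {e}"))? // JoinError -> String; the inner Result is returned
    }

Because the iterator is bounded by the pubkey range, several such shards can presumably run in parallel `spawn_blocking` tasks and their returned counts summed; `dump_nft_csv` follows the same pattern with four output files and an optional `asset_limit`. That appears to be the motivation for replacing the old batch-buffered async writers with direct per-record serialization.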