From 03d500c9b68f7afc031943dc8a28ebee7ed0dfbc Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 04:36:14 +0200 Subject: [PATCH 01/17] feat: grpc client --- Cargo.lock | 1 + grpc/Cargo.toml | 3 +- grpc/src/client.rs | 49 +++++ grpc/src/lib.rs | 1 + grpc/src/mapper.rs | 207 ++++++++++++++++++ .../src/asset_streaming_and_discovery.rs | 17 +- nft_ingester/src/bin/ingester/main.rs | 24 ++ nft_ingester/src/config.rs | 4 +- nft_ingester/src/gapfiller.rs | 29 ++- nft_ingester/tests/gapfiller_tests.rs | 26 ++- 10 files changed, 346 insertions(+), 15 deletions(-) create mode 100644 grpc/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 3b514f48b..3a5908715 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2049,6 +2049,7 @@ dependencies = [ name = "grpc" version = "0.1.0" dependencies = [ + "async-trait", "entities", "futures", "interface", diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 3c5796132..fec0b358c 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -12,9 +12,10 @@ futures = "0.3.29" solana-sdk = { version = "~1.14.14" } interface = { path = "../interface" } entities = { path = "../entities" } +tokio = { version = "1.35.1", features = ["full"] } +async-trait = "0.1.74" [dev-dependencies] -tokio = { version = "1.35.1", features = ["full"] } mockall = "0.12.0" [build-dependencies] diff --git a/grpc/src/client.rs b/grpc/src/client.rs new file mode 100644 index 000000000..dcf85e90b --- /dev/null +++ b/grpc/src/client.rs @@ -0,0 +1,49 @@ +use crate::gapfiller::gap_filler_service_client::GapFillerServiceClient; +use crate::gapfiller::RangeRequest; +use async_trait::async_trait; +use futures::StreamExt; +use interface::asset_streaming_and_discovery::{ + AssetDetailsConsumer, AssetDetailsStreamNonSync, AsyncError, PeerDiscovery, +}; +use tonic::transport::{Channel, Error}; + +pub struct Client { + inner: GapFillerServiceClient, +} + +impl Client { + pub async fn new(peer_discovery: impl PeerDiscovery) -> Result { + let channel = Channel::from_static(peer_discovery.get_gapfiller_peer_addr()) + .connect() + .await?; + + Ok(Self { + inner: GapFillerServiceClient::new(channel), + }) + } +} + +#[async_trait] +impl AssetDetailsConsumer for Client { + async fn get_consumable_stream_in_range( + &mut self, + start_slot: u64, + end_slot: u64, + ) -> Result { + Ok(Box::pin( + self.inner + .get_assets_updated_within(RangeRequest { + start_slot, + end_slot, + }) + .await + .map_err(|e| Box::new(e) as AsyncError)? + .into_inner() + .map(|stream| { + stream + .map(|asset_details| asset_details.into()) + .map_err(|e| Box::new(e) as AsyncError) + }), + )) + } +} diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index b50735ba6..b66378937 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod gapfiller; mod mapper; pub mod service; diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 56f87ef0b..6b82c63a1 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -5,6 +5,7 @@ use crate::gapfiller::{ SpecificationVersions, TokenStandard, UseMethod, Uses, }; use entities::models::{CompleteAssetDetails, Updated}; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; impl From for AssetDetails { fn from(value: CompleteAssetDetails) -> Self { @@ -39,6 +40,41 @@ impl From for AssetDetails { } } +impl From for CompleteAssetDetails { + fn from(value: AssetDetails) -> Self { + Self { + pubkey: Pubkey::try_from(value.pubkey).unwrap_or_default(), + specification_asset_class: entities::enums::SpecificationAssetClass::from( + SpecificationAssetClass::try_from(value.specification_asset_class) + .unwrap_or_default(), + ), + royalty_target_type: entities::enums::RoyaltyTargetType::from( + RoyaltyTargetType::try_from(value.royalty_target_type).unwrap_or_default(), + ), + slot_created: value.slot_created, + is_compressible: value.is_compressible.map(Into::into).unwrap_or_default(), + is_compressed: value.is_compressed.map(Into::into).unwrap_or_default(), + is_frozen: value.is_frozen.map(Into::into).unwrap_or_default(), + supply: value.supply.map(Into::into), + seq: value.seq.map(Into::into), + is_burnt: value.is_burnt.map(Into::into).unwrap_or_default(), + was_decompressed: value.was_decompressed.map(Into::into).unwrap_or_default(), + creators: value.creators.map(Into::into).unwrap_or_default(), + royalty_amount: value.royalty_amount.map(Into::into).unwrap_or_default(), + authority: value.authority.map(Into::into).unwrap_or_default(), + owner: value.owner.map(Into::into).unwrap_or_default(), + delegate: value.delegate.map(Into::into), + owner_type: value.owner_type.map(Into::into).unwrap_or_default(), + owner_delegate_seq: value.owner_delegate_seq.map(Into::into), + asset_leaf: value.asset_leaf.map(Into::into), + collection: value.collection.map(Into::into), + onchain_data: value.chain_data.map(Into::into), + cl_leaf: value.cl_leaf.map(Into::into), + cl_items: value.cl_items.into_iter().map(Into::into).collect(), + } + } +} + impl From> for DynamicBoolField { fn from(value: Updated) -> Self { Self { @@ -123,6 +159,98 @@ impl From> for AssetLeaf { } } +impl From for Updated { + fn from(value: DynamicBoolField) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: DynamicUint64Field) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: DynamicUint32Field) -> Self { + Self { + value: value.value as u16, + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: DynamicBytesField) -> Self { + Self { + value: Pubkey::try_from(value.value).unwrap_or_default(), + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: DynamicEnumField) -> Self { + Self { + value: entities::enums::OwnerType::from( + OwnerType::try_from(value.value).unwrap_or_default(), + ), + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for entities::models::Creator { + fn from(value: Creator) -> Self { + Self { + creator: Pubkey::try_from(value.creator).unwrap_or_default(), + creator_verified: value.creator_verified, + creator_share: value.creator_share as u8, + } + } +} +impl From for Updated> { + fn from(value: DynamicCreatorsField) -> Self { + Self { + value: value.creators.into_iter().map(|v| v.into()).collect(), + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: AssetLeaf) -> Self { + Self { + slot_updated: value.slot_updated, + seq: value.seq_updated, + value: entities::models::AssetLeaf { + tree_id: Pubkey::try_from(value.tree_id).unwrap_or_default(), + leaf: value.leaf.clone(), + nonce: value.nonce, + data_hash: value + .data_hash + .map(|h| Hash::from(<[u8; 32]>::try_from(h).unwrap_or_default())), + creator_hash: value + .creator_hash + .map(|h| Hash::from(<[u8; 32]>::try_from(h).unwrap_or_default())), + leaf_seq: value.leaf_seq, + }, + } + } +} + impl From for ClLeaf { fn from(value: entities::models::ClLeaf) -> Self { Self { @@ -187,6 +315,75 @@ impl From for Uses { } } } + +impl From for entities::models::ClLeaf { + fn from(value: ClLeaf) -> Self { + Self { + cli_leaf_idx: value.cli_leaf_idx, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).unwrap_or_default(), + cli_node_idx: value.cli_node_idx, + } + } +} + +impl From for entities::models::ClItem { + fn from(value: ClItem) -> Self { + Self { + cli_leaf_idx: value.cli_leaf_idx, + cli_seq: value.cli_seq, + cli_level: value.cli_level, + cli_hash: value.cli_hash, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).unwrap_or_default(), + cli_node_idx: value.cli_node_idx, + slot_updated: value.slot_updated, + } + } +} + +impl From for Updated { + fn from(value: AssetCollection) -> Self { + Self { + value: entities::models::AssetCollection { + collection: Pubkey::try_from(value.collection).unwrap_or_default(), + is_collection_verified: value.is_collection_verified, + collection_seq: value.collection_seq, + }, + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for Updated { + fn from(value: ChainDataV1) -> Self { + Self { + value: entities::models::ChainDataV1 { + name: value.name.clone(), + symbol: value.symbol.clone(), + edition_nonce: value.edition_nonce.map(|v| v as u8), + primary_sale_happened: value.primary_sale_happened, + token_standard: Some(entities::enums::TokenStandard::from( + TokenStandard::try_from(value.token_standard).unwrap_or_default(), + )), + uses: value.uses.map(|v| v.into()), + }, + slot_updated: value.slot_updated, + seq: value.seq_updated, + } + } +} + +impl From for entities::models::Uses { + fn from(value: Uses) -> Self { + Self { + use_method: entities::enums::UseMethod::from( + UseMethod::try_from(value.use_method).unwrap_or_default(), + ), + remaining: value.remaining, + total: value.total, + } + } +} macro_rules! impl_from_enum { ($src:ty, $dst:ident, $($variant:ident),*) => { impl From<$src> for $dst { @@ -198,6 +395,16 @@ macro_rules! impl_from_enum { } } } + + impl From<$dst> for $src { + fn from(value: $dst) -> Self { + match value { + $( + $dst::$variant => <$src>::$variant, + )* + } + } + } }; } diff --git a/interface/src/asset_streaming_and_discovery.rs b/interface/src/asset_streaming_and_discovery.rs index c1620e0fd..fe877dca3 100644 --- a/interface/src/asset_streaming_and_discovery.rs +++ b/interface/src/asset_streaming_and_discovery.rs @@ -5,8 +5,9 @@ use mockall::automock; use std::pin::Pin; pub type AsyncError = Box; -pub type AssetDetailsStream = - Pin> + Send + Sync>>; +type AssetResult = Result; +pub type AssetDetailsStream = Pin + Send + Sync>>; +pub type AssetDetailsStreamNonSync = Pin + Send>>; #[automock] #[async_trait] @@ -18,7 +19,17 @@ pub trait AssetDetailsStreamer: Send + Sync { ) -> Result; } +#[automock] +#[async_trait] +pub trait AssetDetailsConsumer: Send { + async fn get_consumable_stream_in_range( + &mut self, + start_slot: u64, + end_slot: u64, + ) -> Result; +} + #[automock] pub trait PeerDiscovery: Send + Sync { - fn get_gapfiller_peer_addr(&self) -> String; + fn get_gapfiller_peer_addr(&self) -> &'static str; } diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 84498338f..47c58e62d 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -9,6 +9,7 @@ use tokio::sync::oneshot; use tokio::sync::Mutex; use tokio::task::JoinSet; +use grpc::client::Client; use metrics_utils::utils::setup_metrics; use metrics_utils::{ ApiMetricsConfig, BackfillerMetricsConfig, IngesterMetricsConfig, JsonDownloaderMetricsConfig, @@ -19,6 +20,7 @@ use nft_ingester::bubblegum_updates_processor::BubblegumTxProcessor; use nft_ingester::buffer::Buffer; use nft_ingester::config::{setup_config, BackfillerConfig, IngesterConfig, INGESTER_BACKUP_NAME}; use nft_ingester::db_v2::DBClient as DBClientV2; +use nft_ingester::gapfiller::process_asset_details_stream; use nft_ingester::index_syncronizer::Synchronizer; use nft_ingester::init::graceful_stop; use nft_ingester::json_downloader::JsonDownloader; @@ -350,6 +352,28 @@ pub async fn main() -> Result<(), IngesterError> { Ok(()) }); + match Client::new(config).await { + Ok(gaped_data_client) => { + let cloned_keep_running = keep_running.clone(); + let cloned_rocks_storage = rocks_storage.clone(); + mutexed_tasks.lock().await.spawn(async move { + info!( + "Processed {} gaped slots", + process_asset_details_stream( + cloned_keep_running, + cloned_rocks_storage, + newest_restored_slot, + first_processed_slot.load(Ordering::SeqCst), + gaped_data_client, + ) + .await + ); + Ok(()) + }); + } + Err(e) => error!("GRPC Client new: {}", e), + }; + // --stop graceful_stop(mutexed_tasks, true, keep_running.clone(), shutdown_tx).await; diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 9736b6440..3c3e0c61f 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -358,8 +358,8 @@ impl TcpConfig { } impl PeerDiscovery for IngesterConfig { - fn get_gapfiller_peer_addr(&self) -> String { - self.gapfiller_peer_addr.clone() + fn get_gapfiller_peer_addr(&self) -> &'static str { + Box::leak(self.gapfiller_peer_addr.clone().into_boxed_str()) } } diff --git a/nft_ingester/src/gapfiller.rs b/nft_ingester/src/gapfiller.rs index 822bb50e8..b6ce4a780 100644 --- a/nft_ingester/src/gapfiller.rs +++ b/nft_ingester/src/gapfiller.rs @@ -2,27 +2,52 @@ use crate::error::IngesterError; use entities::models::CompleteAssetDetails; use entities::models::Updated; use futures::StreamExt; -use interface::asset_streaming_and_discovery::AssetDetailsStream; +use interface::asset_streaming_and_discovery::AssetDetailsConsumer; use log::error; use rocks_db::asset::{AssetCollection, AssetLeaf}; use rocks_db::cl_items::{ClItem, ClLeaf}; use rocks_db::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage}; use serde_json::json; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -pub async fn process_asset_details_stream(storage: Arc, mut stream: AssetDetailsStream) { +pub async fn process_asset_details_stream( + keep_running: Arc, + storage: Arc, + start_slot: u64, + end_slot: u64, + mut consumer: impl AssetDetailsConsumer, +) -> u64 { + let mut stream = match consumer + .get_consumable_stream_in_range(start_slot, end_slot) + .await + { + Ok(stream) => stream, + Err(e) => { + error!("Error consume asset details stream in range: {}", e); + return 0; + } + }; + + let mut processed_slots = 0; while let Some(result) = stream.next().await { + if !keep_running.load(Ordering::SeqCst) { + break; + } match result { Ok(details) => { if let Some(e) = insert_gaped_data(storage.clone(), details).err() { error!("Error processing gaped data: {}", e) } + processed_slots += 1; } Err(e) => { error!("Error processing stream item: {}", e); } } } + + processed_slots } pub fn insert_gaped_data( diff --git a/nft_ingester/tests/gapfiller_tests.rs b/nft_ingester/tests/gapfiller_tests.rs index 66331c8d4..9f24ef654 100644 --- a/nft_ingester/tests/gapfiller_tests.rs +++ b/nft_ingester/tests/gapfiller_tests.rs @@ -1,8 +1,9 @@ use entities::models::{CompleteAssetDetails, Updated}; use futures::stream; -use interface::asset_streaming_and_discovery::AsyncError; +use interface::asset_streaming_and_discovery::{AsyncError, MockAssetDetailsConsumer}; use nft_ingester::gapfiller::process_asset_details_stream; use solana_sdk::pubkey::Pubkey; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use tempfile::TempDir; use tokio::{sync::Mutex, task::JoinSet}; @@ -34,13 +35,24 @@ async fn test_process_asset_details_stream() { let details1 = create_test_complete_asset_details(first_key.clone()); let details2 = create_test_complete_asset_details(second_key.clone()); - let stream = stream::iter(vec![ - Ok(details1), - Ok(details2), - Err(AsyncError::from("test error")), - ]); + let mut mock = MockAssetDetailsConsumer::new(); + mock.expect_consume_asset_details_stream_in_range() + .returning(move |_, _| { + Ok(Box::pin(stream::iter(vec![ + Ok(details1.clone()), + Ok(details2.clone()), + Err(AsyncError::from("test error")), + ]))) + }); - process_asset_details_stream(storage.clone(), Box::pin(stream)).await; + process_asset_details_stream( + Arc::new(AtomicBool::new(true)), + storage.clone(), + 100, + 200, + mock, + ) + .await; let selected_data = storage .asset_dynamic_data From e15bfb9686c187cf9f2e2ee3ad0150818cd84b06 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 04:46:18 +0200 Subject: [PATCH 02/17] add some validation for slot range --- nft_ingester/src/bin/ingester/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 47c58e62d..8d917803c 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -148,7 +148,7 @@ pub async fn main() -> Result<(), IngesterError> { .unwrap(); let rocks_storage = Arc::new(storage); - let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or(0); + let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap(); // panic if we do not have any slots in DB // start backup service let backup_cfg = backup_service::load_config()?; @@ -354,6 +354,10 @@ pub async fn main() -> Result<(), IngesterError> { match Client::new(config).await { Ok(gaped_data_client) => { + while first_processed_slot.load(Ordering::SeqCst) == 0 { + tokio::time::sleep(Duration::from_millis(100)).await + } + let cloned_keep_running = keep_running.clone(); let cloned_rocks_storage = rocks_storage.clone(); mutexed_tasks.lock().await.spawn(async move { From 4c3c6d9608a4917f5b8208d6545fe80a67a746a2 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Jan 2024 11:01:05 +0200 Subject: [PATCH 03/17] fix test --- nft_ingester/tests/gapfiller_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nft_ingester/tests/gapfiller_tests.rs b/nft_ingester/tests/gapfiller_tests.rs index 9f24ef654..99bba2c29 100644 --- a/nft_ingester/tests/gapfiller_tests.rs +++ b/nft_ingester/tests/gapfiller_tests.rs @@ -36,7 +36,7 @@ async fn test_process_asset_details_stream() { let details2 = create_test_complete_asset_details(second_key.clone()); let mut mock = MockAssetDetailsConsumer::new(); - mock.expect_consume_asset_details_stream_in_range() + mock.expect_get_consumable_stream_in_range() .returning(move |_, _| { Ok(Box::pin(stream::iter(vec![ Ok(details1.clone()), From c5158fb2bc40317973ddcb9139c34ad91ef8944a Mon Sep 17 00:00:00 2001 From: requesco Date: Tue, 9 Jan 2024 02:37:28 +0200 Subject: [PATCH 04/17] comments --- grpc/src/client.rs | 10 ++++++---- interface/src/asset_streaming_and_discovery.rs | 2 +- nft_ingester/src/bin/ingester/main.rs | 4 ++-- nft_ingester/src/config.rs | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/grpc/src/client.rs b/grpc/src/client.rs index dcf85e90b..4181e127a 100644 --- a/grpc/src/client.rs +++ b/grpc/src/client.rs @@ -12,10 +12,12 @@ pub struct Client { } impl Client { - pub async fn new(peer_discovery: impl PeerDiscovery) -> Result { - let channel = Channel::from_static(peer_discovery.get_gapfiller_peer_addr()) - .connect() - .await?; + pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result { + let channel = Channel::from_static(Box::leak( + peer_discovery.get_gapfiller_peer_addr().into_boxed_str(), + )) + .connect() + .await?; Ok(Self { inner: GapFillerServiceClient::new(channel), diff --git a/interface/src/asset_streaming_and_discovery.rs b/interface/src/asset_streaming_and_discovery.rs index fe877dca3..fd6150a54 100644 --- a/interface/src/asset_streaming_and_discovery.rs +++ b/interface/src/asset_streaming_and_discovery.rs @@ -31,5 +31,5 @@ pub trait AssetDetailsConsumer: Send { #[automock] pub trait PeerDiscovery: Send + Sync { - fn get_gapfiller_peer_addr(&self) -> &'static str; + fn get_gapfiller_peer_addr(&self) -> String; } diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 8d917803c..40d2aa1bd 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -148,7 +148,7 @@ pub async fn main() -> Result<(), IngesterError> { .unwrap(); let rocks_storage = Arc::new(storage); - let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap(); // panic if we do not have any slots in DB + let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or_default(); // TODO: change to unwrap when we will have all gapfill logic implemented // start backup service let backup_cfg = backup_service::load_config()?; @@ -352,7 +352,7 @@ pub async fn main() -> Result<(), IngesterError> { Ok(()) }); - match Client::new(config).await { + match Client::connect(config).await { Ok(gaped_data_client) => { while first_processed_slot.load(Ordering::SeqCst) == 0 { tokio::time::sleep(Duration::from_millis(100)).await diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 3c3e0c61f..9736b6440 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -358,8 +358,8 @@ impl TcpConfig { } impl PeerDiscovery for IngesterConfig { - fn get_gapfiller_peer_addr(&self) -> &'static str { - Box::leak(self.gapfiller_peer_addr.clone().into_boxed_str()) + fn get_gapfiller_peer_addr(&self) -> String { + self.gapfiller_peer_addr.clone() } } From d14c7e81bf7b5d7854e54ea7dd5568e0efc0e0c9 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 10:59:09 +0300 Subject: [PATCH 05/17] merge --- grpc/proto/gap_filler.proto | 32 ++-- grpc/src/client.rs | 7 +- grpc/src/gapfiller.rs | 56 +++++-- grpc/src/mapper.rs | 273 +++++++++++++++++++++++++--------- nft_ingester/src/gapfiller.rs | 29 +++- 5 files changed, 305 insertions(+), 92 deletions(-) diff --git a/grpc/proto/gap_filler.proto b/grpc/proto/gap_filler.proto index cdfd8aa1d..838b35a76 100644 --- a/grpc/proto/gap_filler.proto +++ b/grpc/proto/gap_filler.proto @@ -58,6 +58,16 @@ enum UseMethod { SINGLE = 2; } +enum UpdateVersion { + SEQUENCE = 0; + WRITE_VERSION = 1; +} + +message UpdateVersionValue { + UpdateVersion type = 1; + uint64 value = 2; +} + message Uses { UseMethod use_method = 1; uint64 remaining = 2; @@ -71,7 +81,7 @@ message ChainDataV1 { bool primary_sale_happened = 4; TokenStandard token_standard = 5; Uses uses = 6; - google.protobuf.UInt64Value seq_updated = 7; + UpdateVersionValue update_version = 7; uint64 slot_updated = 8; } @@ -82,7 +92,7 @@ message AssetLeaf { google.protobuf.BytesValue data_hash = 4; google.protobuf.BytesValue creator_hash = 5; google.protobuf.UInt64Value leaf_seq = 6; - google.protobuf.UInt64Value seq_updated = 7; + UpdateVersionValue update_version = 7; uint64 slot_updated = 8; } @@ -90,7 +100,7 @@ message AssetCollection { bytes collection = 1; bool is_collection_verified = 2; google.protobuf.UInt64Value collection_seq = 3; - google.protobuf.UInt64Value seq_updated = 4; + UpdateVersionValue update_version = 4; uint64 slot_updated = 5; } @@ -143,49 +153,49 @@ message AssetDetails { // Dynamic field messages message DynamicBoolField { bool value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicUint64Field { uint64 value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicUint32Field { uint32 value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicBytesField { bytes value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicStringField { string value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicChainMutability { ChainMutability value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicEnumField { OwnerType value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicCreatorsField { repeated Creator creators = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } diff --git a/grpc/src/client.rs b/grpc/src/client.rs index 4181e127a..2b1c44592 100644 --- a/grpc/src/client.rs +++ b/grpc/src/client.rs @@ -6,6 +6,7 @@ use interface::asset_streaming_and_discovery::{ AssetDetailsConsumer, AssetDetailsStreamNonSync, AsyncError, PeerDiscovery, }; use tonic::transport::{Channel, Error}; +use tonic::{Code, Status}; pub struct Client { inner: GapFillerServiceClient, @@ -43,7 +44,11 @@ impl AssetDetailsConsumer for Client { .into_inner() .map(|stream| { stream - .map(|asset_details| asset_details.into()) + .and_then(|asset_details| { + asset_details + .try_into() + .map_err(|e| Status::new(Code::Internal, e)) + }) .map_err(|e| Box::new(e) as AsyncError) }), )) diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 91677dfd5..f0dc0aa79 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -1,5 +1,13 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateVersionValue { + #[prost(enumeration = "UpdateVersion", tag = "1")] + pub r#type: i32, + #[prost(uint64, tag = "2")] + pub value: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Uses { #[prost(enumeration = "UseMethod", tag = "1")] pub use_method: i32, @@ -25,7 +33,7 @@ pub struct ChainDataV1 { #[prost(message, optional, tag = "6")] pub uses: ::core::option::Option, #[prost(message, optional, tag = "7")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "8")] pub slot_updated: u64, } @@ -45,7 +53,7 @@ pub struct AssetLeaf { #[prost(message, optional, tag = "6")] pub leaf_seq: ::core::option::Option, #[prost(message, optional, tag = "7")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "8")] pub slot_updated: u64, } @@ -59,7 +67,7 @@ pub struct AssetCollection { #[prost(message, optional, tag = "3")] pub collection_seq: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "5")] pub slot_updated: u64, } @@ -148,7 +156,7 @@ pub struct DynamicBoolField { #[prost(bool, tag = "1")] pub value: bool, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -158,7 +166,7 @@ pub struct DynamicUint64Field { #[prost(uint64, tag = "1")] pub value: u64, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -168,7 +176,7 @@ pub struct DynamicUint32Field { #[prost(uint32, tag = "1")] pub value: u32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -178,7 +186,7 @@ pub struct DynamicBytesField { #[prost(bytes = "vec", tag = "1")] pub value: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -188,7 +196,7 @@ pub struct DynamicStringField { #[prost(string, tag = "1")] pub value: ::prost::alloc::string::String, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -198,7 +206,7 @@ pub struct DynamicChainMutability { #[prost(enumeration = "ChainMutability", tag = "1")] pub value: i32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -208,7 +216,7 @@ pub struct DynamicEnumField { #[prost(enumeration = "OwnerType", tag = "1")] pub value: i32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -218,7 +226,7 @@ pub struct DynamicCreatorsField { #[prost(message, repeated, tag = "1")] pub creators: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -535,6 +543,32 @@ impl UseMethod { } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum UpdateVersion { + Sequence = 0, + WriteVersion = 1, +} +impl UpdateVersion { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + UpdateVersion::Sequence => "SEQUENCE", + UpdateVersion::WriteVersion => "WRITE_VERSION", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SEQUENCE" => Some(Self::Sequence), + "WRITE_VERSION" => Some(Self::WriteVersion), + _ => None, + } + } +} /// Generated client implementations. pub mod gap_filler_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 033352780..5aeb61427 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -3,9 +3,9 @@ use crate::gapfiller::{ Creator, DynamicBoolField, DynamicBytesField, DynamicChainMutability, DynamicCreatorsField, DynamicEnumField, DynamicStringField, DynamicUint32Field, DynamicUint64Field, EditionV1, MasterEdition, OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions, - TokenStandard, UseMethod, Uses, + TokenStandard, UpdateVersionValue, UseMethod, Uses, }; -use entities::models::{CompleteAssetDetails, Updated}; +use entities::models::{CompleteAssetDetails, UpdateVersion, Updated}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; impl From for AssetDetails { @@ -13,12 +13,12 @@ impl From for AssetDetails { let delegate = value.delegate.value.map(|key| DynamicBytesField { value: key.to_bytes().to_vec(), slot_updated: value.delegate.slot_updated, - seq_updated: value.delegate.get_upd_ver_seq(), + update_version: value.delegate.update_version.map(Into::into), }); let owner = value.owner.value.map(|key| DynamicBytesField { value: key.to_bytes().to_vec(), slot_updated: value.owner.slot_updated, - seq_updated: value.owner.get_upd_ver_seq(), + update_version: value.owner.update_version.map(Into::into), }); let owner_delegate_seq = value @@ -27,7 +27,7 @@ impl From for AssetDetails { .map(|seq| DynamicUint64Field { value: seq, slot_updated: value.owner_delegate_seq.slot_updated, - seq_updated: value.owner_delegate_seq.get_upd_ver_seq(), + update_version: value.owner_delegate_seq.update_version.map(Into::into), }); Self { @@ -68,9 +68,22 @@ impl From for AssetDetails { } } -impl From for CompleteAssetDetails { - fn from(value: AssetDetails) -> Self { - Self { +impl TryFrom for CompleteAssetDetails { + type Error = String; + + fn try_from(value: AssetDetails) -> Result { + let owner_delegate_seq = value + .owner_delegate_seq + .map(|val| { + Updated::new( + val.slot_updated, + val.update_version.map(Into::into), + Some(val.value), + ) + }) + .unwrap_or_default(); + + Ok(Self { pubkey: Pubkey::try_from(value.pubkey).unwrap_or_default(), specification_asset_class: entities::enums::SpecificationAssetClass::from( SpecificationAssetClass::try_from(value.specification_asset_class) @@ -80,6 +93,11 @@ impl From for CompleteAssetDetails { RoyaltyTargetType::try_from(value.royalty_target_type).unwrap_or_default(), ), slot_created: value.slot_created, + edition_address: value + .edition_address + .map(TryInto::try_into) + .transpose() + .map_err(|e| format!("{:?}", e))?, is_compressible: value.is_compressible.map(Into::into).unwrap_or_default(), is_compressed: value.is_compressed.map(Into::into).unwrap_or_default(), is_frozen: value.is_frozen.map(Into::into).unwrap_or_default(), @@ -89,17 +107,64 @@ impl From for CompleteAssetDetails { was_decompressed: value.was_decompressed.map(Into::into).unwrap_or_default(), creators: value.creators.map(Into::into).unwrap_or_default(), royalty_amount: value.royalty_amount.map(Into::into).unwrap_or_default(), - authority: value.authority.map(Into::into).unwrap_or_default(), - owner: value.owner.map(Into::into).unwrap_or_default(), - delegate: value.delegate.map(Into::into), + url: Default::default(), + chain_mutability: None, + lamports: value.lamports.map(Into::into), + executable: value.executable.map(Into::into), + metadata_owner: value.metadata_owner.map(Into::into), + authority: value + .authority + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(), + owner: value + .owner + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(), + delegate: value + .delegate + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(), owner_type: value.owner_type.map(Into::into).unwrap_or_default(), - owner_delegate_seq: value.owner_delegate_seq.map(Into::into), - asset_leaf: value.asset_leaf.map(Into::into), - collection: value.collection.map(Into::into), + owner_delegate_seq, + asset_leaf: value.asset_leaf.map(TryInto::try_into).transpose()?, + collection: value.collection.map(TryInto::try_into).transpose()?, onchain_data: value.chain_data.map(Into::into), - cl_leaf: value.cl_leaf.map(Into::into), - cl_items: value.cl_items.into_iter().map(Into::into).collect(), + cl_leaf: value.cl_leaf.map(TryInto::try_into).transpose()?, + cl_items: value + .cl_items + .into_iter() + .flat_map(TryInto::try_into) + .collect(), + edition: value.edition.map(TryInto::try_into).transpose()?, + master_edition: value.master_edition.map(TryInto::try_into).transpose()?, + }) + } +} + +impl From for UpdateVersionValue { + fn from(value: UpdateVersion) -> Self { + match value { + UpdateVersion::Sequence(seq) => Self { + r#type: crate::gapfiller::UpdateVersion::Sequence.into(), + value: seq, + }, + UpdateVersion::WriteVersion(wv) => Self { + r#type: crate::gapfiller::UpdateVersion::WriteVersion.into(), + value: wv, + }, + } + } +} + +impl From for UpdateVersion { + fn from(value: UpdateVersionValue) -> Self { + if value.r#type == crate::gapfiller::UpdateVersion::Sequence as i32 { + return Self::Sequence(value.value); } + Self::WriteVersion(value.value) } } @@ -108,7 +173,7 @@ impl From> for DynamicBoolField { Self { value: value.value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -118,7 +183,7 @@ impl From> for DynamicUint64Field { Self { value: value.value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -128,7 +193,7 @@ impl From> for DynamicStringField { Self { value: value.clone().value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -138,7 +203,7 @@ impl From> for DynamicUint32Field { Self { value: value.value as u32, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -148,7 +213,7 @@ impl From> for DynamicBytesField { Self { value: value.value.to_bytes().to_vec(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -158,7 +223,7 @@ impl From> for DynamicEnumField { Self { value: OwnerType::from(value.value).into(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -168,7 +233,7 @@ impl From> for DynamicChainMutability Self { value: ChainMutability::from(value.value).into(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -187,7 +252,7 @@ impl From>> for DynamicCreatorsField { Self { creators: value.value.iter().map(|v| v.into()).collect(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -202,7 +267,7 @@ impl From> for AssetLeaf { creator_hash: value.value.creator_hash.map(|h| h.to_bytes().to_vec()), leaf_seq: value.value.leaf_seq, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -212,7 +277,7 @@ impl From for Updated { Self { value: value.value, slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } @@ -222,7 +287,7 @@ impl From for Updated { Self { value: value.value, slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } @@ -232,21 +297,46 @@ impl From for Updated { Self { value: value.value as u16, slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } -impl From for Updated { - fn from(value: DynamicBytesField) -> Self { +impl From for Updated { + fn from(value: DynamicStringField) -> Self { Self { - value: Pubkey::try_from(value.value).unwrap_or_default(), + value: value.clone().value, slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } +impl TryFrom for Updated> { + type Error = String; + + fn try_from(value: DynamicBytesField) -> Result { + Ok(Self::new( + value.slot_updated, + value.update_version.map(Into::into), + Some(Pubkey::try_from(value.value).map_err(|e| format!("{:?}", e))?), + )) + } +} + +impl TryFrom for Updated { + type Error = String; + + fn try_from(value: DynamicBytesField) -> Result { + Ok(Self { + value: Pubkey::try_from(value.value).map_err(|e| format!("{:?}", e))?, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +// TODO impl From for Updated { fn from(value: DynamicEnumField) -> Self { Self { @@ -254,48 +344,66 @@ impl From for Updated { OwnerType::try_from(value.value).unwrap_or_default(), ), slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } -impl From for entities::models::Creator { - fn from(value: Creator) -> Self { - Self { - creator: Pubkey::try_from(value.creator).unwrap_or_default(), +impl TryFrom for entities::models::Creator { + type Error = String; + + fn try_from(value: Creator) -> Result { + Ok(Self { + creator: Pubkey::try_from(value.creator).map_err(|e| format!("{:?}", e))?, creator_verified: value.creator_verified, creator_share: value.creator_share as u8, - } + }) } } impl From for Updated> { fn from(value: DynamicCreatorsField) -> Self { Self { - value: value.creators.into_iter().map(|v| v.into()).collect(), + value: value + .creators + .into_iter() + .flat_map(TryInto::try_into) + .collect(), slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } -impl From for Updated { - fn from(value: AssetLeaf) -> Self { - Self { +impl TryFrom for Updated { + type Error = String; + + fn try_from(value: AssetLeaf) -> Result { + Ok(Self { slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), value: entities::models::AssetLeaf { - tree_id: Pubkey::try_from(value.tree_id).unwrap_or_default(), + tree_id: Pubkey::try_from(value.tree_id).map_err(|e| format!("{:?}", e))?, leaf: value.leaf.clone(), nonce: value.nonce, data_hash: value .data_hash - .map(|h| Hash::from(<[u8; 32]>::try_from(h).unwrap_or_default())), + .map(|h| { + Ok::<_, String>(Hash::from( + <[u8; 32]>::try_from(h).map_err(|e| format!("{:?}", e))?, + )) + }) + .transpose()?, creator_hash: value .creator_hash - .map(|h| Hash::from(<[u8; 32]>::try_from(h).unwrap_or_default())), + .map(|h| { + Ok::<_, String>(Hash::from( + <[u8; 32]>::try_from(h).map_err(|e| format!("{:?}", e))?, + )) + }) + .transpose()?, leaf_seq: value.leaf_seq, }, - } + }) } } @@ -330,7 +438,7 @@ impl From> for AssetCollection { is_collection_verified: value.value.is_collection_verified, collection_seq: value.value.collection_seq, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -350,7 +458,7 @@ impl From> for ChainDataV1 { .unwrap_or_default(), uses: value.clone().value.uses.map(|v| v.into()), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -387,42 +495,73 @@ impl From for EditionV1 { } } +impl TryFrom for entities::models::MasterEdition { + type Error = String; -impl From for entities::models::ClLeaf { - fn from(value: ClLeaf) -> Self { - Self { + fn try_from(value: MasterEdition) -> Result { + Ok(Self { + key: Pubkey::try_from(value.key).map_err(|e| format!("{:?}", e))?, + supply: value.supply, + max_supply: value.max_supply, + write_version: value.write_version, + }) + } +} + +impl TryFrom for entities::models::EditionV1 { + type Error = String; + + fn try_from(value: EditionV1) -> Result { + Ok(Self { + key: Pubkey::try_from(value.key).map_err(|e| format!("{:?}", e))?, + parent: Pubkey::try_from(value.parent).map_err(|e| format!("{:?}", e))?, + edition: value.edition, + write_version: value.write_version, + }) + } +} + +impl TryFrom for entities::models::ClLeaf { + type Error = String; + + fn try_from(value: ClLeaf) -> Result { + Ok(Self { cli_leaf_idx: value.cli_leaf_idx, - cli_tree_key: Pubkey::try_from(value.cli_tree_key).unwrap_or_default(), + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(|e| format!("{:?}", e))?, cli_node_idx: value.cli_node_idx, - } + }) } } -impl From for entities::models::ClItem { - fn from(value: ClItem) -> Self { - Self { +impl TryFrom for entities::models::ClItem { + type Error = String; + + fn try_from(value: ClItem) -> Result { + Ok(Self { cli_leaf_idx: value.cli_leaf_idx, cli_seq: value.cli_seq, cli_level: value.cli_level, cli_hash: value.cli_hash, - cli_tree_key: Pubkey::try_from(value.cli_tree_key).unwrap_or_default(), + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(|e| format!("{:?}", e))?, cli_node_idx: value.cli_node_idx, slot_updated: value.slot_updated, - } + }) } } -impl From for Updated { - fn from(value: AssetCollection) -> Self { - Self { +impl TryFrom for Updated { + type Error = String; + + fn try_from(value: AssetCollection) -> Result { + Ok(Self { value: entities::models::AssetCollection { - collection: Pubkey::try_from(value.collection).unwrap_or_default(), + collection: Pubkey::try_from(value.collection).map_err(|e| format!("{:?}", e))?, is_collection_verified: value.is_collection_verified, collection_seq: value.collection_seq, }, slot_updated: value.slot_updated, - seq: value.seq_updated, - } + update_version: value.update_version.map(Into::into), + }) } } @@ -440,7 +579,7 @@ impl From for Updated { uses: value.uses.map(|v| v.into()), }, slot_updated: value.slot_updated, - seq: value.seq_updated, + update_version: value.update_version.map(Into::into), } } } diff --git a/nft_ingester/src/gapfiller.rs b/nft_ingester/src/gapfiller.rs index 1cefa5cc6..33a3f5904 100644 --- a/nft_ingester/src/gapfiller.rs +++ b/nft_ingester/src/gapfiller.rs @@ -1,24 +1,49 @@ use crate::error::IngesterError; use entities::models::CompleteAssetDetails; use futures::StreamExt; -use interface::asset_streaming_and_discovery::AssetDetailsStream; +use interface::asset_streaming_and_discovery::AssetDetailsConsumer; use log::error; use rocks_db::Storage; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -pub async fn process_asset_details_stream(storage: Arc, mut stream: AssetDetailsStream) { +pub async fn process_asset_details_stream( + keep_running: Arc, + storage: Arc, + start_slot: u64, + end_slot: u64, + mut consumer: impl AssetDetailsConsumer, +) -> u64 { + let mut stream = match consumer + .get_consumable_stream_in_range(start_slot, end_slot) + .await + { + Ok(stream) => stream, + Err(e) => { + error!("Error consume asset details stream in range: {}", e); + return 0; + } + }; + + let mut processed_slots = 0; while let Some(result) = stream.next().await { + if !keep_running.load(Ordering::SeqCst) { + break; + } match result { Ok(details) => { if let Some(e) = insert_gaped_data(storage.clone(), details).await.err() { error!("Error processing gaped data: {}", e) } + processed_slots += 1; } Err(e) => { error!("Error processing stream item: {}", e); } } } + + processed_slots } pub async fn insert_gaped_data( From 02b1624738a761595415850d8b1be3b428c90171 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 15:07:53 +0300 Subject: [PATCH 06/17] error handling --- Cargo.lock | 1 + grpc/Cargo.toml | 1 + grpc/proto/gap_filler.proto | 15 ++-- grpc/src/client.rs | 3 +- grpc/src/error/mod.rs | 11 +++ grpc/src/gapfiller.rs | 14 ++-- grpc/src/lib.rs | 1 + grpc/src/mapper.rs | 160 +++++++++++++++++++++++------------- 8 files changed, 135 insertions(+), 71 deletions(-) create mode 100644 grpc/src/error/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d6c14d037..74833b6da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2720,6 +2720,7 @@ dependencies = [ "mockall 0.12.1", "prost 0.12.3", "solana-sdk", + "thiserror", "tokio", "tonic 0.10.2", "tonic-build 0.10.2", diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 69e4e24a5..351d27b66 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -14,6 +14,7 @@ interface = { path = "../interface" } entities = { path = "../entities" } tokio = { version = "1.35.1", features = ["full"] } async-trait = "0.1.74" +thiserror = "1.0.31" [dev-dependencies] mockall = "0.12.0" diff --git a/grpc/proto/gap_filler.proto b/grpc/proto/gap_filler.proto index 838b35a76..e3aa666aa 100644 --- a/grpc/proto/gap_filler.proto +++ b/grpc/proto/gap_filler.proto @@ -137,17 +137,18 @@ message AssetDetails { DynamicUint64Field lamports = 21; DynamicBoolField executable = 22; DynamicStringField metadata_owner = 23; + DynamicStringField url = 24; - AssetLeaf asset_leaf = 24; - AssetCollection collection = 25; - ChainDataV1 chain_data = 26; + AssetLeaf asset_leaf = 25; + AssetCollection collection = 26; + ChainDataV1 chain_data = 27; - ClLeaf cl_leaf = 27; - repeated ClItem cl_items = 28; + ClLeaf cl_leaf = 28; + repeated ClItem cl_items = 29; // From TokenMetadataEdition - EditionV1 edition = 29; - MasterEdition master_edition = 30; + EditionV1 edition = 30; + MasterEdition master_edition = 31; } // Dynamic field messages diff --git a/grpc/src/client.rs b/grpc/src/client.rs index 2b1c44592..c89963410 100644 --- a/grpc/src/client.rs +++ b/grpc/src/client.rs @@ -1,3 +1,4 @@ +use crate::error::GrpcError; use crate::gapfiller::gap_filler_service_client::GapFillerServiceClient; use crate::gapfiller::RangeRequest; use async_trait::async_trait; @@ -47,7 +48,7 @@ impl AssetDetailsConsumer for Client { .and_then(|asset_details| { asset_details .try_into() - .map_err(|e| Status::new(Code::Internal, e)) + .map_err(|e: GrpcError| Status::new(Code::Internal, e.to_string())) }) .map_err(|e| Box::new(e) as AsyncError) }), diff --git a/grpc/src/error/mod.rs b/grpc/src/error/mod.rs new file mode 100644 index 000000000..a3618a2f0 --- /dev/null +++ b/grpc/src/error/mod.rs @@ -0,0 +1,11 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum GrpcError { + #[error("Pubkey from: {0:?}")] + PubkeyFrom(Vec), + #[error("Missing field: {0}")] + MissingField(String), + #[error("Cannot cast enum: {0} {1}")] + EnumCast(String, String), +} diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index f0dc0aa79..51f2261a5 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -134,19 +134,21 @@ pub struct AssetDetails { #[prost(message, optional, tag = "23")] pub metadata_owner: ::core::option::Option, #[prost(message, optional, tag = "24")] - pub asset_leaf: ::core::option::Option, + pub url: ::core::option::Option, #[prost(message, optional, tag = "25")] - pub collection: ::core::option::Option, + pub asset_leaf: ::core::option::Option, #[prost(message, optional, tag = "26")] - pub chain_data: ::core::option::Option, + pub collection: ::core::option::Option, #[prost(message, optional, tag = "27")] + pub chain_data: ::core::option::Option, + #[prost(message, optional, tag = "28")] pub cl_leaf: ::core::option::Option, - #[prost(message, repeated, tag = "28")] + #[prost(message, repeated, tag = "29")] pub cl_items: ::prost::alloc::vec::Vec, /// From TokenMetadataEdition - #[prost(message, optional, tag = "29")] - pub edition: ::core::option::Option, #[prost(message, optional, tag = "30")] + pub edition: ::core::option::Option, + #[prost(message, optional, tag = "31")] pub master_edition: ::core::option::Option, } /// Dynamic field messages diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index b66378937..19f7d755d 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -1,4 +1,5 @@ pub mod client; +pub mod error; pub mod gapfiller; mod mapper; pub mod service; diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 5aeb61427..30aa00e5a 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -1,3 +1,4 @@ +use crate::error::GrpcError; use crate::gapfiller::{ AssetCollection, AssetDetails, AssetLeaf, ChainDataV1, ChainMutability, ClItem, ClLeaf, Creator, DynamicBoolField, DynamicBytesField, DynamicChainMutability, DynamicCreatorsField, @@ -8,6 +9,7 @@ use crate::gapfiller::{ use entities::models::{CompleteAssetDetails, UpdateVersion, Updated}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; + impl From for AssetDetails { fn from(value: CompleteAssetDetails) -> Self { let delegate = value.delegate.value.map(|key| DynamicBytesField { @@ -57,6 +59,7 @@ impl From for AssetDetails { lamports: value.lamports.map(|v| v.into()), executable: value.executable.map(|v| v.into()), metadata_owner: value.metadata_owner.map(|v| v.into()), + url: Some(value.url.into()), asset_leaf: value.asset_leaf.map(|v| v.into()), collection: value.collection.map(|v| v.into()), chain_data: value.onchain_data.map(|v| v.into()), @@ -69,7 +72,7 @@ impl From for AssetDetails { } impl TryFrom for CompleteAssetDetails { - type Error = String; + type Error = GrpcError; fn try_from(value: AssetDetails) -> Result { let owner_delegate_seq = value @@ -84,30 +87,57 @@ impl TryFrom for CompleteAssetDetails { .unwrap_or_default(); Ok(Self { - pubkey: Pubkey::try_from(value.pubkey).unwrap_or_default(), + pubkey: Pubkey::try_from(value.pubkey).map_err(|e| GrpcError::PubkeyFrom(e))?, specification_asset_class: entities::enums::SpecificationAssetClass::from( - SpecificationAssetClass::try_from(value.specification_asset_class) - .unwrap_or_default(), + SpecificationAssetClass::try_from(value.specification_asset_class).map_err( + |e| GrpcError::EnumCast("SpecificationAssetClass".to_string(), e.to_string()), + )?, ), royalty_target_type: entities::enums::RoyaltyTargetType::from( - RoyaltyTargetType::try_from(value.royalty_target_type).unwrap_or_default(), + RoyaltyTargetType::try_from(value.royalty_target_type).map_err(|e| { + GrpcError::EnumCast("RoyaltyTargetType".to_string(), e.to_string()) + })?, ), slot_created: value.slot_created, edition_address: value .edition_address .map(TryInto::try_into) .transpose() - .map_err(|e| format!("{:?}", e))?, - is_compressible: value.is_compressible.map(Into::into).unwrap_or_default(), - is_compressed: value.is_compressed.map(Into::into).unwrap_or_default(), - is_frozen: value.is_frozen.map(Into::into).unwrap_or_default(), + .map_err(|e| GrpcError::PubkeyFrom(e))?, + is_compressible: value + .is_compressible + .map(Into::into) + .ok_or(GrpcError::MissingField("is_compressible".to_string()))?, + is_compressed: value + .is_compressed + .map(Into::into) + .ok_or(GrpcError::MissingField("is_compressed".to_string()))?, + is_frozen: value + .is_frozen + .map(Into::into) + .ok_or(GrpcError::MissingField("is_frozen".to_string()))?, supply: value.supply.map(Into::into), seq: value.seq.map(Into::into), - is_burnt: value.is_burnt.map(Into::into).unwrap_or_default(), - was_decompressed: value.was_decompressed.map(Into::into).unwrap_or_default(), - creators: value.creators.map(Into::into).unwrap_or_default(), - royalty_amount: value.royalty_amount.map(Into::into).unwrap_or_default(), - url: Default::default(), + is_burnt: value + .is_burnt + .map(Into::into) + .ok_or(GrpcError::MissingField("is_burnt".to_string()))?, + was_decompressed: value + .was_decompressed + .map(Into::into) + .ok_or(GrpcError::MissingField("was_decompressed".to_string()))?, + creators: value + .creators + .map(Into::into) + .ok_or(GrpcError::MissingField("creators".to_string()))?, + royalty_amount: value + .royalty_amount + .map(Into::into) + .ok_or(GrpcError::MissingField("royalty_amount".to_string()))?, + url: value + .url + .map(Into::into) + .ok_or(GrpcError::MissingField("url".to_string()))?, chain_mutability: None, lamports: value.lamports.map(Into::into), executable: value.executable.map(Into::into), @@ -116,7 +146,7 @@ impl TryFrom for CompleteAssetDetails { .authority .map(TryInto::try_into) .transpose()? - .unwrap_or_default(), + .ok_or(GrpcError::MissingField("authority".to_string()))?, owner: value .owner .map(TryInto::try_into) @@ -127,11 +157,15 @@ impl TryFrom for CompleteAssetDetails { .map(TryInto::try_into) .transpose()? .unwrap_or_default(), - owner_type: value.owner_type.map(Into::into).unwrap_or_default(), + owner_type: value + .owner_type + .map(TryInto::try_into) + .transpose()? + .ok_or(GrpcError::MissingField("owner_type".to_string()))?, owner_delegate_seq, asset_leaf: value.asset_leaf.map(TryInto::try_into).transpose()?, collection: value.collection.map(TryInto::try_into).transpose()?, - onchain_data: value.chain_data.map(Into::into), + onchain_data: value.chain_data.map(TryInto::try_into).transpose()?, cl_leaf: value.cl_leaf.map(TryInto::try_into).transpose()?, cl_items: value .cl_items @@ -313,48 +347,50 @@ impl From for Updated { } impl TryFrom for Updated> { - type Error = String; + type Error = GrpcError; fn try_from(value: DynamicBytesField) -> Result { Ok(Self::new( value.slot_updated, value.update_version.map(Into::into), - Some(Pubkey::try_from(value.value).map_err(|e| format!("{:?}", e))?), + Some(Pubkey::try_from(value.value).map_err(|e| GrpcError::PubkeyFrom(e))?), )) } } impl TryFrom for Updated { - type Error = String; + type Error = GrpcError; fn try_from(value: DynamicBytesField) -> Result { Ok(Self { - value: Pubkey::try_from(value.value).map_err(|e| format!("{:?}", e))?, + value: Pubkey::try_from(value.value).map_err(|e| GrpcError::PubkeyFrom(e))?, slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), }) } } -// TODO -impl From for Updated { - fn from(value: DynamicEnumField) -> Self { - Self { +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: DynamicEnumField) -> Result { + Ok(Self { value: entities::enums::OwnerType::from( - OwnerType::try_from(value.value).unwrap_or_default(), + OwnerType::try_from(value.value) + .map_err(|e| GrpcError::EnumCast("OwnerType".to_string(), e.to_string()))?, ), slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), - } + }) } } impl TryFrom for entities::models::Creator { - type Error = String; + type Error = GrpcError; fn try_from(value: Creator) -> Result { Ok(Self { - creator: Pubkey::try_from(value.creator).map_err(|e| format!("{:?}", e))?, + creator: Pubkey::try_from(value.creator).map_err(|e| GrpcError::PubkeyFrom(e))?, creator_verified: value.creator_verified, creator_share: value.creator_share as u8, }) @@ -375,29 +411,29 @@ impl From for Updated> { } impl TryFrom for Updated { - type Error = String; + type Error = GrpcError; fn try_from(value: AssetLeaf) -> Result { Ok(Self { slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), value: entities::models::AssetLeaf { - tree_id: Pubkey::try_from(value.tree_id).map_err(|e| format!("{:?}", e))?, + tree_id: Pubkey::try_from(value.tree_id).map_err(|e| GrpcError::PubkeyFrom(e))?, leaf: value.leaf.clone(), nonce: value.nonce, data_hash: value .data_hash .map(|h| { - Ok::<_, String>(Hash::from( - <[u8; 32]>::try_from(h).map_err(|e| format!("{:?}", e))?, + Ok::<_, GrpcError>(Hash::from( + <[u8; 32]>::try_from(h).map_err(|e| GrpcError::PubkeyFrom(e))?, )) }) .transpose()?, creator_hash: value .creator_hash .map(|h| { - Ok::<_, String>(Hash::from( - <[u8; 32]>::try_from(h).map_err(|e| format!("{:?}", e))?, + Ok::<_, GrpcError>(Hash::from( + <[u8; 32]>::try_from(h).map_err(|e| GrpcError::PubkeyFrom(e))?, )) }) .transpose()?, @@ -496,11 +532,11 @@ impl From for EditionV1 { } impl TryFrom for entities::models::MasterEdition { - type Error = String; + type Error = GrpcError; fn try_from(value: MasterEdition) -> Result { Ok(Self { - key: Pubkey::try_from(value.key).map_err(|e| format!("{:?}", e))?, + key: Pubkey::try_from(value.key).map_err(|e| GrpcError::PubkeyFrom(e))?, supply: value.supply, max_supply: value.max_supply, write_version: value.write_version, @@ -509,12 +545,12 @@ impl TryFrom for entities::models::MasterEdition { } impl TryFrom for entities::models::EditionV1 { - type Error = String; + type Error = GrpcError; fn try_from(value: EditionV1) -> Result { Ok(Self { - key: Pubkey::try_from(value.key).map_err(|e| format!("{:?}", e))?, - parent: Pubkey::try_from(value.parent).map_err(|e| format!("{:?}", e))?, + key: Pubkey::try_from(value.key).map_err(|e| GrpcError::PubkeyFrom(e))?, + parent: Pubkey::try_from(value.parent).map_err(|e| GrpcError::PubkeyFrom(e))?, edition: value.edition, write_version: value.write_version, }) @@ -522,19 +558,20 @@ impl TryFrom for entities::models::EditionV1 { } impl TryFrom for entities::models::ClLeaf { - type Error = String; + type Error = GrpcError; fn try_from(value: ClLeaf) -> Result { Ok(Self { cli_leaf_idx: value.cli_leaf_idx, - cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(|e| format!("{:?}", e))?, + cli_tree_key: Pubkey::try_from(value.cli_tree_key) + .map_err(|e| GrpcError::PubkeyFrom(e))?, cli_node_idx: value.cli_node_idx, }) } } impl TryFrom for entities::models::ClItem { - type Error = String; + type Error = GrpcError; fn try_from(value: ClItem) -> Result { Ok(Self { @@ -542,7 +579,8 @@ impl TryFrom for entities::models::ClItem { cli_seq: value.cli_seq, cli_level: value.cli_level, cli_hash: value.cli_hash, - cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(|e| format!("{:?}", e))?, + cli_tree_key: Pubkey::try_from(value.cli_tree_key) + .map_err(|e| GrpcError::PubkeyFrom(e))?, cli_node_idx: value.cli_node_idx, slot_updated: value.slot_updated, }) @@ -550,12 +588,13 @@ impl TryFrom for entities::models::ClItem { } impl TryFrom for Updated { - type Error = String; + type Error = GrpcError; fn try_from(value: AssetCollection) -> Result { Ok(Self { value: entities::models::AssetCollection { - collection: Pubkey::try_from(value.collection).map_err(|e| format!("{:?}", e))?, + collection: Pubkey::try_from(value.collection) + .map_err(|e| GrpcError::PubkeyFrom(e))?, is_collection_verified: value.is_collection_verified, collection_seq: value.collection_seq, }, @@ -565,34 +604,41 @@ impl TryFrom for Updated { } } -impl From for Updated { - fn from(value: ChainDataV1) -> Self { - Self { +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: ChainDataV1) -> Result { + Ok(Self { value: entities::models::ChainDataV1 { name: value.name.clone(), symbol: value.symbol.clone(), edition_nonce: value.edition_nonce.map(|v| v as u8), primary_sale_happened: value.primary_sale_happened, token_standard: Some(entities::enums::TokenStandard::from( - TokenStandard::try_from(value.token_standard).unwrap_or_default(), + TokenStandard::try_from(value.token_standard).map_err(|e| { + GrpcError::EnumCast("TokenStandard".to_string(), e.to_string()) + })?, )), - uses: value.uses.map(|v| v.into()), + uses: value.uses.map(TryInto::try_into).transpose()?, }, slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), - } + }) } } -impl From for entities::models::Uses { - fn from(value: Uses) -> Self { - Self { +impl TryFrom for entities::models::Uses { + type Error = GrpcError; + + fn try_from(value: Uses) -> Result { + Ok(Self { use_method: entities::enums::UseMethod::from( - UseMethod::try_from(value.use_method).unwrap_or_default(), + UseMethod::try_from(value.use_method) + .map_err(|e| GrpcError::EnumCast("UseMethod".to_string(), e.to_string()))?, ), remaining: value.remaining, total: value.total, - } + }) } } macro_rules! impl_from_enum { From a8a9186b38ef4488b397afa7f7eccac17ebf8f70 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 15:17:17 +0300 Subject: [PATCH 07/17] starting service --- nft_ingester/src/bin/ingester/main.rs | 30 +++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index d97d4fd6b..9f566028f 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -17,6 +17,7 @@ use tokio::task::JoinSet; use tokio::time::Instant; use backfill_rpc::rpc::BackfillRPC; +use grpc::client::Client; use interface::error::{StorageError, UsecaseError}; use interface::signature_persistence::{BlockProducer, ProcessingDataGetter}; use metrics_utils::utils::start_metrics; @@ -48,6 +49,7 @@ use nft_ingester::backfiller::{ TransactionsParser, }; use nft_ingester::fork_cleaner::ForkCleaner; +use nft_ingester::gapfiller::process_asset_details_stream; use nft_ingester::mpl_core_processor::MplCoreProcessor; use nft_ingester::sequence_consistent::SequenceConsistentGapfiller; use usecase::bigtable::BigTableClient; @@ -181,6 +183,7 @@ pub async fn main() -> Result<(), IngesterError> { .unwrap(); let rocks_storage = Arc::new(storage); + let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or_default(); // TODO: change to unwrap when we will have all gapfill logic implemented let synchronizer = Synchronizer::new( rocks_storage.clone(), index_storage.clone(), @@ -247,8 +250,6 @@ pub async fn main() -> Result<(), IngesterError> { } })); - let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or(0); - // start backup service let backup_cfg = backup_service::load_config()?; let mut backup_service = BackupService::new(rocks_storage.db.clone(), &backup_cfg)?; @@ -372,6 +373,31 @@ pub async fn main() -> Result<(), IngesterError> { } })); + match Client::connect(config.clone()).await { + Ok(gaped_data_client) => { + while first_processed_slot.load(Ordering::SeqCst) == 0 { + tokio::time::sleep(Duration::from_millis(100)).await + } + let cloned_keep_running = keep_running.clone(); + let cloned_rocks_storage = rocks_storage.clone(); + mutexed_tasks.lock().await.spawn(async move { + info!( + "Processed {} gaped slots", + process_asset_details_stream( + cloned_keep_running, + cloned_rocks_storage, + newest_restored_slot, + first_processed_slot.load(Ordering::SeqCst), + gaped_data_client, + ) + .await + ); + Ok(()) + }); + } + Err(e) => error!("GRPC Client new: {}", e), + }; + let cloned_keep_running = keep_running.clone(); let cloned_rocks_storage = rocks_storage.clone(); let cloned_red_metrics = metrics_state.red_metrics.clone(); From 8f1342dd80b20fdf91317b7dfa0b477946b1b83b Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 15:50:41 +0300 Subject: [PATCH 08/17] clippy --- grpc/src/mapper.rs | 52 +++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 30aa00e5a..66f5f3209 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -87,7 +87,7 @@ impl TryFrom for CompleteAssetDetails { .unwrap_or_default(); Ok(Self { - pubkey: Pubkey::try_from(value.pubkey).map_err(|e| GrpcError::PubkeyFrom(e))?, + pubkey: Pubkey::try_from(value.pubkey).map_err(GrpcError::PubkeyFrom)?, specification_asset_class: entities::enums::SpecificationAssetClass::from( SpecificationAssetClass::try_from(value.specification_asset_class).map_err( |e| GrpcError::EnumCast("SpecificationAssetClass".to_string(), e.to_string()), @@ -103,7 +103,7 @@ impl TryFrom for CompleteAssetDetails { .edition_address .map(TryInto::try_into) .transpose() - .map_err(|e| GrpcError::PubkeyFrom(e))?, + .map_err(GrpcError::PubkeyFrom)?, is_compressible: value .is_compressible .map(Into::into) @@ -128,7 +128,8 @@ impl TryFrom for CompleteAssetDetails { .ok_or(GrpcError::MissingField("was_decompressed".to_string()))?, creators: value .creators - .map(Into::into) + .map(TryInto::try_into) + .transpose()? .ok_or(GrpcError::MissingField("creators".to_string()))?, royalty_amount: value .royalty_amount @@ -170,8 +171,8 @@ impl TryFrom for CompleteAssetDetails { cl_items: value .cl_items .into_iter() - .flat_map(TryInto::try_into) - .collect(), + .map(TryInto::::try_into) + .collect::, _>>()?, edition: value.edition.map(TryInto::try_into).transpose()?, master_edition: value.master_edition.map(TryInto::try_into).transpose()?, }) @@ -353,7 +354,7 @@ impl TryFrom for Updated> { Ok(Self::new( value.slot_updated, value.update_version.map(Into::into), - Some(Pubkey::try_from(value.value).map_err(|e| GrpcError::PubkeyFrom(e))?), + Some(Pubkey::try_from(value.value).map_err(GrpcError::PubkeyFrom)?), )) } } @@ -363,7 +364,7 @@ impl TryFrom for Updated { fn try_from(value: DynamicBytesField) -> Result { Ok(Self { - value: Pubkey::try_from(value.value).map_err(|e| GrpcError::PubkeyFrom(e))?, + value: Pubkey::try_from(value.value).map_err(GrpcError::PubkeyFrom)?, slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), }) @@ -390,23 +391,25 @@ impl TryFrom for entities::models::Creator { fn try_from(value: Creator) -> Result { Ok(Self { - creator: Pubkey::try_from(value.creator).map_err(|e| GrpcError::PubkeyFrom(e))?, + creator: Pubkey::try_from(value.creator).map_err(GrpcError::PubkeyFrom)?, creator_verified: value.creator_verified, creator_share: value.creator_share as u8, }) } } -impl From for Updated> { - fn from(value: DynamicCreatorsField) -> Self { - Self { +impl TryFrom for Updated> { + type Error = GrpcError; + + fn try_from(value: DynamicCreatorsField) -> Result { + Ok(Self { value: value .creators .into_iter() - .flat_map(TryInto::try_into) - .collect(), + .map(TryInto::try_into) + .collect::, _>>()?, slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), - } + }) } } @@ -418,14 +421,14 @@ impl TryFrom for Updated { slot_updated: value.slot_updated, update_version: value.update_version.map(Into::into), value: entities::models::AssetLeaf { - tree_id: Pubkey::try_from(value.tree_id).map_err(|e| GrpcError::PubkeyFrom(e))?, + tree_id: Pubkey::try_from(value.tree_id).map_err(GrpcError::PubkeyFrom)?, leaf: value.leaf.clone(), nonce: value.nonce, data_hash: value .data_hash .map(|h| { Ok::<_, GrpcError>(Hash::from( - <[u8; 32]>::try_from(h).map_err(|e| GrpcError::PubkeyFrom(e))?, + <[u8; 32]>::try_from(h).map_err(GrpcError::PubkeyFrom)?, )) }) .transpose()?, @@ -433,7 +436,7 @@ impl TryFrom for Updated { .creator_hash .map(|h| { Ok::<_, GrpcError>(Hash::from( - <[u8; 32]>::try_from(h).map_err(|e| GrpcError::PubkeyFrom(e))?, + <[u8; 32]>::try_from(h).map_err(GrpcError::PubkeyFrom)?, )) }) .transpose()?, @@ -536,7 +539,7 @@ impl TryFrom for entities::models::MasterEdition { fn try_from(value: MasterEdition) -> Result { Ok(Self { - key: Pubkey::try_from(value.key).map_err(|e| GrpcError::PubkeyFrom(e))?, + key: Pubkey::try_from(value.key).map_err(GrpcError::PubkeyFrom)?, supply: value.supply, max_supply: value.max_supply, write_version: value.write_version, @@ -549,8 +552,8 @@ impl TryFrom for entities::models::EditionV1 { fn try_from(value: EditionV1) -> Result { Ok(Self { - key: Pubkey::try_from(value.key).map_err(|e| GrpcError::PubkeyFrom(e))?, - parent: Pubkey::try_from(value.parent).map_err(|e| GrpcError::PubkeyFrom(e))?, + key: Pubkey::try_from(value.key).map_err(GrpcError::PubkeyFrom)?, + parent: Pubkey::try_from(value.parent).map_err(GrpcError::PubkeyFrom)?, edition: value.edition, write_version: value.write_version, }) @@ -563,8 +566,7 @@ impl TryFrom for entities::models::ClLeaf { fn try_from(value: ClLeaf) -> Result { Ok(Self { cli_leaf_idx: value.cli_leaf_idx, - cli_tree_key: Pubkey::try_from(value.cli_tree_key) - .map_err(|e| GrpcError::PubkeyFrom(e))?, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(GrpcError::PubkeyFrom)?, cli_node_idx: value.cli_node_idx, }) } @@ -579,8 +581,7 @@ impl TryFrom for entities::models::ClItem { cli_seq: value.cli_seq, cli_level: value.cli_level, cli_hash: value.cli_hash, - cli_tree_key: Pubkey::try_from(value.cli_tree_key) - .map_err(|e| GrpcError::PubkeyFrom(e))?, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(GrpcError::PubkeyFrom)?, cli_node_idx: value.cli_node_idx, slot_updated: value.slot_updated, }) @@ -593,8 +594,7 @@ impl TryFrom for Updated { fn try_from(value: AssetCollection) -> Result { Ok(Self { value: entities::models::AssetCollection { - collection: Pubkey::try_from(value.collection) - .map_err(|e| GrpcError::PubkeyFrom(e))?, + collection: Pubkey::try_from(value.collection).map_err(GrpcError::PubkeyFrom)?, is_collection_verified: value.is_collection_verified, collection_seq: value.collection_seq, }, From 5811c2778cfd94264add2bdf8ca23f0550d74059 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 15:57:35 +0300 Subject: [PATCH 09/17] chain_mutability --- grpc/src/mapper.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 66f5f3209..0e60acfa6 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -139,7 +139,7 @@ impl TryFrom for CompleteAssetDetails { .url .map(Into::into) .ok_or(GrpcError::MissingField("url".to_string()))?, - chain_mutability: None, + chain_mutability: value.chain_mutability.map(TryInto::try_into).transpose()?, lamports: value.lamports.map(Into::into), executable: value.executable.map(Into::into), metadata_owner: value.metadata_owner.map(Into::into), @@ -446,6 +446,22 @@ impl TryFrom for Updated { } } +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: DynamicChainMutability) -> Result { + Ok(Self { + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + value: entities::enums::ChainMutability::from( + ChainMutability::try_from(value.value).map_err(|e| { + GrpcError::EnumCast("ChainMutability".to_string(), e.to_string()) + })?, + ), + }) + } +} + impl From for ClLeaf { fn from(value: entities::models::ClLeaf) -> Self { Self { From ec12da0969233a04eeb01086508c9a9da02c96bc Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 16:03:50 +0300 Subject: [PATCH 10/17] add comment --- grpc/src/mapper.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 0e60acfa6..e76353673 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -148,6 +148,7 @@ impl TryFrom for CompleteAssetDetails { .map(TryInto::try_into) .transpose()? .ok_or(GrpcError::MissingField("authority".to_string()))?, + // unwrap_or_default used for fields with Update> type owner: value .owner .map(TryInto::try_into) From 4f8fbcc71dc603801a7f41209e25b3e6eef00b11 Mon Sep 17 00:00:00 2001 From: requesco Date: Wed, 3 Apr 2024 16:16:34 +0300 Subject: [PATCH 11/17] add missing fields --- entities/src/models.rs | 7 ++++++ grpc/proto/gap_filler.proto | 21 ++++++++++------ grpc/src/gapfiller.rs | 26 +++++++++++++++----- grpc/src/mapper.rs | 34 ++++++++++++++++++++++++++ rocks-db/src/asset_streaming_client.rs | 7 ++++++ rocks-db/src/batch_client.rs | 8 +++++- 6 files changed, 89 insertions(+), 14 deletions(-) diff --git a/entities/src/models.rs b/entities/src/models.rs index aef4e010b..b88729732 100644 --- a/entities/src/models.rs +++ b/entities/src/models.rs @@ -92,6 +92,13 @@ pub struct CompleteAssetDetails { pub lamports: Option>, pub executable: Option>, pub metadata_owner: Option>, + pub raw_name: Option>, + pub plugins: Option>, + pub unknown_plugins: Option>, + pub rent_epoch: Option>, + pub num_minted: Option>, + pub current_size: Option>, + pub plugins_json_version: Option>, // From AssetAuthority as Tuple pub authority: Updated, diff --git a/grpc/proto/gap_filler.proto b/grpc/proto/gap_filler.proto index e3aa666aa..52bca9693 100644 --- a/grpc/proto/gap_filler.proto +++ b/grpc/proto/gap_filler.proto @@ -138,17 +138,24 @@ message AssetDetails { DynamicBoolField executable = 22; DynamicStringField metadata_owner = 23; DynamicStringField url = 24; + DynamicStringField raw_name = 25; + DynamicStringField plugins = 26; + DynamicStringField unknown_plugins = 27; + DynamicUint64Field rent_epoch = 28; + DynamicUint32Field num_minted = 29; + DynamicUint32Field current_size = 30; + DynamicUint32Field plugins_json_version = 31; - AssetLeaf asset_leaf = 25; - AssetCollection collection = 26; - ChainDataV1 chain_data = 27; + AssetLeaf asset_leaf = 32; + AssetCollection collection = 33; + ChainDataV1 chain_data = 34; - ClLeaf cl_leaf = 28; - repeated ClItem cl_items = 29; + ClLeaf cl_leaf = 35; + repeated ClItem cl_items = 36; // From TokenMetadataEdition - EditionV1 edition = 30; - MasterEdition master_edition = 31; + EditionV1 edition = 37; + MasterEdition master_edition = 38; } // Dynamic field messages diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 51f2261a5..7f8b1bec6 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -136,19 +136,33 @@ pub struct AssetDetails { #[prost(message, optional, tag = "24")] pub url: ::core::option::Option, #[prost(message, optional, tag = "25")] - pub asset_leaf: ::core::option::Option, + pub raw_name: ::core::option::Option, #[prost(message, optional, tag = "26")] - pub collection: ::core::option::Option, + pub plugins: ::core::option::Option, #[prost(message, optional, tag = "27")] - pub chain_data: ::core::option::Option, + pub unknown_plugins: ::core::option::Option, #[prost(message, optional, tag = "28")] + pub rent_epoch: ::core::option::Option, + #[prost(message, optional, tag = "29")] + pub num_minted: ::core::option::Option, + #[prost(message, optional, tag = "30")] + pub current_size: ::core::option::Option, + #[prost(message, optional, tag = "31")] + pub plugins_json_version: ::core::option::Option, + #[prost(message, optional, tag = "32")] + pub asset_leaf: ::core::option::Option, + #[prost(message, optional, tag = "33")] + pub collection: ::core::option::Option, + #[prost(message, optional, tag = "34")] + pub chain_data: ::core::option::Option, + #[prost(message, optional, tag = "35")] pub cl_leaf: ::core::option::Option, - #[prost(message, repeated, tag = "29")] + #[prost(message, repeated, tag = "36")] pub cl_items: ::prost::alloc::vec::Vec, /// From TokenMetadataEdition - #[prost(message, optional, tag = "30")] + #[prost(message, optional, tag = "37")] pub edition: ::core::option::Option, - #[prost(message, optional, tag = "31")] + #[prost(message, optional, tag = "38")] pub master_edition: ::core::option::Option, } /// Dynamic field messages diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index e76353673..143a382c2 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -60,6 +60,13 @@ impl From for AssetDetails { executable: value.executable.map(|v| v.into()), metadata_owner: value.metadata_owner.map(|v| v.into()), url: Some(value.url.into()), + raw_name: value.raw_name.map(Into::into), + plugins: value.plugins.map(Into::into), + unknown_plugins: value.unknown_plugins.map(Into::into), + rent_epoch: value.rent_epoch.map(Into::into), + num_minted: value.num_minted.map(Into::into), + current_size: value.current_size.map(Into::into), + plugins_json_version: value.plugins_json_version.map(Into::into), asset_leaf: value.asset_leaf.map(|v| v.into()), collection: value.collection.map(|v| v.into()), chain_data: value.onchain_data.map(|v| v.into()), @@ -143,6 +150,13 @@ impl TryFrom for CompleteAssetDetails { lamports: value.lamports.map(Into::into), executable: value.executable.map(Into::into), metadata_owner: value.metadata_owner.map(Into::into), + raw_name: value.raw_name.map(Into::into), + plugins: value.plugins.map(Into::into), + unknown_plugins: value.unknown_plugins.map(Into::into), + rent_epoch: value.rent_epoch.map(Into::into), + num_minted: value.num_minted.map(Into::into), + current_size: value.current_size.map(Into::into), + plugins_json_version: value.plugins_json_version.map(Into::into), authority: value .authority .map(TryInto::try_into) @@ -224,6 +238,26 @@ impl From> for DynamicUint64Field { } } +impl From> for DynamicUint32Field { + fn from(value: Updated) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicUint32Field) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + impl From> for DynamicStringField { fn from(value: Updated) -> Self { Self { diff --git a/rocks-db/src/asset_streaming_client.rs b/rocks-db/src/asset_streaming_client.rs index 76d0fba97..ea85cf7d6 100644 --- a/rocks-db/src/asset_streaming_client.rs +++ b/rocks-db/src/asset_streaming_client.rs @@ -209,6 +209,13 @@ async fn get_complete_asset_details( lamports: dynamic_data.lamports, executable: dynamic_data.executable, metadata_owner: dynamic_data.metadata_owner, + raw_name: dynamic_data.raw_name, + plugins: dynamic_data.plugins, + unknown_plugins: dynamic_data.unknown_plugins, + rent_epoch: dynamic_data.rent_epoch, + num_minted: dynamic_data.num_minted, + current_size: dynamic_data.current_size, + plugins_json_version: dynamic_data.plugins_json_version, authority: Updated::new(authority.slot_updated, None, authority.authority), owner: owner.owner, delegate: owner.delegate, diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index 3780b9d60..68d08e54d 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -299,7 +299,13 @@ impl Storage { lamports: data.lamports, executable: data.executable, metadata_owner: data.metadata_owner, - ..Default::default() + raw_name: data.raw_name, + plugins: data.plugins, + unknown_plugins: data.unknown_plugins, + rent_epoch: data.rent_epoch, + num_minted: data.num_minted, + current_size: data.current_size, + plugins_json_version: data.plugins_json_version, }, )?; From 0314bbdee639a2d88e47535b283a788fe1694f5f Mon Sep 17 00:00:00 2001 From: requesco Date: Tue, 9 Apr 2024 08:40:28 +0300 Subject: [PATCH 12/17] comments --- grpc/src/client.rs | 13 ++++----- grpc/src/error/mod.rs | 10 +++++++ nft_ingester/src/bin/ingester/main.rs | 42 +++++++++++++++------------ nft_ingester/src/gapfiller.rs | 6 ++-- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/grpc/src/client.rs b/grpc/src/client.rs index c89963410..043653d0a 100644 --- a/grpc/src/client.rs +++ b/grpc/src/client.rs @@ -6,7 +6,8 @@ use futures::StreamExt; use interface::asset_streaming_and_discovery::{ AssetDetailsConsumer, AssetDetailsStreamNonSync, AsyncError, PeerDiscovery, }; -use tonic::transport::{Channel, Error}; +use std::str::FromStr; +use tonic::transport::{Channel, Uri}; use tonic::{Code, Status}; pub struct Client { @@ -14,12 +15,10 @@ pub struct Client { } impl Client { - pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result { - let channel = Channel::from_static(Box::leak( - peer_discovery.get_gapfiller_peer_addr().into_boxed_str(), - )) - .connect() - .await?; + pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result { + let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str()) + .map_err(|e| GrpcError::UriCreate(e.to_string()))?; + let channel = Channel::builder(url).connect().await?; Ok(Self { inner: GapFillerServiceClient::new(channel), diff --git a/grpc/src/error/mod.rs b/grpc/src/error/mod.rs index a3618a2f0..cb3db906a 100644 --- a/grpc/src/error/mod.rs +++ b/grpc/src/error/mod.rs @@ -1,4 +1,5 @@ use thiserror::Error; +use tonic::transport::Error; #[derive(Error, Debug, PartialEq, Eq)] pub enum GrpcError { @@ -8,4 +9,13 @@ pub enum GrpcError { MissingField(String), #[error("Cannot cast enum: {0} {1}")] EnumCast(String, String), + #[error("UriCreate: {0}")] + UriCreate(String), + #[error("TonicTransport: {0}")] + TonicTransport(String), +} +impl From for GrpcError { + fn from(value: Error) -> Self { + Self::TonicTransport(value.to_string()) + } } diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 9f566028f..ae0808a99 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -183,7 +183,7 @@ pub async fn main() -> Result<(), IngesterError> { .unwrap(); let rocks_storage = Arc::new(storage); - let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or_default(); // TODO: change to unwrap when we will have all gapfill logic implemented + let last_saved_slot = rocks_storage.last_saved_slot()?.unwrap_or_default(); let synchronizer = Synchronizer::new( rocks_storage.clone(), index_storage.clone(), @@ -355,7 +355,7 @@ pub async fn main() -> Result<(), IngesterError> { match slot { Ok(slot) => { if let Some(slot) = slot { - if slot != newest_restored_slot { + if slot != last_saved_slot { first_processed_slot_clone.store(slot, Ordering::SeqCst); break; } @@ -375,25 +375,29 @@ pub async fn main() -> Result<(), IngesterError> { match Client::connect(config.clone()).await { Ok(gaped_data_client) => { - while first_processed_slot.load(Ordering::SeqCst) == 0 { + while first_processed_slot.load(Ordering::SeqCst) == 0 + && keep_running.load(Ordering::SeqCst) + { tokio::time::sleep(Duration::from_millis(100)).await } - let cloned_keep_running = keep_running.clone(); - let cloned_rocks_storage = rocks_storage.clone(); - mutexed_tasks.lock().await.spawn(async move { - info!( - "Processed {} gaped slots", - process_asset_details_stream( - cloned_keep_running, - cloned_rocks_storage, - newest_restored_slot, - first_processed_slot.load(Ordering::SeqCst), - gaped_data_client, - ) - .await - ); - Ok(()) - }); + if keep_running.load(Ordering::SeqCst) { + let cloned_keep_running = keep_running.clone(); + let cloned_rocks_storage = rocks_storage.clone(); + mutexed_tasks.lock().await.spawn(async move { + info!( + "Processed {} gaped assets", + process_asset_details_stream( + cloned_keep_running, + cloned_rocks_storage, + last_saved_slot, + first_processed_slot.load(Ordering::SeqCst), + gaped_data_client, + ) + .await + ); + Ok(()) + }); + } } Err(e) => error!("GRPC Client new: {}", e), }; diff --git a/nft_ingester/src/gapfiller.rs b/nft_ingester/src/gapfiller.rs index 33a3f5904..561dbe5bb 100644 --- a/nft_ingester/src/gapfiller.rs +++ b/nft_ingester/src/gapfiller.rs @@ -25,7 +25,7 @@ pub async fn process_asset_details_stream( } }; - let mut processed_slots = 0; + let mut processed_assets = 0; while let Some(result) = stream.next().await { if !keep_running.load(Ordering::SeqCst) { break; @@ -35,7 +35,7 @@ pub async fn process_asset_details_stream( if let Some(e) = insert_gaped_data(storage.clone(), details).await.err() { error!("Error processing gaped data: {}", e) } - processed_slots += 1; + processed_assets += 1; } Err(e) => { error!("Error processing stream item: {}", e); @@ -43,7 +43,7 @@ pub async fn process_asset_details_stream( } } - processed_slots + processed_assets } pub async fn insert_gaped_data( From b24618406b196b0f0e479cdfc5ac9da364c8f223 Mon Sep 17 00:00:00 2001 From: requesco Date: Thu, 25 Apr 2024 18:38:24 +0300 Subject: [PATCH 13/17] comments --- digital_asset_types/src/dao/scopes/asset.rs | 3 +-- entities/src/models.rs | 9 +++++++++ grpc/proto/gap_filler.proto | 6 ++++++ grpc/src/gapfiller.rs | 10 ++++++++++ grpc/src/mapper.rs | 20 +++++++++++++++++--- nft_ingester/src/bin/migrator/main.rs | 3 +-- nft_ingester/src/bin/raw_backup/main.rs | 6 ++---- nft_ingester/src/json_downloader.rs | 3 ++- rocks-db/src/asset.rs | 4 ++-- rocks-db/src/asset_streaming_client.rs | 8 +++++--- rocks-db/src/lib.rs | 8 ++++---- rocks-db/src/offchain_data.rs | 15 +-------------- tests/setup/src/rocks.rs | 6 +++--- 13 files changed, 63 insertions(+), 38 deletions(-) diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 0cc78b8d7..33956f744 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -4,7 +4,7 @@ use std::string::ToString; use std::sync::Arc; use entities::api_req_params::{AssetSortDirection, Options}; -use entities::models::AssetSignatureWithPagination; +use entities::models::{AssetSignatureWithPagination, OffChainData}; use interface::asset_sigratures::AssetSignaturesGetter; use log::error; use sea_orm::prelude::Json; @@ -15,7 +15,6 @@ use rocks_db::asset::{ AssetAuthority, AssetCollection, AssetDynamicDetails, AssetLeaf, AssetOwner, AssetSelectedMaps, AssetStaticDetails, }; -use rocks_db::offchain_data::OffChainData; use rocks_db::Storage; use crate::dao::sea_orm_active_enums::{ diff --git a/entities/src/models.rs b/entities/src/models.rs index b88729732..dde6fd8d6 100644 --- a/entities/src/models.rs +++ b/entities/src/models.rs @@ -67,6 +67,12 @@ pub struct Creator { pub creator_share: u8, } +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct OffChainData { + pub url: String, + pub metadata: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct CompleteAssetDetails { // From AssetStaticDetails @@ -120,6 +126,9 @@ pub struct CompleteAssetDetails { // TokenMetadataEdition pub edition: Option, pub master_edition: Option, + + // OffChainData + pub offchain_data: Option, } /// Leaf information about compressed asset diff --git a/grpc/proto/gap_filler.proto b/grpc/proto/gap_filler.proto index 52bca9693..21aff6bef 100644 --- a/grpc/proto/gap_filler.proto +++ b/grpc/proto/gap_filler.proto @@ -156,6 +156,7 @@ message AssetDetails { // From TokenMetadataEdition EditionV1 edition = 37; MasterEdition master_edition = 38; + OffchainData offchain_data = 39; } // Dynamic field messages @@ -237,6 +238,11 @@ message MasterEdition { uint64 write_version = 4; } +message OffchainData { + string url = 1; + string metadata = 2; +} + // RangeRequest and AssetDetailsResponse for data synchronization message RangeRequest { uint64 start_slot = 1; diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 7f8b1bec6..31d6ca7b1 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -164,6 +164,8 @@ pub struct AssetDetails { pub edition: ::core::option::Option, #[prost(message, optional, tag = "38")] pub master_edition: ::core::option::Option, + #[prost(message, optional, tag = "39")] + pub offchain_data: ::core::option::Option, } /// Dynamic field messages #[allow(clippy::derive_partial_eq_without_eq)] @@ -298,6 +300,14 @@ pub struct MasterEdition { #[prost(uint64, tag = "4")] pub write_version: u64, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OffchainData { + #[prost(string, tag = "1")] + pub url: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub metadata: ::prost::alloc::string::String, +} /// RangeRequest and AssetDetailsResponse for data synchronization #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 143a382c2..3aefe695a 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -3,10 +3,10 @@ use crate::gapfiller::{ AssetCollection, AssetDetails, AssetLeaf, ChainDataV1, ChainMutability, ClItem, ClLeaf, Creator, DynamicBoolField, DynamicBytesField, DynamicChainMutability, DynamicCreatorsField, DynamicEnumField, DynamicStringField, DynamicUint32Field, DynamicUint64Field, EditionV1, - MasterEdition, OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions, - TokenStandard, UpdateVersionValue, UseMethod, Uses, + MasterEdition, OffchainData, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, TokenStandard, UpdateVersionValue, UseMethod, Uses, }; -use entities::models::{CompleteAssetDetails, UpdateVersion, Updated}; +use entities::models::{CompleteAssetDetails, OffChainData, UpdateVersion, Updated}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -74,6 +74,7 @@ impl From for AssetDetails { cl_items: value.cl_items.into_iter().map(ClItem::from).collect(), edition: value.edition.map(|e| e.into()), master_edition: value.master_edition.map(|e| e.into()), + offchain_data: value.offchain_data.map(|e| e.into()), } } } @@ -190,10 +191,23 @@ impl TryFrom for CompleteAssetDetails { .collect::, _>>()?, edition: value.edition.map(TryInto::try_into).transpose()?, master_edition: value.master_edition.map(TryInto::try_into).transpose()?, + offchain_data: value.offchain_data.map(|e| OffChainData { + url: e.url, + metadata: e.metadata, + }), }) } } +impl From for OffchainData { + fn from(value: OffChainData) -> Self { + Self { + url: value.url, + metadata: value.metadata, + } + } +} + impl From for UpdateVersionValue { fn from(value: UpdateVersion) -> Self { match value { diff --git a/nft_ingester/src/bin/migrator/main.rs b/nft_ingester/src/bin/migrator/main.rs index 2ebe0c9de..6775b52fe 100644 --- a/nft_ingester/src/bin/migrator/main.rs +++ b/nft_ingester/src/bin/migrator/main.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use entities::enums::TaskStatus; -use entities::models::Task; +use entities::models::{OffChainData, Task}; use log::{error, info}; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::utils::start_metrics; @@ -16,7 +16,6 @@ use nft_ingester::config::{ use nft_ingester::db_v2::DBClient; use nft_ingester::error::IngesterError; use nft_ingester::init::graceful_stop; -use rocks_db::offchain_data::OffChainData; use rocks_db::{AssetDynamicDetails, Storage}; #[tokio::main(flavor = "multi_thread")] diff --git a/nft_ingester/src/bin/raw_backup/main.rs b/nft_ingester/src/bin/raw_backup/main.rs index b2f859002..f1ffa660b 100644 --- a/nft_ingester/src/bin/raw_backup/main.rs +++ b/nft_ingester/src/bin/raw_backup/main.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use clap::{arg, Parser}; +use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use nft_ingester::config::init_logger; use rocks_db::column::TypedColumn; @@ -59,10 +60,7 @@ pub async fn main() -> Result<(), IngesterError> { .for_each(|(k, v)| target_storage.db.put_cf(cf, k, v).unwrap()); info!("Done copying raw blocks"); - let cf = &target_storage - .db - .cf_handle(rocks_db::offchain_data::OffChainData::NAME) - .unwrap(); + let cf = &target_storage.db.cf_handle(OffChainData::NAME).unwrap(); info!("Copying offchain data..."); source_storage .asset_offchain_data diff --git a/nft_ingester/src/json_downloader.rs b/nft_ingester/src/json_downloader.rs index a09f08e18..cf9412ca4 100644 --- a/nft_ingester/src/json_downloader.rs +++ b/nft_ingester/src/json_downloader.rs @@ -3,10 +3,11 @@ use crate::config::{ }; use crate::db_v2::{DBClient, UpdatedTask}; use entities::enums::TaskStatus; +use entities::models::OffChainData; use log::{debug, error}; use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus}; use reqwest::{Client, ClientBuilder}; -use rocks_db::{offchain_data::OffChainData, Storage}; +use rocks_db::Storage; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::task::JoinSet; diff --git a/rocks-db/src/asset.rs b/rocks-db/src/asset.rs index 65ea89286..d22822e26 100644 --- a/rocks-db/src/asset.rs +++ b/rocks-db/src/asset.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use bincode::{deserialize, serialize}; use entities::enums::{ChainMutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass}; -use entities::models::{EditionData, Updated}; +use entities::models::{EditionData, OffChainData, Updated}; use log::{error, warn}; use rocksdb::MergeOperands; use serde::{Deserialize, Serialize}; @@ -21,7 +21,7 @@ pub struct AssetSelectedMaps { pub assets_collection: HashMap, pub assets_owner: HashMap, pub assets_leaf: HashMap, - pub offchain_data: HashMap, + pub offchain_data: HashMap, pub urls: HashMap, pub editions: HashMap, } diff --git a/rocks-db/src/asset_streaming_client.rs b/rocks-db/src/asset_streaming_client.rs index ea85cf7d6..eb70a9e16 100644 --- a/rocks-db/src/asset_streaming_client.rs +++ b/rocks-db/src/asset_streaming_client.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use entities::models::{CompleteAssetDetails, UpdateVersion, Updated}; +use entities::models::{CompleteAssetDetails, OffChainData, UpdateVersion, Updated}; use interface::asset_streaming_and_discovery::{ AssetDetailsStream, AssetDetailsStreamer, AsyncError, }; @@ -187,7 +187,7 @@ async fn get_complete_asset_details( (Some(edition), master_edition) } }; - + let url = dynamic_data.url.clone(); Ok(CompleteAssetDetails { pubkey: static_data.pubkey, specification_asset_class: static_data.specification_asset_class, @@ -204,7 +204,7 @@ async fn get_complete_asset_details( onchain_data, creators: dynamic_data.creators, royalty_amount: dynamic_data.royalty_amount, - url: dynamic_data.url, + url: url.clone(), chain_mutability: dynamic_data.chain_mutability, lamports: dynamic_data.lamports, executable: dynamic_data.executable, @@ -270,6 +270,8 @@ async fn get_complete_asset_details( .collect(), edition, master_edition, + offchain_data: Storage::column::(backend.clone(), metrics.clone()) + .get(url.value)?, }) } diff --git a/rocks-db/src/lib.rs b/rocks-db/src/lib.rs index d9243fc87..4ffeec36f 100644 --- a/rocks-db/src/lib.rs +++ b/rocks-db/src/lib.rs @@ -15,7 +15,7 @@ pub use asset::{ }; pub use column::columns; use column::{Column, TypedColumn}; -use entities::models::AssetSignature; +use entities::models::{AssetSignature, OffChainData}; use metrics_utils::red::RequestErrorDurationMetrics; use tokio::sync::Mutex; use tokio::task::JoinSet; @@ -74,7 +74,7 @@ pub struct Storage { pub asset_leaf_data: Column, pub asset_collection_data: Column, pub asset_collection_data_deprecated: Column, - pub asset_offchain_data: Column, + pub asset_offchain_data: Column, pub cl_items: Column, pub cl_leafs: Column, pub bubblegum_slots: Column, @@ -201,7 +201,7 @@ impl Storage { fn create_cf_descriptors() -> Vec { vec![ - Self::new_cf_descriptor::(), + Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), @@ -379,7 +379,7 @@ impl Storage { asset::AssetStaticDetails::merge_keep_existing, ); } - offchain_data::OffChainData::NAME => { + OffChainData::NAME => { cf_options.set_merge_operator_associative( "merge_fn_off_chain_data_keep_existing", asset::AssetStaticDetails::merge_keep_existing, diff --git a/rocks-db/src/offchain_data.rs b/rocks-db/src/offchain_data.rs index 68ac0fba9..876ebf678 100644 --- a/rocks-db/src/offchain_data.rs +++ b/rocks-db/src/offchain_data.rs @@ -1,20 +1,7 @@ use crate::column::TypedColumn; use crate::key_encoders::{decode_string, encode_string}; use crate::Result; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug)] -pub enum ChainDataMutability { - Immutable, - Mutable, - Unknown, -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default)] -pub struct OffChainData { - pub url: String, - pub metadata: String, -} +use entities::models::OffChainData; impl TypedColumn for OffChainData { type KeyType = String; diff --git a/tests/setup/src/rocks.rs b/tests/setup/src/rocks.rs index e5c21f38d..5c0e6b439 100644 --- a/tests/setup/src/rocks.rs +++ b/tests/setup/src/rocks.rs @@ -1,14 +1,14 @@ use std::sync::Arc; -use entities::models::Updated; +use entities::models::{OffChainData, Updated}; use rand::{random, Rng}; use solana_sdk::pubkey::Pubkey; use tempfile::TempDir; use metrics_utils::red::RequestErrorDurationMetrics; use rocks_db::{ - asset::AssetCollection, offchain_data::OffChainData, AssetAuthority, AssetDynamicDetails, - AssetOwner, AssetStaticDetails, Storage, + asset::AssetCollection, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, + Storage, }; use tokio::{sync::Mutex, task::JoinSet}; From 7fd473c6c43e7ba7be633d0d4ce46fa9091d993c Mon Sep 17 00:00:00 2001 From: requesco Date: Thu, 25 Apr 2024 18:42:18 +0300 Subject: [PATCH 14/17] merge --- nft_ingester/src/json_worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nft_ingester/src/json_worker.rs b/nft_ingester/src/json_worker.rs index ad5fd40dd..5dbbbf990 100644 --- a/nft_ingester/src/json_worker.rs +++ b/nft_ingester/src/json_worker.rs @@ -1,7 +1,7 @@ use crate::config::{setup_config, IngesterConfig, INGESTER_CONFIG_PREFIX}; use async_trait::async_trait; use entities::enums::TaskStatus; -use entities::models::JsonDownloadTask; +use entities::models::{JsonDownloadTask, OffChainData}; use interface::error::JsonDownloaderError; use interface::json::{JsonDownloader, JsonPersister}; use log::{debug, error}; @@ -9,7 +9,7 @@ use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus}; use postgre_client::tasks::UpdatedTask; use postgre_client::PgClient; use reqwest::{Client, ClientBuilder}; -use rocks_db::{offchain_data::OffChainData, Storage}; +use rocks_db::Storage; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; From 3bb7f6c1f2b550b66c0a54a0502beda20977fd2e Mon Sep 17 00:00:00 2001 From: requesco Date: Thu, 25 Apr 2024 18:48:27 +0300 Subject: [PATCH 15/17] add saving offchain data --- rocks-db/src/batch_client.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index 68d08e54d..545b1c1a2 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -422,6 +422,13 @@ impl Storage { &TokenMetadataEdition::MasterEdition(master_edition), )?; } + if let Some(offchain_data) = data.offchain_data { + self.asset_offchain_data.merge_with_batch_cbor( + &mut batch, + offchain_data.url.clone(), + &offchain_data, + )?; + } self.write_batch(batch).await?; Ok(()) } From b68769c6ca33aaeb173b00adb593f6460b5ed14a Mon Sep 17 00:00:00 2001 From: requesco Date: Thu, 25 Apr 2024 19:08:20 +0300 Subject: [PATCH 16/17] tests fix --- nft_ingester/tests/api_tests.rs | 3 +-- nft_ingester/tests/bubblegum_tests.rs | 3 ++- nft_ingester/tests/decompress.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 47234188c..8b22264cf 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -15,7 +15,7 @@ mod tests { AssetList, TokenAccountsList, TransactionSignatureList, }; use entities::api_req_params::{DisplayOptions, GetAssetSignatures, GetTokenAccounts, Options}; - use entities::models::{AssetSignature, AssetSignatureKey}; + use entities::models::{AssetSignature, AssetSignatureKey, OffChainData}; use entities::{ api_req_params::{ AssetSortBy, AssetSortDirection, AssetSorting, GetAsset, GetAssetsByAuthority, @@ -40,7 +40,6 @@ mod tests { }; use rocks_db::{ columns::{Mint, TokenAccount}, - offchain_data::OffChainData, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, }; use serde_json::{json, Value}; diff --git a/nft_ingester/tests/bubblegum_tests.rs b/nft_ingester/tests/bubblegum_tests.rs index 2f5aff977..b5d942527 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -2,6 +2,7 @@ #[cfg(feature = "integration_tests")] mod tests { use entities::api_req_params::{GetAsset, GetAssetProof, Options}; + use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::{ApiMetricsConfig, BackfillerMetricsConfig, IngesterMetricsConfig}; use nft_ingester::config::JsonMiddlewareConfig; @@ -12,7 +13,7 @@ mod tests { buffer::Buffer, transaction_ingester::{self, BackfillTransactionIngester}, }; - use rocks_db::{bubblegum_slots::BubblegumSlotGetter, offchain_data::OffChainData, Storage}; + use rocks_db::{bubblegum_slots::BubblegumSlotGetter, Storage}; use std::fs::File; use std::io::{self, Read}; use std::sync::Arc; diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index 8d83afa29..e8aaf7df1 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -4,6 +4,7 @@ mod tests { use blockbuster::token_metadata::accounts::Metadata; use blockbuster::token_metadata::types::{Collection, Creator, Key}; use entities::api_req_params::{GetAsset, Options}; + use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::{ApiMetricsConfig, BackfillerMetricsConfig, IngesterMetricsConfig}; use nft_ingester::config::JsonMiddlewareConfig; @@ -20,7 +21,6 @@ mod tests { use rocks_db::{ bubblegum_slots::BubblegumSlotGetter, columns::{Mint, TokenAccount}, - offchain_data::OffChainData, Storage, }; use solana_sdk::pubkey::Pubkey; From 294a910e4ed2a2285963b5a0605b03938219e30e Mon Sep 17 00:00:00 2001 From: requesco Date: Thu, 25 Apr 2024 19:18:20 +0300 Subject: [PATCH 17/17] merge --- nft_ingester/src/bubblegum_updates_processor.rs | 4 ++-- rocks-db/src/transaction.rs | 3 +-- rocks-db/src/transaction_client.rs | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs index 324e87e9d..9e8b369af 100644 --- a/nft_ingester/src/bubblegum_updates_processor.rs +++ b/nft_ingester/src/bubblegum_updates_processor.rs @@ -14,7 +14,8 @@ use entities::enums::{ TokenStandard, UseMethod, }; use entities::models::{ - BufferedTransaction, SignatureWithSlot, Task, UpdateVersion, Updated, UrlWithStatus, + BufferedTransaction, OffChainData, SignatureWithSlot, Task, UpdateVersion, Updated, + UrlWithStatus, }; use entities::models::{ChainDataV1, Creator, Uses}; use entities::rollup::BatchMintInstruction; @@ -29,7 +30,6 @@ use rocks_db::asset::AssetOwner; use rocks_db::asset::{ AssetAuthority, AssetCollection, AssetDynamicDetails, AssetLeaf, AssetStaticDetails, }; -use rocks_db::offchain_data::OffChainData; use rocks_db::transaction::{ AssetDynamicUpdate, AssetUpdate, AssetUpdateEvent, InstructionResult, TransactionResult, TreeUpdate, diff --git a/rocks-db/src/transaction.rs b/rocks-db/src/transaction.rs index 204a407ba..2a72f5cfe 100644 --- a/rocks-db/src/transaction.rs +++ b/rocks-db/src/transaction.rs @@ -1,12 +1,11 @@ use async_trait::async_trait; -use entities::models::Task; use entities::models::{BufferedTransaction, SignatureWithSlot}; +use entities::models::{OffChainData, Task}; use interface::error::StorageError; use solana_sdk::pubkey::Pubkey; use spl_account_compression::events::ChangeLogEventV1; use spl_account_compression::state::PathNode; -use crate::offchain_data::OffChainData; use crate::{ asset::{AssetCollection, AssetLeaf}, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, diff --git a/rocks-db/src/transaction_client.rs b/rocks-db/src/transaction_client.rs index 52c873455..9842368c9 100644 --- a/rocks-db/src/transaction_client.rs +++ b/rocks-db/src/transaction_client.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; -use entities::models::SignatureWithSlot; +use entities::models::{OffChainData, SignatureWithSlot}; use interface::error::StorageError; use solana_sdk::pubkey::Pubkey; -use crate::offchain_data::OffChainData; use crate::parameters::Parameter; use crate::{ parameters,