diff --git a/Cargo.lock b/Cargo.lock index 6bc870b8f..61f7b1d9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2718,12 +2718,14 @@ dependencies = [ name = "grpc" version = "0.1.0" dependencies = [ + "async-trait", "entities", "futures", "interface", "mockall 0.12.1", "prost 0.12.3", "solana-sdk", + "thiserror", "tokio", "tonic 0.10.2", "tonic-build 0.10.2", diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 5c290f7f6..7e9effe70 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -4,7 +4,7 @@ use std::string::ToString; use std::sync::Arc; use entities::api_req_params::{AssetSortDirection, Options}; -use entities::models::AssetSignatureWithPagination; +use entities::models::{AssetSignatureWithPagination, OffChainData}; use interface::asset_sigratures::AssetSignaturesGetter; use interface::json::{JsonDownloader, JsonPersister}; use log::error; @@ -17,7 +17,6 @@ use rocks_db::asset::{ AssetAuthority, AssetCollection, AssetDynamicDetails, AssetLeaf, AssetOwner, AssetSelectedMaps, AssetStaticDetails, }; -use rocks_db::offchain_data::OffChainData; use rocks_db::Storage; use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; diff --git a/entities/src/models.rs b/entities/src/models.rs index eacaa4223..90d759279 100644 --- a/entities/src/models.rs +++ b/entities/src/models.rs @@ -67,6 +67,12 @@ pub struct Creator { pub creator_share: u8, } +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct OffChainData { + pub url: String, + pub metadata: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct CompleteAssetDetails { // From AssetStaticDetails @@ -92,6 +98,13 @@ pub struct CompleteAssetDetails { pub lamports: Option>, pub executable: Option>, pub metadata_owner: Option>, + pub raw_name: Option>, + pub plugins: Option>, + pub unknown_plugins: Option>, + pub rent_epoch: Option>, + pub num_minted: Option>, + pub current_size: Option>, + pub plugins_json_version: Option>, // From AssetAuthority as Tuple pub authority: Updated, @@ -113,6 +126,9 @@ pub struct CompleteAssetDetails { // TokenMetadataEdition pub edition: Option, pub master_edition: Option, + + // OffChainData + pub offchain_data: Option, } /// Leaf information about compressed asset diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index f6646ff7f..351d27b66 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -12,9 +12,11 @@ futures = "0.3.29" solana-sdk = "~1.17" interface = { path = "../interface" } entities = { path = "../entities" } +tokio = { version = "1.35.1", features = ["full"] } +async-trait = "0.1.74" +thiserror = "1.0.31" [dev-dependencies] -tokio = { version = "1.35.1", features = ["full"] } mockall = "0.12.0" [build-dependencies] diff --git a/grpc/proto/gap_filler.proto b/grpc/proto/gap_filler.proto index cdfd8aa1d..21aff6bef 100644 --- a/grpc/proto/gap_filler.proto +++ b/grpc/proto/gap_filler.proto @@ -58,6 +58,16 @@ enum UseMethod { SINGLE = 2; } +enum UpdateVersion { + SEQUENCE = 0; + WRITE_VERSION = 1; +} + +message UpdateVersionValue { + UpdateVersion type = 1; + uint64 value = 2; +} + message Uses { UseMethod use_method = 1; uint64 remaining = 2; @@ -71,7 +81,7 @@ message ChainDataV1 { bool primary_sale_happened = 4; TokenStandard token_standard = 5; Uses uses = 6; - google.protobuf.UInt64Value seq_updated = 7; + UpdateVersionValue update_version = 7; uint64 slot_updated = 8; } @@ -82,7 +92,7 @@ message AssetLeaf { google.protobuf.BytesValue data_hash = 4; google.protobuf.BytesValue creator_hash = 5; google.protobuf.UInt64Value leaf_seq = 6; - google.protobuf.UInt64Value seq_updated = 7; + UpdateVersionValue update_version = 7; uint64 slot_updated = 8; } @@ -90,7 +100,7 @@ message AssetCollection { bytes collection = 1; bool is_collection_verified = 2; google.protobuf.UInt64Value collection_seq = 3; - google.protobuf.UInt64Value seq_updated = 4; + UpdateVersionValue update_version = 4; uint64 slot_updated = 5; } @@ -127,65 +137,74 @@ message AssetDetails { DynamicUint64Field lamports = 21; DynamicBoolField executable = 22; DynamicStringField metadata_owner = 23; - - AssetLeaf asset_leaf = 24; - AssetCollection collection = 25; - ChainDataV1 chain_data = 26; - - ClLeaf cl_leaf = 27; - repeated ClItem cl_items = 28; + DynamicStringField url = 24; + DynamicStringField raw_name = 25; + DynamicStringField plugins = 26; + DynamicStringField unknown_plugins = 27; + DynamicUint64Field rent_epoch = 28; + DynamicUint32Field num_minted = 29; + DynamicUint32Field current_size = 30; + DynamicUint32Field plugins_json_version = 31; + + AssetLeaf asset_leaf = 32; + AssetCollection collection = 33; + ChainDataV1 chain_data = 34; + + ClLeaf cl_leaf = 35; + repeated ClItem cl_items = 36; // From TokenMetadataEdition - EditionV1 edition = 29; - MasterEdition master_edition = 30; + EditionV1 edition = 37; + MasterEdition master_edition = 38; + OffchainData offchain_data = 39; } // Dynamic field messages message DynamicBoolField { bool value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicUint64Field { uint64 value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicUint32Field { uint32 value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicBytesField { bytes value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicStringField { string value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicChainMutability { ChainMutability value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicEnumField { OwnerType value = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } message DynamicCreatorsField { repeated Creator creators = 1; - google.protobuf.UInt64Value seq_updated = 2; + UpdateVersionValue update_version = 2; uint64 slot_updated = 3; } @@ -219,6 +238,11 @@ message MasterEdition { uint64 write_version = 4; } +message OffchainData { + string url = 1; + string metadata = 2; +} + // RangeRequest and AssetDetailsResponse for data synchronization message RangeRequest { uint64 start_slot = 1; diff --git a/grpc/src/client.rs b/grpc/src/client.rs new file mode 100644 index 000000000..043653d0a --- /dev/null +++ b/grpc/src/client.rs @@ -0,0 +1,56 @@ +use crate::error::GrpcError; +use crate::gapfiller::gap_filler_service_client::GapFillerServiceClient; +use crate::gapfiller::RangeRequest; +use async_trait::async_trait; +use futures::StreamExt; +use interface::asset_streaming_and_discovery::{ + AssetDetailsConsumer, AssetDetailsStreamNonSync, AsyncError, PeerDiscovery, +}; +use std::str::FromStr; +use tonic::transport::{Channel, Uri}; +use tonic::{Code, Status}; + +pub struct Client { + inner: GapFillerServiceClient, +} + +impl Client { + pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result { + let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str()) + .map_err(|e| GrpcError::UriCreate(e.to_string()))?; + let channel = Channel::builder(url).connect().await?; + + Ok(Self { + inner: GapFillerServiceClient::new(channel), + }) + } +} + +#[async_trait] +impl AssetDetailsConsumer for Client { + async fn get_consumable_stream_in_range( + &mut self, + start_slot: u64, + end_slot: u64, + ) -> Result { + Ok(Box::pin( + self.inner + .get_assets_updated_within(RangeRequest { + start_slot, + end_slot, + }) + .await + .map_err(|e| Box::new(e) as AsyncError)? + .into_inner() + .map(|stream| { + stream + .and_then(|asset_details| { + asset_details + .try_into() + .map_err(|e: GrpcError| Status::new(Code::Internal, e.to_string())) + }) + .map_err(|e| Box::new(e) as AsyncError) + }), + )) + } +} diff --git a/grpc/src/error/mod.rs b/grpc/src/error/mod.rs new file mode 100644 index 000000000..cb3db906a --- /dev/null +++ b/grpc/src/error/mod.rs @@ -0,0 +1,21 @@ +use thiserror::Error; +use tonic::transport::Error; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum GrpcError { + #[error("Pubkey from: {0:?}")] + PubkeyFrom(Vec), + #[error("Missing field: {0}")] + MissingField(String), + #[error("Cannot cast enum: {0} {1}")] + EnumCast(String, String), + #[error("UriCreate: {0}")] + UriCreate(String), + #[error("TonicTransport: {0}")] + TonicTransport(String), +} +impl From for GrpcError { + fn from(value: Error) -> Self { + Self::TonicTransport(value.to_string()) + } +} diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 91677dfd5..31d6ca7b1 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -1,5 +1,13 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateVersionValue { + #[prost(enumeration = "UpdateVersion", tag = "1")] + pub r#type: i32, + #[prost(uint64, tag = "2")] + pub value: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Uses { #[prost(enumeration = "UseMethod", tag = "1")] pub use_method: i32, @@ -25,7 +33,7 @@ pub struct ChainDataV1 { #[prost(message, optional, tag = "6")] pub uses: ::core::option::Option, #[prost(message, optional, tag = "7")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "8")] pub slot_updated: u64, } @@ -45,7 +53,7 @@ pub struct AssetLeaf { #[prost(message, optional, tag = "6")] pub leaf_seq: ::core::option::Option, #[prost(message, optional, tag = "7")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "8")] pub slot_updated: u64, } @@ -59,7 +67,7 @@ pub struct AssetCollection { #[prost(message, optional, tag = "3")] pub collection_seq: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "5")] pub slot_updated: u64, } @@ -126,20 +134,38 @@ pub struct AssetDetails { #[prost(message, optional, tag = "23")] pub metadata_owner: ::core::option::Option, #[prost(message, optional, tag = "24")] - pub asset_leaf: ::core::option::Option, + pub url: ::core::option::Option, #[prost(message, optional, tag = "25")] - pub collection: ::core::option::Option, + pub raw_name: ::core::option::Option, #[prost(message, optional, tag = "26")] - pub chain_data: ::core::option::Option, + pub plugins: ::core::option::Option, #[prost(message, optional, tag = "27")] + pub unknown_plugins: ::core::option::Option, + #[prost(message, optional, tag = "28")] + pub rent_epoch: ::core::option::Option, + #[prost(message, optional, tag = "29")] + pub num_minted: ::core::option::Option, + #[prost(message, optional, tag = "30")] + pub current_size: ::core::option::Option, + #[prost(message, optional, tag = "31")] + pub plugins_json_version: ::core::option::Option, + #[prost(message, optional, tag = "32")] + pub asset_leaf: ::core::option::Option, + #[prost(message, optional, tag = "33")] + pub collection: ::core::option::Option, + #[prost(message, optional, tag = "34")] + pub chain_data: ::core::option::Option, + #[prost(message, optional, tag = "35")] pub cl_leaf: ::core::option::Option, - #[prost(message, repeated, tag = "28")] + #[prost(message, repeated, tag = "36")] pub cl_items: ::prost::alloc::vec::Vec, /// From TokenMetadataEdition - #[prost(message, optional, tag = "29")] + #[prost(message, optional, tag = "37")] pub edition: ::core::option::Option, - #[prost(message, optional, tag = "30")] + #[prost(message, optional, tag = "38")] pub master_edition: ::core::option::Option, + #[prost(message, optional, tag = "39")] + pub offchain_data: ::core::option::Option, } /// Dynamic field messages #[allow(clippy::derive_partial_eq_without_eq)] @@ -148,7 +174,7 @@ pub struct DynamicBoolField { #[prost(bool, tag = "1")] pub value: bool, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -158,7 +184,7 @@ pub struct DynamicUint64Field { #[prost(uint64, tag = "1")] pub value: u64, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -168,7 +194,7 @@ pub struct DynamicUint32Field { #[prost(uint32, tag = "1")] pub value: u32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -178,7 +204,7 @@ pub struct DynamicBytesField { #[prost(bytes = "vec", tag = "1")] pub value: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -188,7 +214,7 @@ pub struct DynamicStringField { #[prost(string, tag = "1")] pub value: ::prost::alloc::string::String, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -198,7 +224,7 @@ pub struct DynamicChainMutability { #[prost(enumeration = "ChainMutability", tag = "1")] pub value: i32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -208,7 +234,7 @@ pub struct DynamicEnumField { #[prost(enumeration = "OwnerType", tag = "1")] pub value: i32, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -218,7 +244,7 @@ pub struct DynamicCreatorsField { #[prost(message, repeated, tag = "1")] pub creators: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "2")] - pub seq_updated: ::core::option::Option, + pub update_version: ::core::option::Option, #[prost(uint64, tag = "3")] pub slot_updated: u64, } @@ -274,6 +300,14 @@ pub struct MasterEdition { #[prost(uint64, tag = "4")] pub write_version: u64, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OffchainData { + #[prost(string, tag = "1")] + pub url: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub metadata: ::prost::alloc::string::String, +} /// RangeRequest and AssetDetailsResponse for data synchronization #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -535,6 +569,32 @@ impl UseMethod { } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum UpdateVersion { + Sequence = 0, + WriteVersion = 1, +} +impl UpdateVersion { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + UpdateVersion::Sequence => "SEQUENCE", + UpdateVersion::WriteVersion => "WRITE_VERSION", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SEQUENCE" => Some(Self::Sequence), + "WRITE_VERSION" => Some(Self::WriteVersion), + _ => None, + } + } +} /// Generated client implementations. pub mod gap_filler_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index b50735ba6..19f7d755d 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -1,3 +1,5 @@ +pub mod client; +pub mod error; pub mod gapfiller; mod mapper; pub mod service; diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index a0914a21c..3aefe695a 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -1,23 +1,26 @@ +use crate::error::GrpcError; use crate::gapfiller::{ AssetCollection, AssetDetails, AssetLeaf, ChainDataV1, ChainMutability, ClItem, ClLeaf, Creator, DynamicBoolField, DynamicBytesField, DynamicChainMutability, DynamicCreatorsField, DynamicEnumField, DynamicStringField, DynamicUint32Field, DynamicUint64Field, EditionV1, - MasterEdition, OwnerType, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions, - TokenStandard, UseMethod, Uses, + MasterEdition, OffchainData, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, TokenStandard, UpdateVersionValue, UseMethod, Uses, }; -use entities::models::{CompleteAssetDetails, Updated}; +use entities::models::{CompleteAssetDetails, OffChainData, UpdateVersion, Updated}; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; + impl From for AssetDetails { fn from(value: CompleteAssetDetails) -> Self { let delegate = value.delegate.value.map(|key| DynamicBytesField { value: key.to_bytes().to_vec(), slot_updated: value.delegate.slot_updated, - seq_updated: value.delegate.get_upd_ver_seq(), + update_version: value.delegate.update_version.map(Into::into), }); let owner = value.owner.value.map(|key| DynamicBytesField { value: key.to_bytes().to_vec(), slot_updated: value.owner.slot_updated, - seq_updated: value.owner.get_upd_ver_seq(), + update_version: value.owner.update_version.map(Into::into), }); let owner_delegate_seq = value @@ -26,7 +29,7 @@ impl From for AssetDetails { .map(|seq| DynamicUint64Field { value: seq, slot_updated: value.owner_delegate_seq.slot_updated, - seq_updated: value.owner_delegate_seq.get_upd_ver_seq(), + update_version: value.owner_delegate_seq.update_version.map(Into::into), }); Self { @@ -56,6 +59,14 @@ impl From for AssetDetails { lamports: value.lamports.map(|v| v.into()), executable: value.executable.map(|v| v.into()), metadata_owner: value.metadata_owner.map(|v| v.into()), + url: Some(value.url.into()), + raw_name: value.raw_name.map(Into::into), + plugins: value.plugins.map(Into::into), + unknown_plugins: value.unknown_plugins.map(Into::into), + rent_epoch: value.rent_epoch.map(Into::into), + num_minted: value.num_minted.map(Into::into), + current_size: value.current_size.map(Into::into), + plugins_json_version: value.plugins_json_version.map(Into::into), asset_leaf: value.asset_leaf.map(|v| v.into()), collection: value.collection.map(|v| v.into()), chain_data: value.onchain_data.map(|v| v.into()), @@ -63,7 +74,161 @@ impl From for AssetDetails { cl_items: value.cl_items.into_iter().map(ClItem::from).collect(), edition: value.edition.map(|e| e.into()), master_edition: value.master_edition.map(|e| e.into()), + offchain_data: value.offchain_data.map(|e| e.into()), + } + } +} + +impl TryFrom for CompleteAssetDetails { + type Error = GrpcError; + + fn try_from(value: AssetDetails) -> Result { + let owner_delegate_seq = value + .owner_delegate_seq + .map(|val| { + Updated::new( + val.slot_updated, + val.update_version.map(Into::into), + Some(val.value), + ) + }) + .unwrap_or_default(); + + Ok(Self { + pubkey: Pubkey::try_from(value.pubkey).map_err(GrpcError::PubkeyFrom)?, + specification_asset_class: entities::enums::SpecificationAssetClass::from( + SpecificationAssetClass::try_from(value.specification_asset_class).map_err( + |e| GrpcError::EnumCast("SpecificationAssetClass".to_string(), e.to_string()), + )?, + ), + royalty_target_type: entities::enums::RoyaltyTargetType::from( + RoyaltyTargetType::try_from(value.royalty_target_type).map_err(|e| { + GrpcError::EnumCast("RoyaltyTargetType".to_string(), e.to_string()) + })?, + ), + slot_created: value.slot_created, + edition_address: value + .edition_address + .map(TryInto::try_into) + .transpose() + .map_err(GrpcError::PubkeyFrom)?, + is_compressible: value + .is_compressible + .map(Into::into) + .ok_or(GrpcError::MissingField("is_compressible".to_string()))?, + is_compressed: value + .is_compressed + .map(Into::into) + .ok_or(GrpcError::MissingField("is_compressed".to_string()))?, + is_frozen: value + .is_frozen + .map(Into::into) + .ok_or(GrpcError::MissingField("is_frozen".to_string()))?, + supply: value.supply.map(Into::into), + seq: value.seq.map(Into::into), + is_burnt: value + .is_burnt + .map(Into::into) + .ok_or(GrpcError::MissingField("is_burnt".to_string()))?, + was_decompressed: value + .was_decompressed + .map(Into::into) + .ok_or(GrpcError::MissingField("was_decompressed".to_string()))?, + creators: value + .creators + .map(TryInto::try_into) + .transpose()? + .ok_or(GrpcError::MissingField("creators".to_string()))?, + royalty_amount: value + .royalty_amount + .map(Into::into) + .ok_or(GrpcError::MissingField("royalty_amount".to_string()))?, + url: value + .url + .map(Into::into) + .ok_or(GrpcError::MissingField("url".to_string()))?, + chain_mutability: value.chain_mutability.map(TryInto::try_into).transpose()?, + lamports: value.lamports.map(Into::into), + executable: value.executable.map(Into::into), + metadata_owner: value.metadata_owner.map(Into::into), + raw_name: value.raw_name.map(Into::into), + plugins: value.plugins.map(Into::into), + unknown_plugins: value.unknown_plugins.map(Into::into), + rent_epoch: value.rent_epoch.map(Into::into), + num_minted: value.num_minted.map(Into::into), + current_size: value.current_size.map(Into::into), + plugins_json_version: value.plugins_json_version.map(Into::into), + authority: value + .authority + .map(TryInto::try_into) + .transpose()? + .ok_or(GrpcError::MissingField("authority".to_string()))?, + // unwrap_or_default used for fields with Update> type + owner: value + .owner + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(), + delegate: value + .delegate + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(), + owner_type: value + .owner_type + .map(TryInto::try_into) + .transpose()? + .ok_or(GrpcError::MissingField("owner_type".to_string()))?, + owner_delegate_seq, + asset_leaf: value.asset_leaf.map(TryInto::try_into).transpose()?, + collection: value.collection.map(TryInto::try_into).transpose()?, + onchain_data: value.chain_data.map(TryInto::try_into).transpose()?, + cl_leaf: value.cl_leaf.map(TryInto::try_into).transpose()?, + cl_items: value + .cl_items + .into_iter() + .map(TryInto::::try_into) + .collect::, _>>()?, + edition: value.edition.map(TryInto::try_into).transpose()?, + master_edition: value.master_edition.map(TryInto::try_into).transpose()?, + offchain_data: value.offchain_data.map(|e| OffChainData { + url: e.url, + metadata: e.metadata, + }), + }) + } +} + +impl From for OffchainData { + fn from(value: OffChainData) -> Self { + Self { + url: value.url, + metadata: value.metadata, + } + } +} + +impl From for UpdateVersionValue { + fn from(value: UpdateVersion) -> Self { + match value { + UpdateVersion::Sequence(seq) => Self { + r#type: crate::gapfiller::UpdateVersion::Sequence.into(), + value: seq, + }, + UpdateVersion::WriteVersion(wv) => Self { + r#type: crate::gapfiller::UpdateVersion::WriteVersion.into(), + value: wv, + }, + } + } +} + +impl From for UpdateVersion { + fn from(value: UpdateVersionValue) -> Self { + if value.r#type == crate::gapfiller::UpdateVersion::Sequence as i32 { + return Self::Sequence(value.value); } + Self::WriteVersion(value.value) } } @@ -72,7 +237,7 @@ impl From> for DynamicBoolField { Self { value: value.value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -82,7 +247,27 @@ impl From> for DynamicUint64Field { Self { value: value.value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), + } + } +} + +impl From> for DynamicUint32Field { + fn from(value: Updated) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicUint32Field) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), } } } @@ -92,7 +277,7 @@ impl From> for DynamicStringField { Self { value: value.clone().value, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -102,7 +287,7 @@ impl From> for DynamicUint32Field { Self { value: value.value as u32, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -112,7 +297,7 @@ impl From> for DynamicBytesField { Self { value: value.value.to_bytes().to_vec(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -122,7 +307,7 @@ impl From> for DynamicEnumField { Self { value: OwnerType::from(value.value).into(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -132,7 +317,7 @@ impl From> for DynamicChainMutability Self { value: ChainMutability::from(value.value).into(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -151,7 +336,7 @@ impl From>> for DynamicCreatorsField { Self { creators: value.value.iter().map(|v| v.into()).collect(), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -166,11 +351,166 @@ impl From> for AssetLeaf { creator_hash: value.value.creator_hash.map(|h| h.to_bytes().to_vec()), leaf_seq: value.value.leaf_seq, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicBoolField) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicUint64Field) -> Self { + Self { + value: value.value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicUint32Field) -> Self { + Self { + value: value.value as u16, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + } + } +} + +impl From for Updated { + fn from(value: DynamicStringField) -> Self { + Self { + value: value.clone().value, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), } } } +impl TryFrom for Updated> { + type Error = GrpcError; + + fn try_from(value: DynamicBytesField) -> Result { + Ok(Self::new( + value.slot_updated, + value.update_version.map(Into::into), + Some(Pubkey::try_from(value.value).map_err(GrpcError::PubkeyFrom)?), + )) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: DynamicBytesField) -> Result { + Ok(Self { + value: Pubkey::try_from(value.value).map_err(GrpcError::PubkeyFrom)?, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: DynamicEnumField) -> Result { + Ok(Self { + value: entities::enums::OwnerType::from( + OwnerType::try_from(value.value) + .map_err(|e| GrpcError::EnumCast("OwnerType".to_string(), e.to_string()))?, + ), + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +impl TryFrom for entities::models::Creator { + type Error = GrpcError; + + fn try_from(value: Creator) -> Result { + Ok(Self { + creator: Pubkey::try_from(value.creator).map_err(GrpcError::PubkeyFrom)?, + creator_verified: value.creator_verified, + creator_share: value.creator_share as u8, + }) + } +} +impl TryFrom for Updated> { + type Error = GrpcError; + + fn try_from(value: DynamicCreatorsField) -> Result { + Ok(Self { + value: value + .creators + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: AssetLeaf) -> Result { + Ok(Self { + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + value: entities::models::AssetLeaf { + tree_id: Pubkey::try_from(value.tree_id).map_err(GrpcError::PubkeyFrom)?, + leaf: value.leaf.clone(), + nonce: value.nonce, + data_hash: value + .data_hash + .map(|h| { + Ok::<_, GrpcError>(Hash::from( + <[u8; 32]>::try_from(h).map_err(GrpcError::PubkeyFrom)?, + )) + }) + .transpose()?, + creator_hash: value + .creator_hash + .map(|h| { + Ok::<_, GrpcError>(Hash::from( + <[u8; 32]>::try_from(h).map_err(GrpcError::PubkeyFrom)?, + )) + }) + .transpose()?, + leaf_seq: value.leaf_seq, + }, + }) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: DynamicChainMutability) -> Result { + Ok(Self { + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + value: entities::enums::ChainMutability::from( + ChainMutability::try_from(value.value).map_err(|e| { + GrpcError::EnumCast("ChainMutability".to_string(), e.to_string()) + })?, + ), + }) + } +} + impl From for ClLeaf { fn from(value: entities::models::ClLeaf) -> Self { Self { @@ -202,7 +542,7 @@ impl From> for AssetCollection { is_collection_verified: value.value.is_collection_verified, collection_seq: value.value.collection_seq, slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -222,7 +562,7 @@ impl From> for ChainDataV1 { .unwrap_or_default(), uses: value.clone().value.uses.map(|v| v.into()), slot_updated: value.slot_updated, - seq_updated: value.get_upd_ver_seq(), + update_version: value.update_version.map(Into::into), } } } @@ -259,6 +599,113 @@ impl From for EditionV1 { } } +impl TryFrom for entities::models::MasterEdition { + type Error = GrpcError; + + fn try_from(value: MasterEdition) -> Result { + Ok(Self { + key: Pubkey::try_from(value.key).map_err(GrpcError::PubkeyFrom)?, + supply: value.supply, + max_supply: value.max_supply, + write_version: value.write_version, + }) + } +} + +impl TryFrom for entities::models::EditionV1 { + type Error = GrpcError; + + fn try_from(value: EditionV1) -> Result { + Ok(Self { + key: Pubkey::try_from(value.key).map_err(GrpcError::PubkeyFrom)?, + parent: Pubkey::try_from(value.parent).map_err(GrpcError::PubkeyFrom)?, + edition: value.edition, + write_version: value.write_version, + }) + } +} + +impl TryFrom for entities::models::ClLeaf { + type Error = GrpcError; + + fn try_from(value: ClLeaf) -> Result { + Ok(Self { + cli_leaf_idx: value.cli_leaf_idx, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(GrpcError::PubkeyFrom)?, + cli_node_idx: value.cli_node_idx, + }) + } +} + +impl TryFrom for entities::models::ClItem { + type Error = GrpcError; + + fn try_from(value: ClItem) -> Result { + Ok(Self { + cli_leaf_idx: value.cli_leaf_idx, + cli_seq: value.cli_seq, + cli_level: value.cli_level, + cli_hash: value.cli_hash, + cli_tree_key: Pubkey::try_from(value.cli_tree_key).map_err(GrpcError::PubkeyFrom)?, + cli_node_idx: value.cli_node_idx, + slot_updated: value.slot_updated, + }) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: AssetCollection) -> Result { + Ok(Self { + value: entities::models::AssetCollection { + collection: Pubkey::try_from(value.collection).map_err(GrpcError::PubkeyFrom)?, + is_collection_verified: value.is_collection_verified, + collection_seq: value.collection_seq, + }, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +impl TryFrom for Updated { + type Error = GrpcError; + + fn try_from(value: ChainDataV1) -> Result { + Ok(Self { + value: entities::models::ChainDataV1 { + name: value.name.clone(), + symbol: value.symbol.clone(), + edition_nonce: value.edition_nonce.map(|v| v as u8), + primary_sale_happened: value.primary_sale_happened, + token_standard: Some(entities::enums::TokenStandard::from( + TokenStandard::try_from(value.token_standard).map_err(|e| { + GrpcError::EnumCast("TokenStandard".to_string(), e.to_string()) + })?, + )), + uses: value.uses.map(TryInto::try_into).transpose()?, + }, + slot_updated: value.slot_updated, + update_version: value.update_version.map(Into::into), + }) + } +} + +impl TryFrom for entities::models::Uses { + type Error = GrpcError; + + fn try_from(value: Uses) -> Result { + Ok(Self { + use_method: entities::enums::UseMethod::from( + UseMethod::try_from(value.use_method) + .map_err(|e| GrpcError::EnumCast("UseMethod".to_string(), e.to_string()))?, + ), + remaining: value.remaining, + total: value.total, + }) + } +} macro_rules! impl_from_enum { ($src:ty, $dst:ident, $($variant:ident),*) => { impl From<$src> for $dst { @@ -270,6 +717,16 @@ macro_rules! impl_from_enum { } } } + + impl From<$dst> for $src { + fn from(value: $dst) -> Self { + match value { + $( + $dst::$variant => <$src>::$variant, + )* + } + } + } }; } diff --git a/interface/src/asset_streaming_and_discovery.rs b/interface/src/asset_streaming_and_discovery.rs index c1620e0fd..fd6150a54 100644 --- a/interface/src/asset_streaming_and_discovery.rs +++ b/interface/src/asset_streaming_and_discovery.rs @@ -5,8 +5,9 @@ use mockall::automock; use std::pin::Pin; pub type AsyncError = Box; -pub type AssetDetailsStream = - Pin> + Send + Sync>>; +type AssetResult = Result; +pub type AssetDetailsStream = Pin + Send + Sync>>; +pub type AssetDetailsStreamNonSync = Pin + Send>>; #[automock] #[async_trait] @@ -18,6 +19,16 @@ pub trait AssetDetailsStreamer: Send + Sync { ) -> Result; } +#[automock] +#[async_trait] +pub trait AssetDetailsConsumer: Send { + async fn get_consumable_stream_in_range( + &mut self, + start_slot: u64, + end_slot: u64, + ) -> Result; +} + #[automock] pub trait PeerDiscovery: Send + Sync { fn get_gapfiller_peer_addr(&self) -> String; diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index f87a67bd6..a5720de48 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -17,6 +17,7 @@ use tokio::task::JoinSet; use tokio::time::Instant; use backfill_rpc::rpc::BackfillRPC; +use grpc::client::Client; use interface::error::{StorageError, UsecaseError}; use interface::signature_persistence::{BlockProducer, ProcessingDataGetter}; use metrics_utils::utils::start_metrics; @@ -48,6 +49,7 @@ use nft_ingester::backfiller::{ TransactionsParser, }; use nft_ingester::fork_cleaner::ForkCleaner; +use nft_ingester::gapfiller::process_asset_details_stream; use nft_ingester::mpl_core_processor::MplCoreProcessor; use nft_ingester::sequence_consistent::SequenceConsistentGapfiller; use usecase::bigtable::BigTableClient; @@ -181,6 +183,7 @@ pub async fn main() -> Result<(), IngesterError> { .unwrap(); let rocks_storage = Arc::new(storage); + let last_saved_slot = rocks_storage.last_saved_slot()?.unwrap_or_default(); let synchronizer = Synchronizer::new( rocks_storage.clone(), index_storage.clone(), @@ -271,8 +274,6 @@ pub async fn main() -> Result<(), IngesterError> { } })); - let newest_restored_slot = rocks_storage.last_saved_slot()?.unwrap_or(0); - // start backup service let backup_cfg = backup_service::load_config()?; let mut backup_service = BackupService::new(rocks_storage.db.clone(), &backup_cfg)?; @@ -378,7 +379,7 @@ pub async fn main() -> Result<(), IngesterError> { match slot { Ok(slot) => { if let Some(slot) = slot { - if slot != newest_restored_slot { + if slot != last_saved_slot { first_processed_slot_clone.store(slot, Ordering::SeqCst); break; } @@ -405,6 +406,35 @@ pub async fn main() -> Result<(), IngesterError> { .await, ); + match Client::connect(config.clone()).await { + Ok(gaped_data_client) => { + while first_processed_slot.load(Ordering::SeqCst) == 0 + && keep_running.load(Ordering::SeqCst) + { + tokio::time::sleep(Duration::from_millis(100)).await + } + if keep_running.load(Ordering::SeqCst) { + let cloned_keep_running = keep_running.clone(); + let cloned_rocks_storage = rocks_storage.clone(); + mutexed_tasks.lock().await.spawn(async move { + info!( + "Processed {} gaped assets", + process_asset_details_stream( + cloned_keep_running, + cloned_rocks_storage, + last_saved_slot, + first_processed_slot.load(Ordering::SeqCst), + gaped_data_client, + ) + .await + ); + Ok(()) + }); + } + } + Err(e) => error!("GRPC Client new: {}", e), + }; + let cloned_keep_running = keep_running.clone(); let cloned_rocks_storage = rocks_storage.clone(); let cloned_api_metrics = metrics_state.api_metrics.clone(); diff --git a/nft_ingester/src/bin/migrator/main.rs b/nft_ingester/src/bin/migrator/main.rs index ac5ab0b31..de767e9bc 100644 --- a/nft_ingester/src/bin/migrator/main.rs +++ b/nft_ingester/src/bin/migrator/main.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use entities::enums::TaskStatus; -use entities::models::Task; +use entities::models::{OffChainData, Task}; use log::{error, info}; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::utils::start_metrics; @@ -16,7 +16,6 @@ use nft_ingester::config::{ }; use nft_ingester::error::IngesterError; use nft_ingester::init::graceful_stop; -use rocks_db::offchain_data::OffChainData; use rocks_db::{AssetDynamicDetails, Storage}; pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; diff --git a/nft_ingester/src/bin/raw_backup/main.rs b/nft_ingester/src/bin/raw_backup/main.rs index b2f859002..f1ffa660b 100644 --- a/nft_ingester/src/bin/raw_backup/main.rs +++ b/nft_ingester/src/bin/raw_backup/main.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use clap::{arg, Parser}; +use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use nft_ingester::config::init_logger; use rocks_db::column::TypedColumn; @@ -59,10 +60,7 @@ pub async fn main() -> Result<(), IngesterError> { .for_each(|(k, v)| target_storage.db.put_cf(cf, k, v).unwrap()); info!("Done copying raw blocks"); - let cf = &target_storage - .db - .cf_handle(rocks_db::offchain_data::OffChainData::NAME) - .unwrap(); + let cf = &target_storage.db.cf_handle(OffChainData::NAME).unwrap(); info!("Copying offchain data..."); source_storage .asset_offchain_data diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs index 324e87e9d..9e8b369af 100644 --- a/nft_ingester/src/bubblegum_updates_processor.rs +++ b/nft_ingester/src/bubblegum_updates_processor.rs @@ -14,7 +14,8 @@ use entities::enums::{ TokenStandard, UseMethod, }; use entities::models::{ - BufferedTransaction, SignatureWithSlot, Task, UpdateVersion, Updated, UrlWithStatus, + BufferedTransaction, OffChainData, SignatureWithSlot, Task, UpdateVersion, Updated, + UrlWithStatus, }; use entities::models::{ChainDataV1, Creator, Uses}; use entities::rollup::BatchMintInstruction; @@ -29,7 +30,6 @@ use rocks_db::asset::AssetOwner; use rocks_db::asset::{ AssetAuthority, AssetCollection, AssetDynamicDetails, AssetLeaf, AssetStaticDetails, }; -use rocks_db::offchain_data::OffChainData; use rocks_db::transaction::{ AssetDynamicUpdate, AssetUpdate, AssetUpdateEvent, InstructionResult, TransactionResult, TreeUpdate, diff --git a/nft_ingester/src/gapfiller.rs b/nft_ingester/src/gapfiller.rs index 1cefa5cc6..561dbe5bb 100644 --- a/nft_ingester/src/gapfiller.rs +++ b/nft_ingester/src/gapfiller.rs @@ -1,24 +1,49 @@ use crate::error::IngesterError; use entities::models::CompleteAssetDetails; use futures::StreamExt; -use interface::asset_streaming_and_discovery::AssetDetailsStream; +use interface::asset_streaming_and_discovery::AssetDetailsConsumer; use log::error; use rocks_db::Storage; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -pub async fn process_asset_details_stream(storage: Arc, mut stream: AssetDetailsStream) { +pub async fn process_asset_details_stream( + keep_running: Arc, + storage: Arc, + start_slot: u64, + end_slot: u64, + mut consumer: impl AssetDetailsConsumer, +) -> u64 { + let mut stream = match consumer + .get_consumable_stream_in_range(start_slot, end_slot) + .await + { + Ok(stream) => stream, + Err(e) => { + error!("Error consume asset details stream in range: {}", e); + return 0; + } + }; + + let mut processed_assets = 0; while let Some(result) = stream.next().await { + if !keep_running.load(Ordering::SeqCst) { + break; + } match result { Ok(details) => { if let Some(e) = insert_gaped_data(storage.clone(), details).await.err() { error!("Error processing gaped data: {}", e) } + processed_assets += 1; } Err(e) => { error!("Error processing stream item: {}", e); } } } + + processed_assets } pub async fn insert_gaped_data( diff --git a/nft_ingester/src/json_downloader.rs b/nft_ingester/src/json_downloader.rs new file mode 100644 index 000000000..e69de29bb diff --git a/nft_ingester/src/json_worker.rs b/nft_ingester/src/json_worker.rs index ad5fd40dd..5dbbbf990 100644 --- a/nft_ingester/src/json_worker.rs +++ b/nft_ingester/src/json_worker.rs @@ -1,7 +1,7 @@ use crate::config::{setup_config, IngesterConfig, INGESTER_CONFIG_PREFIX}; use async_trait::async_trait; use entities::enums::TaskStatus; -use entities::models::JsonDownloadTask; +use entities::models::{JsonDownloadTask, OffChainData}; use interface::error::JsonDownloaderError; use interface::json::{JsonDownloader, JsonPersister}; use log::{debug, error}; @@ -9,7 +9,7 @@ use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus}; use postgre_client::tasks::UpdatedTask; use postgre_client::PgClient; use reqwest::{Client, ClientBuilder}; -use rocks_db::{offchain_data::OffChainData, Storage}; +use rocks_db::Storage; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 47234188c..8b22264cf 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -15,7 +15,7 @@ mod tests { AssetList, TokenAccountsList, TransactionSignatureList, }; use entities::api_req_params::{DisplayOptions, GetAssetSignatures, GetTokenAccounts, Options}; - use entities::models::{AssetSignature, AssetSignatureKey}; + use entities::models::{AssetSignature, AssetSignatureKey, OffChainData}; use entities::{ api_req_params::{ AssetSortBy, AssetSortDirection, AssetSorting, GetAsset, GetAssetsByAuthority, @@ -40,7 +40,6 @@ mod tests { }; use rocks_db::{ columns::{Mint, TokenAccount}, - offchain_data::OffChainData, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, }; use serde_json::{json, Value}; diff --git a/nft_ingester/tests/bubblegum_tests.rs b/nft_ingester/tests/bubblegum_tests.rs index 2f5aff977..b5d942527 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -2,6 +2,7 @@ #[cfg(feature = "integration_tests")] mod tests { use entities::api_req_params::{GetAsset, GetAssetProof, Options}; + use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::{ApiMetricsConfig, BackfillerMetricsConfig, IngesterMetricsConfig}; use nft_ingester::config::JsonMiddlewareConfig; @@ -12,7 +13,7 @@ mod tests { buffer::Buffer, transaction_ingester::{self, BackfillTransactionIngester}, }; - use rocks_db::{bubblegum_slots::BubblegumSlotGetter, offchain_data::OffChainData, Storage}; + use rocks_db::{bubblegum_slots::BubblegumSlotGetter, Storage}; use std::fs::File; use std::io::{self, Read}; use std::sync::Arc; diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index 8d83afa29..e8aaf7df1 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -4,6 +4,7 @@ mod tests { use blockbuster::token_metadata::accounts::Metadata; use blockbuster::token_metadata::types::{Collection, Creator, Key}; use entities::api_req_params::{GetAsset, Options}; + use entities::models::OffChainData; use metrics_utils::red::RequestErrorDurationMetrics; use metrics_utils::{ApiMetricsConfig, BackfillerMetricsConfig, IngesterMetricsConfig}; use nft_ingester::config::JsonMiddlewareConfig; @@ -20,7 +21,6 @@ mod tests { use rocks_db::{ bubblegum_slots::BubblegumSlotGetter, columns::{Mint, TokenAccount}, - offchain_data::OffChainData, Storage, }; use solana_sdk::pubkey::Pubkey; diff --git a/nft_ingester/tests/gapfiller_tests.rs b/nft_ingester/tests/gapfiller_tests.rs index 3d3a3a46a..be3fa1485 100644 --- a/nft_ingester/tests/gapfiller_tests.rs +++ b/nft_ingester/tests/gapfiller_tests.rs @@ -1,9 +1,10 @@ use entities::models::{CompleteAssetDetails, Updated}; use futures::stream; -use interface::asset_streaming_and_discovery::AsyncError; +use interface::asset_streaming_and_discovery::{AsyncError, MockAssetDetailsConsumer}; use metrics_utils::red::RequestErrorDurationMetrics; use nft_ingester::gapfiller::process_asset_details_stream; use solana_sdk::pubkey::Pubkey; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use tempfile::TempDir; use tokio::{sync::Mutex, task::JoinSet}; @@ -37,13 +38,24 @@ async fn test_process_asset_details_stream() { let details1 = create_test_complete_asset_details(first_key.clone()); let details2 = create_test_complete_asset_details(second_key.clone()); - let stream = stream::iter(vec![ - Ok(details1), - Ok(details2), - Err(AsyncError::from("test error")), - ]); + let mut mock = MockAssetDetailsConsumer::new(); + mock.expect_get_consumable_stream_in_range() + .returning(move |_, _| { + Ok(Box::pin(stream::iter(vec![ + Ok(details1.clone()), + Ok(details2.clone()), + Err(AsyncError::from("test error")), + ]))) + }); - process_asset_details_stream(storage.clone(), Box::pin(stream)).await; + process_asset_details_stream( + Arc::new(AtomicBool::new(true)), + storage.clone(), + 100, + 200, + mock, + ) + .await; let selected_data = storage .asset_dynamic_data diff --git a/rocks-db/src/asset.rs b/rocks-db/src/asset.rs index 65ea89286..d22822e26 100644 --- a/rocks-db/src/asset.rs +++ b/rocks-db/src/asset.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use bincode::{deserialize, serialize}; use entities::enums::{ChainMutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass}; -use entities::models::{EditionData, Updated}; +use entities::models::{EditionData, OffChainData, Updated}; use log::{error, warn}; use rocksdb::MergeOperands; use serde::{Deserialize, Serialize}; @@ -21,7 +21,7 @@ pub struct AssetSelectedMaps { pub assets_collection: HashMap, pub assets_owner: HashMap, pub assets_leaf: HashMap, - pub offchain_data: HashMap, + pub offchain_data: HashMap, pub urls: HashMap, pub editions: HashMap, } diff --git a/rocks-db/src/asset_streaming_client.rs b/rocks-db/src/asset_streaming_client.rs index 76d0fba97..eb70a9e16 100644 --- a/rocks-db/src/asset_streaming_client.rs +++ b/rocks-db/src/asset_streaming_client.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use entities::models::{CompleteAssetDetails, UpdateVersion, Updated}; +use entities::models::{CompleteAssetDetails, OffChainData, UpdateVersion, Updated}; use interface::asset_streaming_and_discovery::{ AssetDetailsStream, AssetDetailsStreamer, AsyncError, }; @@ -187,7 +187,7 @@ async fn get_complete_asset_details( (Some(edition), master_edition) } }; - + let url = dynamic_data.url.clone(); Ok(CompleteAssetDetails { pubkey: static_data.pubkey, specification_asset_class: static_data.specification_asset_class, @@ -204,11 +204,18 @@ async fn get_complete_asset_details( onchain_data, creators: dynamic_data.creators, royalty_amount: dynamic_data.royalty_amount, - url: dynamic_data.url, + url: url.clone(), chain_mutability: dynamic_data.chain_mutability, lamports: dynamic_data.lamports, executable: dynamic_data.executable, metadata_owner: dynamic_data.metadata_owner, + raw_name: dynamic_data.raw_name, + plugins: dynamic_data.plugins, + unknown_plugins: dynamic_data.unknown_plugins, + rent_epoch: dynamic_data.rent_epoch, + num_minted: dynamic_data.num_minted, + current_size: dynamic_data.current_size, + plugins_json_version: dynamic_data.plugins_json_version, authority: Updated::new(authority.slot_updated, None, authority.authority), owner: owner.owner, delegate: owner.delegate, @@ -263,6 +270,8 @@ async fn get_complete_asset_details( .collect(), edition, master_edition, + offchain_data: Storage::column::(backend.clone(), metrics.clone()) + .get(url.value)?, }) } diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index 3780b9d60..545b1c1a2 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -299,7 +299,13 @@ impl Storage { lamports: data.lamports, executable: data.executable, metadata_owner: data.metadata_owner, - ..Default::default() + raw_name: data.raw_name, + plugins: data.plugins, + unknown_plugins: data.unknown_plugins, + rent_epoch: data.rent_epoch, + num_minted: data.num_minted, + current_size: data.current_size, + plugins_json_version: data.plugins_json_version, }, )?; @@ -416,6 +422,13 @@ impl Storage { &TokenMetadataEdition::MasterEdition(master_edition), )?; } + if let Some(offchain_data) = data.offchain_data { + self.asset_offchain_data.merge_with_batch_cbor( + &mut batch, + offchain_data.url.clone(), + &offchain_data, + )?; + } self.write_batch(batch).await?; Ok(()) } diff --git a/rocks-db/src/lib.rs b/rocks-db/src/lib.rs index d9243fc87..4ffeec36f 100644 --- a/rocks-db/src/lib.rs +++ b/rocks-db/src/lib.rs @@ -15,7 +15,7 @@ pub use asset::{ }; pub use column::columns; use column::{Column, TypedColumn}; -use entities::models::AssetSignature; +use entities::models::{AssetSignature, OffChainData}; use metrics_utils::red::RequestErrorDurationMetrics; use tokio::sync::Mutex; use tokio::task::JoinSet; @@ -74,7 +74,7 @@ pub struct Storage { pub asset_leaf_data: Column, pub asset_collection_data: Column, pub asset_collection_data_deprecated: Column, - pub asset_offchain_data: Column, + pub asset_offchain_data: Column, pub cl_items: Column, pub cl_leafs: Column, pub bubblegum_slots: Column, @@ -201,7 +201,7 @@ impl Storage { fn create_cf_descriptors() -> Vec { vec![ - Self::new_cf_descriptor::(), + Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), Self::new_cf_descriptor::(), @@ -379,7 +379,7 @@ impl Storage { asset::AssetStaticDetails::merge_keep_existing, ); } - offchain_data::OffChainData::NAME => { + OffChainData::NAME => { cf_options.set_merge_operator_associative( "merge_fn_off_chain_data_keep_existing", asset::AssetStaticDetails::merge_keep_existing, diff --git a/rocks-db/src/offchain_data.rs b/rocks-db/src/offchain_data.rs index 68ac0fba9..876ebf678 100644 --- a/rocks-db/src/offchain_data.rs +++ b/rocks-db/src/offchain_data.rs @@ -1,20 +1,7 @@ use crate::column::TypedColumn; use crate::key_encoders::{decode_string, encode_string}; use crate::Result; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug)] -pub enum ChainDataMutability { - Immutable, - Mutable, - Unknown, -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default)] -pub struct OffChainData { - pub url: String, - pub metadata: String, -} +use entities::models::OffChainData; impl TypedColumn for OffChainData { type KeyType = String; diff --git a/rocks-db/src/transaction.rs b/rocks-db/src/transaction.rs index 204a407ba..2a72f5cfe 100644 --- a/rocks-db/src/transaction.rs +++ b/rocks-db/src/transaction.rs @@ -1,12 +1,11 @@ use async_trait::async_trait; -use entities::models::Task; use entities::models::{BufferedTransaction, SignatureWithSlot}; +use entities::models::{OffChainData, Task}; use interface::error::StorageError; use solana_sdk::pubkey::Pubkey; use spl_account_compression::events::ChangeLogEventV1; use spl_account_compression::state::PathNode; -use crate::offchain_data::OffChainData; use crate::{ asset::{AssetCollection, AssetLeaf}, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, diff --git a/rocks-db/src/transaction_client.rs b/rocks-db/src/transaction_client.rs index 52c873455..9842368c9 100644 --- a/rocks-db/src/transaction_client.rs +++ b/rocks-db/src/transaction_client.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; -use entities::models::SignatureWithSlot; +use entities::models::{OffChainData, SignatureWithSlot}; use interface::error::StorageError; use solana_sdk::pubkey::Pubkey; -use crate::offchain_data::OffChainData; use crate::parameters::Parameter; use crate::{ parameters, diff --git a/tests/setup/src/rocks.rs b/tests/setup/src/rocks.rs index e5c21f38d..5c0e6b439 100644 --- a/tests/setup/src/rocks.rs +++ b/tests/setup/src/rocks.rs @@ -1,14 +1,14 @@ use std::sync::Arc; -use entities::models::Updated; +use entities::models::{OffChainData, Updated}; use rand::{random, Rng}; use solana_sdk::pubkey::Pubkey; use tempfile::TempDir; use metrics_utils::red::RequestErrorDurationMetrics; use rocks_db::{ - asset::AssetCollection, offchain_data::OffChainData, AssetAuthority, AssetDynamicDetails, - AssetOwner, AssetStaticDetails, Storage, + asset::AssetCollection, AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, + Storage, }; use tokio::{sync::Mutex, task::JoinSet};