Skip to content

Commit

Permalink
Merge pull request #348 from metaplex-foundation/feature/MTG-1151-cha…
Browse files Browse the repository at this point in the history
…ngelog-fixed

[MTG-1151] changelog fixed
  • Loading branch information
StanChe authored Jan 17, 2025
2 parents d0fcbf6 + cb5f4c6 commit 2fe9bdb
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 193 deletions.
40 changes: 28 additions & 12 deletions nft_ingester/src/api/dapi/change_logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};

use interface::{processing_possibility::ProcessingPossibilityChecker, proofs::ProofChecker};
use interface::{
processing_possibility::ProcessingPossibilityChecker, proofs::ProofChecker,
slot_getter::LastProcessedSlotGetter,
};
use metrics_utils::ApiMetricsConfig;
use rocks_db::{
clients::asset_streaming_client::get_required_nodes_for_proof,
Expand All @@ -17,6 +20,8 @@ use crate::{
fetch_asset_data,
};

const OFFSET_SLOTS: u64 = 300; // Roughly 2 minutes in Solana time. After this time we consider pending/non-finalized data as invalid and use finalized version instead.

#[derive(Debug, Default, Clone, Eq, PartialEq)]
struct SimpleChangeLog {
cli_hash: Vec<u8>,
Expand Down Expand Up @@ -80,23 +85,31 @@ pub async fn get_proof_for_assets<
// Batch get cl_items
let cl_items_first_leaf = rocks_db.cl_items.batch_get(cl_item_keys.clone()).await?;

let slot_for_cutoff = if let Ok(Some(last_seen_slot)) = rocks_db.get_last_ingested_slot().await
{
last_seen_slot.wrapping_sub(OFFSET_SLOTS)
} else {
0
};
// Build the leaves map directly by iterating over tree_pubkeys and the fetched items in parallel
let leaves: HashMap<Vec<u8>, (model::ClItemsModel, u64)> = tree_pubkeys
.into_iter()
.zip(cl_items_first_leaf.into_iter())
.filter_map(|((tree_id, pubkey, nonce), leaf_opt)| {
leaf_opt.map(|leaf| {
let hash = leaf.get_updated_hash(slot_for_cutoff);
(
pubkey.to_bytes().to_vec(),
(
model::ClItemsModel {
id: 0,
tree: tree_id.to_bytes().to_vec(),
node_idx: leaf.cli_node_idx as i64,
leaf_idx: leaf.cli_leaf_idx.map(|idx| idx as i64),
seq: leaf.cli_seq as i64,
level: leaf.cli_level as i64,
hash: leaf.cli_hash,
node_idx: leaf.node_idx as i64,
leaf_idx: leaf.leaf_idx.map(|idx| idx as i64),
level: leaf.level as i64,
seq: hash.update_version.and_then(|v| v.get_seq()).unwrap_or_default()
as i64,
hash: hash.value,
},
nonce,
),
Expand All @@ -121,12 +134,15 @@ pub async fn get_proof_for_assets<
.await?
.into_iter()
.flatten()
.map(|node| SimpleChangeLog {
cli_hash: node.cli_hash,
cli_level: node.cli_level as i64,
cli_node_idx: node.cli_node_idx as i64,
cli_seq: node.cli_seq as i64,
cli_tree: node.cli_tree_key.to_bytes().to_vec(),
.map(|node| {
let hash = node.get_updated_hash(slot_for_cutoff);
SimpleChangeLog {
cli_hash: hash.value,
cli_level: node.level as i64,
cli_node_idx: node.node_idx as i64,
cli_seq: hash.update_version.and_then(|v| v.get_seq()).unwrap_or_default() as i64,
cli_tree: node.tree_key.to_bytes().to_vec(),
}
})
.collect::<Vec<_>>();

Expand Down
Loading

0 comments on commit 2fe9bdb

Please sign in to comment.