Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 20 additions & 398 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/tycho-storage",
"crates/tycho-ethereum",
"crates/tycho-indexer",
"crates/tycho-protobuf",
"crates/tycho-simulation",
"crates/tycho-test",
"crates/tycho-integration-test",
Expand All @@ -32,6 +33,7 @@ readme = "README.md"
tycho = { path = "crates/tycho", version = "0.283.1" }
tycho-common = { path = "crates/tycho-common", version = "0.283.1" }
tycho-client = { path = "crates/tycho-client", version = "0.283.1" }
tycho-protobuf = { path = "crates/tycho-protobuf", version = "0.283.1" }
tycho-storage = { path = "crates/tycho-storage", version = "0.283.1" }
tycho-ethereum = { path = "crates/tycho-ethereum", version = "0.283.1" }
tycho-indexer = { path = "crates/tycho-indexer", version = "0.283.1" }
Expand Down
259 changes: 258 additions & 1 deletion crates/tycho-common/src/models/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::warn;
use crate::{
dto,
models::{
contract::{AccountBalance, AccountDelta},
contract::{AccountBalance, AccountDelta, AccountToContractChanges},
protocol::{ComponentBalance, ProtocolComponent, ProtocolComponentStateDelta},
token::Token,
Address, Balance, BlockHash, Chain, Code, ComponentId, EntryPointId, MergeError, StoreKey,
Expand Down Expand Up @@ -688,6 +688,263 @@ impl std::fmt::Display for TracedEntryPoint {
}
}

/// Storage changes grouped by transaction.
#[derive(Debug, PartialEq, Default, Clone, DeepSizeOf)]
pub struct TxWithContractChanges {
pub tx: Transaction,
pub contract_changes: AccountToContractChanges,
}

#[derive(Debug, PartialEq, Default, Clone, DeepSizeOf)]
pub struct BlockChanges {
pub extractor: String,
pub chain: Chain,
pub block: Block,
pub finalized_block_height: u64,
pub revert: bool,
pub new_tokens: HashMap<Address, Token>,
/// Vec of updates at this block, aggregated by tx and sorted by tx index in ascending order
pub txs_with_update: Vec<TxWithChanges>,
// Raw block contract changes. This is intended as DCI input and is to be omitted from the
// reorg buffer and aggregation into the `BlockAggregatedChanges` object.
pub block_contract_changes: Vec<TxWithContractChanges>,
/// Required here so that it is part of the reorg buffer and thus inserted into storage once
/// finalized.
/// Populated by the `DynamicContractIndexer`
pub trace_results: Vec<TracedEntryPoint>,
/// The index of the partial block. None if it's a full block.
pub partial_block_index: Option<u32>,
}

impl BlockChanges {
pub fn new(
extractor: String,
chain: Chain,
block: Block,
finalized_block_height: u64,
revert: bool,
txs_with_update: Vec<TxWithChanges>,
block_contract_changes: Vec<TxWithContractChanges>,
) -> Self {
BlockChanges {
extractor,
chain,
block,
finalized_block_height,
revert,
new_tokens: HashMap::new(),
txs_with_update,
block_contract_changes,
trace_results: Vec::new(),
partial_block_index: None,
}
}

/// Aggregates component and account updates.
///
/// This function aggregates all protocol updates into a [`BlockAggregatedChanges`] object. This
/// new object should have all individual changes merged into only one final/compacted change
/// per component and account. This means there is only one state delta and component balance
/// per component, and one account delta and account balance per account. DCI trace results are
/// also aggregated into a result per entry point.
///
/// Note - all non-protocol specific data in the BlockChanges object are lost during
/// aggregation. This means block_storage_changes is dropped.
///
/// # Errors
///
/// This returns a `MergeError` if there was a problem during merge.
pub fn into_aggregated(
self,
db_committed_block_height: Option<u64>,
) -> Result<BlockAggregatedChanges, MergeError> {
if db_committed_block_height.is_some_and(|h| h > self.finalized_block_height) {
return Err(MergeError::InvalidState(format!(
"Database committed block height {:?} is greater than finalized_block_height {}",
db_committed_block_height, self.finalized_block_height
)));
}

let mut iter = self.txs_with_update.into_iter();

let first_state = iter.next().unwrap_or_default();

let aggregated_changes = iter
.try_fold(first_state, |mut acc_state, new_state| {
acc_state.merge(new_state.clone())?;
Ok::<_, MergeError>(acc_state.clone())
})?;

// Aggregate trace_results
let mut aggregated_trace_results = HashMap::new();
for result in self.trace_results {
let external_id = result.entry_point_id();
aggregated_trace_results
.entry(external_id)
.and_modify(|existing: &mut TracingResult| {
existing.merge(result.tracing_result.clone())
})
.or_insert(result.tracing_result);
}

Ok(BlockAggregatedChanges {
extractor: self.extractor,
chain: self.chain,
block: self.block,
db_committed_block_height,
finalized_block_height: self.finalized_block_height,
revert: self.revert,
new_protocol_components: aggregated_changes.protocol_components,
new_tokens: self.new_tokens,
deleted_protocol_components: HashMap::new(),
state_deltas: aggregated_changes.state_updates,
account_deltas: aggregated_changes.account_deltas,
component_balances: aggregated_changes.balance_changes,
account_balances: aggregated_changes.account_balance_changes,
component_tvl: HashMap::new(),
dci_update: DCIUpdate {
new_entrypoints: aggregated_changes.entrypoints,
new_entrypoint_params: aggregated_changes.entrypoint_params,
trace_results: aggregated_trace_results,
},
partial_block_index: self.partial_block_index,
})
}

pub fn protocol_components(&self) -> Vec<ProtocolComponent> {
self.txs_with_update
.iter()
.flat_map(|tx_u| {
tx_u.protocol_components
.values()
.cloned()
})
.collect()
}

/// Returns true if the block is a partial block.
pub fn is_partial_block(&self) -> bool {
self.partial_block_index.is_some()
}

/// Sets the partial block index.
pub fn set_partial_block_index(&mut self, index: Option<u32>) {
self.partial_block_index = index;
}

/// Sets every transaction's `block_hash` in `txs_with_update` and `block_contract_changes`
/// to this block's hash. Used after merging partials so all txs refer to the same block
/// (e.g. the final block hash).
pub fn normalize_block_hash(&mut self) {
let h = self.block.hash.clone();
for tx_with_changes in self.txs_with_update.iter_mut() {
tx_with_changes.tx.block_hash = h.clone();
}
for tx_with_contract in self.block_contract_changes.iter_mut() {
tx_with_contract.tx.block_hash = h.clone();
}
}

/// Merges another partial block into this one, preserving later changes.
///
/// The partial block with the higher index represents later changes and takes precedence.
/// Merges `new_tokens`, `txs_with_update` (sorted by index), `block_contract_changes`,
/// and `trace_results`. When both blocks have the same token address, the token from the
/// block with the higher partial index is kept.
///
/// Works regardless of merge order: `partial_0.merge_partial(partial_1)` and
/// `partial_1.merge_partial(partial_0)` produce equivalent results.
///
/// # Errors
/// - Non-partial block: Either block is not marked as partial
/// - Extractor mismatch: Blocks from different extractors
/// - Chain mismatch: Blocks from different chains
/// - Block mismatch: Different block numbers (hash may differ for temp vs final partial)
/// - Revert mismatch: Different revert status
pub fn merge_partial(self, other: Self) -> Result<Self, MergeError> {
let Some(self_index) = self.partial_block_index else {
return Err(MergeError::InvalidState("self is not a partial block".to_string()));
};

let Some(other_index) = other.partial_block_index else {
return Err(MergeError::InvalidState("other is not a partial block".to_string()));
};

if self.extractor != other.extractor {
return Err(MergeError::IdMismatch(
"partial blocks (extractor)".to_string(),
self.extractor.clone(),
other.extractor.clone(),
));
}

if self.chain != other.chain {
return Err(MergeError::IdMismatch(
"partial blocks (chain)".to_string(),
format!("{:?}", self.chain),
format!("{:?}", other.chain),
));
}

if self.block.number != other.block.number {
return Err(MergeError::BlockMismatch(
"partial blocks".to_string(),
self.block.hash.clone(),
other.block.hash.clone(),
));
}

if self.revert != other.revert {
return Err(MergeError::InvalidState(format!(
"different revert status: {} vs {}",
self.revert, other.revert
)));
}

let (mut current, previous) = if self_index > other_index {
(self, other)
} else if self_index < other_index {
(other, self)
} else {
return Err(MergeError::InvalidState(format!(
"same partial block index: {self_index}"
)));
};

for (addr, token) in previous.new_tokens {
current
.new_tokens
.entry(addr)
.or_insert(token);
}

current
.txs_with_update
.extend(previous.txs_with_update);
current
.txs_with_update
.sort_by_key(|tx| tx.tx.index);

current
.block_contract_changes
.extend(previous.block_contract_changes);

current.normalize_block_hash();

current
.trace_results
.extend(previous.trace_results);

Ok(current)
}
}

impl BlockScoped for BlockChanges {
fn block(&self) -> Block {
self.block.clone()
}
}

#[cfg(test)]
pub mod fixtures {
use std::str::FromStr;
Expand Down
2 changes: 2 additions & 0 deletions crates/tycho-common/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub mod error;
pub mod protocol;
pub mod token;

pub use blockchain::{BlockChanges, TxWithContractChanges};

use std::{collections::HashMap, fmt::Display, str::FromStr};

use deepsize::DeepSizeOf;
Expand Down
2 changes: 1 addition & 1 deletion crates/tycho-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] }
rand.workspace = true
serde_yaml = "0.9.32"
tracing-opentelemetry = { version = "0.23", default-features = false }
tycho-substreams = "0.6.0"
tycho-protobuf.workspace = true
utoipa = { version = "4.2.0", features = ["chrono"] }
utoipa-swagger-ui = { version = "6.0.0", features = ["actix-web"] }
zstd = "0.13"
Expand Down
9 changes: 9 additions & 0 deletions crates/tycho-indexer/src/extractor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ pub enum ExtractionError {
DCICacheError(#[from] DCICacheError),
}

impl From<tycho_protobuf::error::DecodeError> for ExtractionError {
fn from(e: tycho_protobuf::error::DecodeError) -> Self {
match e {
tycho_protobuf::error::DecodeError::Empty => Self::Empty,
tycho_protobuf::error::DecodeError::Decode(msg) => Self::DecodeError(msg),
}
}
}

#[derive(Error, Debug)]
pub enum RPCError {
#[error("RPC setup error: {0}")]
Expand Down
Loading
Loading