From 941ba6e07593ab477bb18a45fd277cfd904a766b Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Mon, 8 Jun 2026 11:33:46 +0200 Subject: [PATCH 1/4] fix: revert ChangeType::Creation attributes without DB lookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Chain reorgs on Base crashed with "Could not find Component" when a partial block was reverted that contained an attribute first set with ChangeType::Creation (e.g. a new Uniswap V4 tick slot). These attributes have no prior state anywhere — not in the reorg buffer (reverted range) and not in the DB (non-finalized component) — so looking up their previous value is both impossible and unnecessary. Track ChangeType::Creation attributes in a new `created_attributes` field on ProtocolComponentStateDelta, populated during protobuf deserialisation. In handle_revert, filter these out of the buffer/DB lookup entirely and emit ChangeType::Deletion directly instead. Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-common/src/dto.rs | 1 + crates/tycho-common/src/models/protocol.rs | 21 +- .../extractor/dynamic_contract_indexer/dci.rs | 3 + .../extractor/post_processors/attributes.rs | 4 + .../src/extractor/protobuf_deserialisation.rs | 10 +- .../src/extractor/protocol_extractor.rs | 239 ++++++++++++++++-- .../src/extractor/reorg_buffer.rs | 3 + crates/tycho-indexer/src/testing.rs | 1 + crates/tycho-simulation/src/evm/pending.rs | 1 + crates/tycho-storage/src/postgres/protocol.rs | 2 + 10 files changed, 266 insertions(+), 19 deletions(-) diff --git a/crates/tycho-common/src/dto.rs b/crates/tycho-common/src/dto.rs index 60decb494e..c0f75afa89 100644 --- a/crates/tycho-common/src/dto.rs +++ b/crates/tycho-common/src/dto.rs @@ -2581,6 +2581,7 @@ mod test { ("attr_1".to_string(), Bytes::from("0x00000000000003e8")), ]), deleted_attributes: HashSet::new(), + ..Default::default() }), ]), new_protocol_components: HashMap::from([ diff --git a/crates/tycho-common/src/models/protocol.rs 
b/crates/tycho-common/src/models/protocol.rs index 58f8bdc223..c626484387 100644 --- a/crates/tycho-common/src/models/protocol.rs +++ b/crates/tycho-common/src/models/protocol.rs @@ -159,11 +159,18 @@ impl ProtocolComponentState { } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, DeepSizeOf)] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, DeepSizeOf)] pub struct ProtocolComponentStateDelta { pub component_id: ComponentId, pub updated_attributes: HashMap, pub deleted_attributes: HashSet, + /// Attribute names first introduced with `ChangeType::Creation` in this delta. + /// + /// Used during chain reorgs: a creation attribute has no prior state, so reverting it means + /// emitting a deletion rather than restoring a previous value. Skipped during serialization + /// — this is an indexer-internal reorg hint, not part of the public delta format. + #[serde(skip)] + pub created_attributes: HashSet, } impl ProtocolComponentStateDelta { @@ -172,7 +179,12 @@ impl ProtocolComponentStateDelta { updated_attributes: HashMap, deleted_attributes: HashSet, ) -> Self { - Self { component_id: component_id.to_string(), updated_attributes, deleted_attributes } + Self { + component_id: component_id.to_string(), + updated_attributes, + deleted_attributes, + created_attributes: HashSet::new(), + } } /// Merges this update with another one. 
@@ -196,6 +208,7 @@ impl ProtocolComponentStateDelta { } for attr in &other.deleted_attributes { self.updated_attributes.remove(attr); + self.created_attributes.remove(attr); } for attr in other.updated_attributes.keys() { self.deleted_attributes.remove(attr); @@ -204,6 +217,8 @@ impl ProtocolComponentStateDelta { .extend(other.updated_attributes); self.deleted_attributes .extend(other.deleted_attributes); + self.created_attributes + .extend(other.created_attributes); Ok(()) } } @@ -273,6 +288,7 @@ impl From for ProtocolComponentStateDelta { component_id: value.component_id, updated_attributes: value.updated_attributes, deleted_attributes: value.deleted_attributes, + created_attributes: HashSet::new(), } } } @@ -332,6 +348,7 @@ mod test { component_id: id, updated_attributes: attributes1, deleted_attributes: HashSet::new(), + ..Default::default() } } diff --git a/crates/tycho-indexer/src/extractor/dynamic_contract_indexer/dci.rs b/crates/tycho-indexer/src/extractor/dynamic_contract_indexer/dci.rs index e5f9033e1c..f5c50c05d4 100644 --- a/crates/tycho-indexer/src/extractor/dynamic_contract_indexer/dci.rs +++ b/crates/tycho-indexer/src/extractor/dynamic_contract_indexer/dci.rs @@ -3582,6 +3582,7 @@ mod tests { Bytes::from(1u64).lpad(32, 0), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), ..Default::default() @@ -3731,6 +3732,7 @@ mod tests { Bytes::from(1u64).lpad(32, 0), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), ..Default::default() @@ -3854,6 +3856,7 @@ mod tests { Bytes::from(version as u64).lpad(32, 0), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), ..Default::default() diff --git a/crates/tycho-indexer/src/extractor/post_processors/attributes.rs b/crates/tycho-indexer/src/extractor/post_processors/attributes.rs index fd6da32423..b804f1a76c 100644 --- a/crates/tycho-indexer/src/extractor/post_processors/attributes.rs +++ 
b/crates/tycho-indexer/src/extractor/post_processors/attributes.rs @@ -35,6 +35,7 @@ pub fn add_default_attributes(mut changes: BlockChanges, attributes: &[&str]) -> component_id: c_id.clone(), updated_attributes: default_attr, deleted_attributes: HashSet::new(), + ..Default::default() }, ); } @@ -131,6 +132,7 @@ mod test { Bytes::from(1_u64.to_be_bytes()), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), protocol_components: HashMap::from([( @@ -175,6 +177,7 @@ mod test { ("liquidity".to_string(), Bytes::zero(32)), ]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), protocol_components: changes @@ -225,6 +228,7 @@ mod test { Bytes::from(1_u64.to_be_bytes()), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, )]), ..Default::default() diff --git a/crates/tycho-indexer/src/extractor/protobuf_deserialisation.rs b/crates/tycho-indexer/src/extractor/protobuf_deserialisation.rs index dc9919bca4..d75014fea3 100644 --- a/crates/tycho-indexer/src/extractor/protobuf_deserialisation.rs +++ b/crates/tycho-indexer/src/extractor/protobuf_deserialisation.rs @@ -216,11 +216,16 @@ impl TryFromMessage for ProtocolComponentStateDelta { fn try_from_message(args: Self::Args<'_>) -> Result { let msg = args; - let (mut updates, mut deletions) = (HashMap::new(), HashSet::new()); + let (mut updates, mut deletions, mut created) = + (HashMap::new(), HashSet::new(), HashSet::new()); for attribute in msg.attributes.into_iter() { match ChangeType::try_from_message(attribute.change())? 
{ - ChangeType::Update | ChangeType::Creation => { + ChangeType::Creation => { + created.insert(attribute.name.clone()); + updates.insert(attribute.name, Bytes::from(attribute.value)); + } + ChangeType::Update => { updates.insert(attribute.name, Bytes::from(attribute.value)); } ChangeType::Deletion => { @@ -233,6 +238,7 @@ impl TryFromMessage for ProtocolComponentStateDelta { component_id: msg.component_id, updated_attributes: updates, deleted_attributes: deletions, + created_attributes: created, }) } } diff --git a/crates/tycho-indexer/src/extractor/protocol_extractor.rs b/crates/tycho-indexer/src/extractor/protocol_extractor.rs index 2819036a90..221084525b 100644 --- a/crates/tycho-indexer/src/extractor/protocol_extractor.rs +++ b/crates/tycho-indexer/src/extractor/protocol_extractor.rs @@ -1312,6 +1312,36 @@ where }); // Handle reverted protocol state + + // First pass: collect attributes first introduced with ChangeType::Creation across the + // entire reverted range. These have no prior state and must be deleted on revert — no + // buffer or DB lookup is needed or possible for them. + let reverted_created_attrs: HashMap> = reverted_state + .iter() + .flat_map(|block_msg| { + block_msg + .block_update() + .txs_with_update + .iter() + .flat_map(|update| { + update + .state_updates + .iter() + .filter(|(c_id, _)| !reverted_components_creations.contains_key(*c_id)) + .flat_map(|(c_id, delta)| { + delta + .created_attributes + .iter() + .map(move |attr| (c_id.clone(), attr.clone())) + }) + }) + }) + .fold(HashMap::new(), |mut acc, (c_id, attr)| { + acc.entry(c_id).or_default().insert(attr); + acc + }); + + // Second pass: build the lookup key set, excluding creation attributes (no prior state). 
let reverted_protocol_state_keys: HashSet<_> = reverted_state .iter() .flat_map(|block_msg| { @@ -1328,6 +1358,11 @@ where delta .updated_attributes .keys() + .filter(|attr| { + !reverted_created_attrs + .get(c_id.as_str()) + .is_some_and(|created| created.contains(*attr)) + }) .chain(delta.deleted_attributes.iter()) .map(move |key| (c_id, key)) }) @@ -1369,20 +1404,21 @@ where .await .map_err(ExtractionError::Storage)?; - // Then merge the two and cast it to the expected struct - let missing_components_states_map = missing_map - .into_iter() - .map(|(component_id, keys)| { - missing_components_states - .iter() - .find(|comp| comp.component_id == component_id) - .map(|state| (state.clone(), keys)) - .ok_or(ExtractionError::Storage(StorageError::NotFound( - "Component".to_owned(), - component_id.to_string(), - ))) - }) - .collect::, _>>()?; + let missing_components_states_map: Vec<(ProtocolComponentState, Vec)> = + missing_map + .into_iter() + .map(|(component_id, keys)| { + let state = missing_components_states + .iter() + .find(|comp| comp.component_id == component_id) + .cloned() + .ok_or(ExtractionError::Storage(StorageError::NotFound( + "Component".to_owned(), + component_id.to_string(), + )))?; + Ok((state, keys)) + }) + .collect::, ExtractionError>>()?; let mut not_found: HashMap<_, HashSet<_>> = HashMap::new(); let mut db_states: HashMap<(String, String), Bytes> = HashMap::new(); @@ -1402,7 +1438,7 @@ where let empty = HashSet::::new(); - let state_deltas: HashMap = db_states + let mut state_deltas: HashMap = db_states .into_iter() .chain(buffered_state) .fold(HashMap::new(), |mut acc, ((c_id, key), value)| { @@ -1414,12 +1450,29 @@ where .get(&c_id) .unwrap_or(&empty) .clone(), + ..Default::default() }) .updated_attributes .insert(key.clone(), value); acc }); + // Safety net: components with attrs absent from both buffer and DB still need a delta. 
+ for (c_id, deleted_keys) in &not_found { + state_deltas + .entry(c_id.clone()) + .or_insert_with(|| ProtocolComponentStateDelta::new(c_id, HashMap::new(), deleted_keys.clone())); + } + + // Revert ChangeType::Creation attributes by emitting deletions — they had no prior state. + for (c_id, created_keys) in reverted_created_attrs { + state_deltas + .entry(c_id.clone()) + .or_insert_with(|| ProtocolComponentStateDelta::new(&c_id, HashMap::new(), HashSet::new())) + .deleted_attributes + .extend(created_keys); + } + // Handle component balance changes let reverted_component_balances_keys: HashSet<(&String, Bytes)> = reverted_state .iter() @@ -3166,6 +3219,161 @@ mod test { ); } } + + // Tests that reverting a partial block containing a brand-new attribute on a non-finalized + // component does not crash. Prior to the fix, the code errored with "Could not find Component" + // because the component was non-finalized (absent from DB) and the attribute had no prior + // value in the remaining buffer. The expected behavior is to emit a deletion for the attribute. + #[tokio::test] + async fn test_revert_new_attribute_on_non_finalized_component() { + use ::tycho_substreams::pb::tycho::evm::v1::{ + Attribute, BlockChanges as PbBlockChanges, ChangeType as PbChangeType, EntityChanges, + ProtocolComponent as PbProtocolComponent, ProtocolType, TransactionChanges, + }; + + let mut gw = MockExtractorGateway::new(); + gw.expect_ensure_protocol_types() + .times(1) + .returning(|_| ()); + gw.expect_get_cursor() + .times(1) + .returning(|| Ok(("cursor".into(), Bytes::default()))); + gw.expect_get_block() + .times(1) + .returning(|_| Ok(Block::default())); + gw.expect_advance() + .times(0) + .returning(|_, _, _| Ok(())); + gw.expect_get_contracts() + .returning(|_| Ok(Vec::new())); + // Component is non-finalized: not in DB.
+ gw.expect_get_protocol_states() + .returning(|_| Ok(Vec::new())); + gw.expect_get_components_balances() + .returning(|_| Ok(HashMap::new())); + gw.expect_get_account_balances() + .returning(|_| Ok(HashMap::new())); + + let extractor = create_extractor(gw).await; + + // Block 1: empty anchor. + extractor + .handle_tick_scoped_data(pb_fixtures::pb_block_scoped_data( + PbBlockChanges { + block: Some(pb_fixtures::pb_blocks(1)), + ..Default::default() + }, + Some("cursor@1"), + Some(1), + )) + .await + .unwrap() + .unwrap(); + + // Block 2: create `pool_x` with initial attributes. Non-finalized (stays in buffer). + extractor + .handle_tick_scoped_data(pb_fixtures::pb_block_scoped_data( + PbBlockChanges { + block: Some(pb_fixtures::pb_blocks(2)), + changes: vec![TransactionChanges { + tx: Some(pb_fixtures::pb_transactions(2, 0)), + component_changes: vec![PbProtocolComponent { + id: "pool_x".to_string(), + change: PbChangeType::Creation.into(), + protocol_type: Some(ProtocolType { + name: "pt_1".to_string(), + ..Default::default() + }), + ..Default::default() + }], + entity_changes: vec![EntityChanges { + component_id: "pool_x".to_string(), + attributes: vec![ + Attribute { + name: "sqrt_price_x96".to_string(), + value: Bytes::from(1000_u64).lpad(32, 0).to_vec(), + change: PbChangeType::Creation.into(), + }, + Attribute { + name: "tick".to_string(), + value: Bytes::from(100_u64).lpad(32, 0).to_vec(), + change: PbChangeType::Creation.into(), + }, + ], + }], + ..Default::default() + }], + ..Default::default() + }, + Some("cursor@2"), + Some(1), + )) + .await + .unwrap() + .unwrap(); + + // Partial block 3: first-ever tick attribute on `pool_x` (ChangeType::Creation). 
+ let mut partial = pb_fixtures::pb_block_scoped_data( + PbBlockChanges { + block: Some(pb_fixtures::pb_blocks(3)), + changes: vec![TransactionChanges { + tx: Some(pb_fixtures::pb_transactions(3, 0)), + entity_changes: vec![EntityChanges { + component_id: "pool_x".to_string(), + attributes: vec![Attribute { + name: "ticks/100/net-liquidity".to_string(), + value: Bytes::from(5000_u64).lpad(32, 0).to_vec(), + change: PbChangeType::Creation.into(), + }], + }], + ..Default::default() + }], + ..Default::default() + }, + Some("cursor@3_p0"), + Some(1), + ); + partial.partial_index = Some(0); + partial.is_partial = true; + extractor + .handle_tick_scoped_data(partial) + .await + .unwrap() + .unwrap(); + + // Revert to block 2 — partial block 3 is reverted. + let revert_msg = extractor + .handle_revert(BlockUndoSignal { + last_valid_block: Some(BlockRef { + id: format!("0x{:0>64x}", 2_u64), + number: 2, + }), + last_valid_cursor: "cursor@2".into(), + }) + .await + .expect("handle_revert should not error for non-finalized component with new attr") + .expect("handle_revert should return a revert message"); + + assert!(revert_msg.revert); + + // The revert delta for pool_x must tell consumers to delete the new tick attribute. 
+ let pool_x_delta = revert_msg + .state_deltas + .get("pool_x") + .expect("state_deltas should contain pool_x"); + assert!( + pool_x_delta + .deleted_attributes + .contains("ticks/100/net-liquidity"), + "Expected ticks/100/net-liquidity in deleted_attributes, got: {:?}", + pool_x_delta.deleted_attributes + ); + assert!( + pool_x_delta.updated_attributes.is_empty(), + "Expected no updated_attributes for pool_x, got: {:?}", + pool_x_delta.updated_attributes + ); + } } /// It is notoriously hard to mock postgres here, we would need to have traits and abstractions @@ -3813,6 +4021,7 @@ mod test_serial_db { ("attr_1".to_string(), Bytes::from(1000_u64).lpad(32, 0)), ]), deleted_attributes: HashSet::new(), + ..Default::default() }), ]), new_protocol_components: HashMap::from([ diff --git a/crates/tycho-indexer/src/extractor/reorg_buffer.rs b/crates/tycho-indexer/src/extractor/reorg_buffer.rs index 42794c31a1..bafc55701c 100644 --- a/crates/tycho-indexer/src/extractor/reorg_buffer.rs +++ b/crates/tycho-indexer/src/extractor/reorg_buffer.rs @@ -521,6 +521,7 @@ mod test { component_id: "State1".to_owned(), updated_attributes: attr, deleted_attributes: HashSet::new(), + ..Default::default() }, )]); let component_balances = HashMap::from([ @@ -583,6 +584,7 @@ mod test { Bytes::from(2_u64.to_be_bytes().to_vec()), )]), deleted_attributes: HashSet::new(), + ..Default::default() }, ), ( @@ -594,6 +596,7 @@ mod test { ("reserve".to_owned(), Bytes::from(30_u64.to_be_bytes().to_vec())), ]), deleted_attributes: HashSet::new(), + ..Default::default() }, ), ]); diff --git a/crates/tycho-indexer/src/testing.rs b/crates/tycho-indexer/src/testing.rs index b339cfb6dd..75de942249 100644 --- a/crates/tycho-indexer/src/testing.rs +++ b/crates/tycho-indexer/src/testing.rs @@ -726,6 +726,7 @@ pub mod fixtures { .into_iter() .collect(), deleted_attributes: HashSet::new(), + ..Default::default() } } diff --git a/crates/tycho-simulation/src/evm/pending.rs 
b/crates/tycho-simulation/src/evm/pending.rs index 65f65ba4e6..ab51e3d26e 100644 --- a/crates/tycho-simulation/src/evm/pending.rs +++ b/crates/tycho-simulation/src/evm/pending.rs @@ -256,6 +256,7 @@ fn snapshot_to_block_changes( component_id: id.clone(), updated_attributes: comp_with_state.state.attributes.clone(), deleted_attributes: HashSet::new(), + ..Default::default() }, ); diff --git a/crates/tycho-storage/src/postgres/protocol.rs b/crates/tycho-storage/src/postgres/protocol.rs index ad2553dfe6..840c67c9b4 100644 --- a/crates/tycho-storage/src/postgres/protocol.rs +++ b/crates/tycho-storage/src/postgres/protocol.rs @@ -2824,6 +2824,7 @@ mod test { deleted_attributes: vec!["deleted2".to_owned()] .into_iter() .collect(), + ..Default::default() }; let expected = vec![state_delta, other_state_delta]; @@ -2928,6 +2929,7 @@ mod test { deleted_attributes: vec!["to_delete".to_owned()] .into_iter() .collect(), + ..Default::default() }; let expected = vec![state_delta]; From 4b74b3706db5f24b3319a5d9405afeda8550fe3c Mon Sep 17 00:00:00 2001 From: zizou <111426680+zizou0x@users.noreply.github.com> Date: Tue, 9 Jun 2026 10:48:11 +0200 Subject: [PATCH 2/4] refactor: don't use ..Default::default() in core code This is because it might hide future compiler errors and lead to unexpected behaviour. We prefer to keep it explicit so programmers are forced to make a choice. Using it in tests is fine.
--- crates/tycho-common/src/models/protocol.rs | 6 ++---- .../src/extractor/post_processors/attributes.rs | 2 +- crates/tycho-indexer/src/extractor/protocol_extractor.rs | 2 +- crates/tycho-simulation/src/evm/pending.rs | 2 +- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/tycho-common/src/models/protocol.rs b/crates/tycho-common/src/models/protocol.rs index c626484387..528af14c81 100644 --- a/crates/tycho-common/src/models/protocol.rs +++ b/crates/tycho-common/src/models/protocol.rs @@ -165,10 +165,8 @@ pub struct ProtocolComponentStateDelta { pub updated_attributes: HashMap, pub deleted_attributes: HashSet, /// Attribute names first introduced with `ChangeType::Creation` in this delta. - /// - /// Used during chain reorgs: a creation attribute has no prior state, so reverting it means - /// emitting a deletion rather than restoring a previous value. Skipped during serialization - /// — this is an indexer-internal reorg hint, not part of the public delta format. + /// Skipped during serialization — this is an indexer-internal hint, not part of the public + /// delta format. 
#[serde(skip)] pub created_attributes: HashSet, } diff --git a/crates/tycho-indexer/src/extractor/post_processors/attributes.rs b/crates/tycho-indexer/src/extractor/post_processors/attributes.rs index b804f1a76c..a7824a8173 100644 --- a/crates/tycho-indexer/src/extractor/post_processors/attributes.rs +++ b/crates/tycho-indexer/src/extractor/post_processors/attributes.rs @@ -35,7 +35,7 @@ pub fn add_default_attributes(mut changes: BlockChanges, attributes: &[&str]) -> component_id: c_id.clone(), updated_attributes: default_attr, deleted_attributes: HashSet::new(), - ..Default::default() + created_attributes: HashSet::new(), }, ); } diff --git a/crates/tycho-indexer/src/extractor/protocol_extractor.rs b/crates/tycho-indexer/src/extractor/protocol_extractor.rs index 221084525b..86b1f02e88 100644 --- a/crates/tycho-indexer/src/extractor/protocol_extractor.rs +++ b/crates/tycho-indexer/src/extractor/protocol_extractor.rs @@ -1450,7 +1450,7 @@ where .get(&c_id) .unwrap_or(&empty) .clone(), - ..Default::default() + created_attributes: HashSet::new(), }) .updated_attributes .insert(key.clone(), value); diff --git a/crates/tycho-simulation/src/evm/pending.rs b/crates/tycho-simulation/src/evm/pending.rs index ab51e3d26e..25b9cb3278 100644 --- a/crates/tycho-simulation/src/evm/pending.rs +++ b/crates/tycho-simulation/src/evm/pending.rs @@ -256,7 +256,7 @@ fn snapshot_to_block_changes( component_id: id.clone(), updated_attributes: comp_with_state.state.attributes.clone(), deleted_attributes: HashSet::new(), - ..Default::default() + created_attributes: HashSet::new(), }, ); From 19be3821fb8520bba3518bc67f48fee19a234e23 Mon Sep 17 00:00:00 2001 From: zizou <111426680+zizou0x@users.noreply.github.com> Date: Tue, 9 Jun 2026 10:48:51 +0200 Subject: [PATCH 3/4] docs: add docstring for ExtractorGateway trait --- .../src/extractor/protocol_extractor.rs | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git 
a/crates/tycho-indexer/src/extractor/protocol_extractor.rs b/crates/tycho-indexer/src/extractor/protocol_extractor.rs index 86b1f02e88..ea60ee69d4 100644 --- a/crates/tycho-indexer/src/extractor/protocol_extractor.rs +++ b/crates/tycho-indexer/src/extractor/protocol_extractor.rs @@ -1595,10 +1595,33 @@ pub struct ExtractorPgGateway { #[automock] #[async_trait] pub trait ExtractorGateway: Send + Sync { + /// Returns the last persisted Substreams cursor together with the hash of the block it was + /// saved at. + /// + /// # Errors + /// Returns [`StorageError::NotFound`] when no cursor has ever been persisted for this + /// extractor, i.e. on the very first run before any block has been processed. Callers use + /// this to distinguish a fresh extractor from a resumed one. async fn get_cursor(&self) -> Result<(Vec, Bytes), StorageError>; - async fn ensure_protocol_types(&self, new_protocol_types: &[ProtocolType]); + /// Idempotently registers `new_protocol_types`, inserting any that do not yet exist and + /// leaving already-present types untouched. + /// + /// This call returns no result: a failure to write is treated as unrecoverable (the extractor + /// cannot operate without its protocol types) and therefore panics rather than surfacing an + /// error. + async fn ensure_protocol_types(&self, new_protocol_types: &[ProtocolType]); //TODO make it not panic, it should return an error instead. + /// Persists every change in `changes` for a single block and records `new_cursor` as the + /// latest processed position. + /// + /// All writes for the block are staged within one transaction. When `force_commit` is `true` + /// the transaction is flushed to the store immediately; otherwise it is only flushed once + /// enough blocks have accumulated to reach the configured batch size. + /// + /// # Errors + /// Returns a [`StorageError`] if any staged write or the commit fails. On error the block's + /// changes are not committed. 
async fn advance( &self, changes: &BlockChanges, @@ -1606,20 +1629,61 @@ pub trait ExtractorGateway: Send + Sync { force_commit: bool, ) -> Result<(), StorageError>; + /// Returns the current state of the protocol components identified by `component_ids`. + /// + /// Only components that exist in the store are returned: unknown ids are silently omitted + /// rather than producing an error, so the result may be shorter than `component_ids` (and + /// empty if none are found). + /// + /// # Errors + /// Returns a [`StorageError`] only on an underlying store failure, never for missing ids. async fn get_protocol_states<'a>( &self, component_ids: &[&'a str], ) -> Result, StorageError>; + /// Returns the contracts at the given `addresses`, including their storage slots. + /// + /// Only addresses that exist in the store are returned: unknown addresses are silently + /// omitted rather than producing an error, so the result may be shorter than `addresses` + /// (and empty if none are found). + /// + /// # Errors + /// Returns a [`StorageError`] only on an underlying store failure, never for missing + /// addresses. async fn get_contracts(&self, addresses: &[Address]) -> Result, StorageError>; + /// Returns component balances keyed by component id and then by token address. + /// + /// Only components that have stored balances appear in the map: unknown or balance-less + /// `component_ids` are simply absent rather than producing an error, so the map may have + /// fewer keys than `component_ids` (and be empty if none are found). + /// + /// # Errors + /// Returns a [`StorageError`] only on an underlying store failure, never for missing ids. async fn get_components_balances<'a>( &self, component_ids: &[&'a str], ) -> Result>, StorageError>; + /// Returns the block identified by the given hash. + /// + /// Note: despite the `block_number` parameter name, the value is interpreted as a block + /// **hash**, not a number. 
+ /// + /// # Errors + /// Returns [`StorageError::NotFound`] when no block with that hash exists in the store. Unlike + /// the collection getters, a missing block is an error because a single value is expected. async fn get_block(&self, block_number: Bytes) -> Result; + /// Returns account balances keyed by account address and then by token address. + /// + /// Only accounts that have stored balances appear in the map: unknown or balance-less + /// accounts are simply absent rather than producing an error, so the map may have fewer keys + /// than `accounts` (and be empty if none are found). + /// + /// # Errors + /// Returns a [`StorageError`] only on an underlying store failure, never for missing accounts. async fn get_account_balances( &self, accounts: &[Address], From f35d6b86095ec056cca56c31693bd54abc71f9bf Mon Sep 17 00:00:00 2001 From: zizou <111426680+zizou0x@users.noreply.github.com> Date: Tue, 9 Jun 2026 16:42:44 +0200 Subject: [PATCH 4/4] refactor: polish Extractor trait Removes a panic and fixes wrong argument naming. --- crates/tycho-indexer/src/extractor/mod.rs | 5 +- .../src/extractor/protocol_extractor.rs | 123 ++++++++++-------- 2 files changed, 70 insertions(+), 58 deletions(-) diff --git a/crates/tycho-indexer/src/extractor/mod.rs b/crates/tycho-indexer/src/extractor/mod.rs index dad1c82f8b..660b32a36c 100644 --- a/crates/tycho-indexer/src/extractor/mod.rs +++ b/crates/tycho-indexer/src/extractor/mod.rs @@ -93,7 +93,10 @@ pub trait Extractor: Send + Sync { /// Ensures all protocol types this extractor needs are registered in /// storage. Safe to call multiple times. - async fn ensure_protocol_types(&self); + /// + /// # Errors + /// Returns an [`ExtractionError`] if the protocol types could not be persisted. + async fn ensure_protocol_types(&self) -> Result<(), ExtractionError>; /// Returns the current stream cursor, or an empty string if no block has /// been processed yet.
At startup this reflects the last persisted cursor; diff --git a/crates/tycho-indexer/src/extractor/protocol_extractor.rs b/crates/tycho-indexer/src/extractor/protocol_extractor.rs index ea60ee69d4..a9a2d97549 100644 --- a/crates/tycho-indexer/src/extractor/protocol_extractor.rs +++ b/crates/tycho-indexer/src/extractor/protocol_extractor.rs @@ -196,7 +196,7 @@ where Err(err) => return Err(ExtractionError::Setup(err.to_string())), }; - res.ensure_protocol_types().await; + res.ensure_protocol_types().await?; Ok(res) } @@ -867,7 +867,7 @@ where } /// Make sure that the protocol types are present in the database. - async fn ensure_protocol_types(&self) { + async fn ensure_protocol_types(&self) -> Result<(), ExtractionError> { let protocol_types: Vec = self .protocol_types .values() @@ -876,7 +876,8 @@ where self.gateway .inner .ensure_protocol_types(&protocol_types) - .await; + .await?; + Ok(()) } async fn get_cursor(&self) -> String { @@ -1337,7 +1338,9 @@ where }) }) .fold(HashMap::new(), |mut acc, (c_id, attr)| { - acc.entry(c_id).or_default().insert(attr); + acc.entry(c_id) + .or_default() + .insert(attr); acc }); @@ -1404,21 +1407,20 @@ where .await .map_err(ExtractionError::Storage)?; - let missing_components_states_map: Vec<(ProtocolComponentState, Vec)> = - missing_map - .into_iter() - .map(|(component_id, keys)| { - let state = missing_components_states - .iter() - .find(|comp| comp.component_id == component_id) - .cloned() - .ok_or(ExtractionError::Storage(StorageError::NotFound( - "Component".to_owned(), - component_id.to_string(), - )))?; - Ok((state, keys)) - }) - .collect::, ExtractionError>>()?; + let missing_components_states_map: Vec<(ProtocolComponentState, Vec)> = missing_map + .into_iter() + .map(|(component_id, keys)| { + let state = missing_components_states + .iter() + .find(|comp| comp.component_id == component_id) + .cloned() + .ok_or(ExtractionError::Storage(StorageError::NotFound( + "Component".to_owned(), + component_id.to_string(), + )))?; 
+ Ok((state, keys)) + }) + .collect::, ExtractionError>>()?; let mut not_found: HashMap<_, HashSet<_>> = HashMap::new(); let mut db_states: HashMap<(String, String), Bytes> = HashMap::new(); @@ -1461,14 +1463,18 @@ where for (c_id, deleted_keys) in &not_found { state_deltas .entry(c_id.clone()) - .or_insert_with(|| ProtocolComponentStateDelta::new(c_id, HashMap::new(), deleted_keys.clone())); + .or_insert_with(|| { + ProtocolComponentStateDelta::new(c_id, HashMap::new(), deleted_keys.clone()) + }); } // Revert ChangeType::Creation attributes by emitting deletions — they had no prior state. for (c_id, created_keys) in reverted_created_attrs { state_deltas .entry(c_id.clone()) - .or_insert_with(|| ProtocolComponentStateDelta::new(&c_id, HashMap::new(), HashSet::new())) + .or_insert_with(|| { + ProtocolComponentStateDelta::new(&c_id, HashMap::new(), HashSet::new()) + }) .deleted_attributes .extend(created_keys); } @@ -1607,10 +1613,12 @@ pub trait ExtractorGateway: Send + Sync { /// Idempotently registers `new_protocol_types`, inserting any that do not yet exist and /// leaving already-present types untouched. /// - /// This call returns no result: a failure to write is treated as unrecoverable (the extractor - /// cannot operate without its protocol types) and therefore panics rather than surfacing an - /// error. - async fn ensure_protocol_types(&self, new_protocol_types: &[ProtocolType]); //TODO make it not panic, it should return an error instead. + /// # Errors + /// Returns a [`StorageError`] if the protocol types could not be persisted. + async fn ensure_protocol_types( + &self, + new_protocol_types: &[ProtocolType], + ) -> Result<(), StorageError>; /// Persists every change in `changes` for a single block and records `new_cursor` as the /// latest processed position. @@ -1668,13 +1676,10 @@ pub trait ExtractorGateway: Send + Sync { /// Returns the block identified by the given hash.
/// - /// Note: despite the `block_number` parameter name, the value is interpreted as a block - /// **hash**, not a number. - /// /// # Errors /// Returns [`StorageError::NotFound`] when no block with that hash exists in the store. Unlike /// the collection getters, a missing block is an error because a single value is expected. - async fn get_block(&self, block_number: Bytes) -> Result; + async fn get_block(&self, block_hash: Bytes) -> Result; /// Returns account balances keyed by account address and then by token address. /// @@ -1743,11 +1748,13 @@ impl ExtractorGateway for ExtractorPgGateway { } } - async fn ensure_protocol_types(&self, new_protocol_types: &[ProtocolType]) { + async fn ensure_protocol_types( + &self, + new_protocol_types: &[ProtocolType], + ) -> Result<(), StorageError> { self.state_gateway .add_protocol_types(new_protocol_types) .await - .expect("Couldn't insert protocol types"); } async fn advance( @@ -2090,7 +2097,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2109,7 +2116,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2159,7 +2166,7 @@ mod test { gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2306,7 +2313,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2358,7 +2365,7 @@ mod test { let mut gw = MockExtractorGateway::new(); 
gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2391,7 +2398,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2624,7 +2631,7 @@ mod test { extractor_gw .expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); extractor_gw .expect_get_cursor() .times(1) @@ -2756,7 +2763,7 @@ mod test { extractor_gw .expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); extractor_gw .expect_get_cursor() .times(1) @@ -2873,7 +2880,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -2989,7 +2996,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -3017,7 +3024,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -3061,7 +3068,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -3172,7 +3179,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); 
gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -3298,7 +3305,7 @@ mod test { let mut gw = MockExtractorGateway::new(); gw.expect_ensure_protocol_types() .times(1) - .returning(|_| ()); + .returning(|_| Ok(())); gw.expect_get_cursor() .times(1) .returning(|| Ok(("cursor".into(), Bytes::default()))); @@ -3323,10 +3330,7 @@ mod test { // Block 1: empty anchor. extractor .handle_tick_scoped_data(pb_fixtures::pb_block_scoped_data( - PbBlockChanges { - block: Some(pb_fixtures::pb_blocks(1)), - ..Default::default() - }, + PbBlockChanges { block: Some(pb_fixtures::pb_blocks(1)), ..Default::default() }, Some("cursor@1"), Some(1), )) @@ -3355,12 +3359,16 @@ mod test { attributes: vec![ Attribute { name: "sqrt_price_x96".to_string(), - value: Bytes::from(1000_u64).lpad(32, 0).to_vec(), + value: Bytes::from(1000_u64) + .lpad(32, 0) + .to_vec(), change: PbChangeType::Creation.into(), }, Attribute { name: "tick".to_string(), - value: Bytes::from(100_u64).lpad(32, 0).to_vec(), + value: Bytes::from(100_u64) + .lpad(32, 0) + .to_vec(), change: PbChangeType::Creation.into(), }, ], @@ -3386,7 +3394,9 @@ mod test { component_id: "pool_x".to_string(), attributes: vec![Attribute { name: "ticks/100/net-liquidity".to_string(), - value: Bytes::from(5000_u64).lpad(32, 0).to_vec(), + value: Bytes::from(5000_u64) + .lpad(32, 0) + .to_vec(), change: PbChangeType::Creation.into(), }], }], @@ -3408,10 +3418,7 @@ mod test { // Revert to block 2 — partial block 3 is reverted. 
let revert_msg = extractor .handle_revert(BlockUndoSignal { - last_valid_block: Some(BlockRef { - id: format!("0x{:0>64x}", 2_u64), - number: 2, - }), + last_valid_block: Some(BlockRef { id: format!("0x{:0>64x}", 2_u64), number: 2 }), last_valid_cursor: "cursor@2".into(), }) .await @@ -3433,7 +3440,9 @@ mod test { pool_x_delta.deleted_attributes ); assert!( - pool_x_delta.updated_attributes.is_empty(), + pool_x_delta + .updated_attributes + .is_empty(), "Expected no updated_attributes for pool_x, got: {:?}", pool_x_delta.updated_attributes );