Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/tycho-common/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,7 @@ mod test {
("attr_1".to_string(), Bytes::from("0x00000000000003e8")),
]),
deleted_attributes: HashSet::new(),
..Default::default()
}),
]),
new_protocol_components: HashMap::from([
Expand Down
19 changes: 17 additions & 2 deletions crates/tycho-common/src/models/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,16 @@ impl ProtocolComponentState {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, DeepSizeOf)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, DeepSizeOf)]
pub struct ProtocolComponentStateDelta {
pub component_id: ComponentId,
pub updated_attributes: HashMap<AttrStoreKey, StoreVal>,
pub deleted_attributes: HashSet<AttrStoreKey>,
/// Attribute names first introduced with `ChangeType::Creation` in this delta.
/// Skipped during serialization — this is an indexer-internal hint, not part of the public
/// delta format.
#[serde(skip)]
pub created_attributes: HashSet<AttrStoreKey>,
}

impl ProtocolComponentStateDelta {
Expand All @@ -172,7 +177,12 @@ impl ProtocolComponentStateDelta {
updated_attributes: HashMap<AttrStoreKey, StoreVal>,
deleted_attributes: HashSet<AttrStoreKey>,
) -> Self {
Self { component_id: component_id.to_string(), updated_attributes, deleted_attributes }
Self {
component_id: component_id.to_string(),
updated_attributes,
deleted_attributes,
created_attributes: HashSet::new(),
}
}

/// Merges this update with another one.
Expand All @@ -196,6 +206,7 @@ impl ProtocolComponentStateDelta {
}
for attr in &other.deleted_attributes {
self.updated_attributes.remove(attr);
self.created_attributes.remove(attr);
}
for attr in other.updated_attributes.keys() {
self.deleted_attributes.remove(attr);
Expand All @@ -204,6 +215,8 @@ impl ProtocolComponentStateDelta {
.extend(other.updated_attributes);
self.deleted_attributes
.extend(other.deleted_attributes);
self.created_attributes
.extend(other.created_attributes);
Ok(())
}
}
Expand Down Expand Up @@ -273,6 +286,7 @@ impl From<dto::ProtocolStateDelta> for ProtocolComponentStateDelta {
component_id: value.component_id,
updated_attributes: value.updated_attributes,
deleted_attributes: value.deleted_attributes,
created_attributes: HashSet::new(),
}
}
}
Expand Down Expand Up @@ -332,6 +346,7 @@ mod test {
component_id: id,
updated_attributes: attributes1,
deleted_attributes: HashSet::new(),
..Default::default()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3582,6 +3582,7 @@ mod tests {
Bytes::from(1u64).lpad(32, 0),
)]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
..Default::default()
Expand Down Expand Up @@ -3731,6 +3732,7 @@ mod tests {
Bytes::from(1u64).lpad(32, 0),
)]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
..Default::default()
Expand Down Expand Up @@ -3854,6 +3856,7 @@ mod tests {
Bytes::from(version as u64).lpad(32, 0),
)]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
..Default::default()
Expand Down
5 changes: 4 additions & 1 deletion crates/tycho-indexer/src/extractor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ pub trait Extractor: Send + Sync {

/// Ensures all protocol types this extractor needs are registered in
/// storage. Safe to call multiple times.
async fn ensure_protocol_types(&self);
///
/// # Errors
/// Returns an [`ExtractionError`] if the protocol types could not be persisted.
async fn ensure_protocol_types(&self) -> Result<(), ExtractionError>;

/// Returns the current stream cursor, or an empty string if no block has
/// been processed yet. At startup this reflects the last persisted cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn add_default_attributes(mut changes: BlockChanges, attributes: &[&str]) ->
component_id: c_id.clone(),
updated_attributes: default_attr,
deleted_attributes: HashSet::new(),
created_attributes: HashSet::new(),
},
);
}
Expand Down Expand Up @@ -131,6 +132,7 @@ mod test {
Bytes::from(1_u64.to_be_bytes()),
)]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
protocol_components: HashMap::from([(
Expand Down Expand Up @@ -175,6 +177,7 @@ mod test {
("liquidity".to_string(), Bytes::zero(32)),
]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
protocol_components: changes
Expand Down Expand Up @@ -225,6 +228,7 @@ mod test {
Bytes::from(1_u64.to_be_bytes()),
)]),
deleted_attributes: HashSet::new(),
..Default::default()
},
)]),
..Default::default()
Expand Down
10 changes: 8 additions & 2 deletions crates/tycho-indexer/src/extractor/protobuf_deserialisation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,16 @@ impl TryFromMessage for ProtocolComponentStateDelta {
fn try_from_message(args: Self::Args<'_>) -> Result<Self, ExtractionError> {
let msg = args;

let (mut updates, mut deletions) = (HashMap::new(), HashSet::new());
let (mut updates, mut deletions, mut created) =
(HashMap::new(), HashSet::new(), HashSet::new());

for attribute in msg.attributes.into_iter() {
match ChangeType::try_from_message(attribute.change())? {
ChangeType::Update | ChangeType::Creation => {
ChangeType::Creation => {
created.insert(attribute.name.clone());
updates.insert(attribute.name, Bytes::from(attribute.value));
}
ChangeType::Update => {
updates.insert(attribute.name, Bytes::from(attribute.value));
}
ChangeType::Deletion => {
Expand All @@ -233,6 +238,7 @@ impl TryFromMessage for ProtocolComponentStateDelta {
component_id: msg.component_id,
updated_attributes: updates,
deleted_attributes: deletions,
created_attributes: created,
})
}
}
Expand Down
Loading
Loading