Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub struct WorkInfo {
pub work_units: U256,
}

impl WorkInfo {
pub fn is_valid(&self) -> bool {
self.node_id != Address::ZERO
&& self.work_units != U256::ZERO
&& self.provider != Address::ZERO
}
}

impl<P: alloy_provider::Provider> SyntheticDataWorkValidator<P> {
pub fn new(address: Address, provider: P, abi_file_path: &str) -> Self {
let instance = Contract::new(address, provider, abi_file_path);
Expand Down
47 changes: 40 additions & 7 deletions crates/validator/src/validators/synthetic_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ impl SyntheticDataValidator<WalletProvider> {
let validation_info = self.get_work_validation_info_from_redis(work_key).await?;
Ok(validation_info.map(|info| info.status))
}

#[cfg(test)]
async fn get_work_validation_info_from_redis(
&self,
Expand Down Expand Up @@ -483,12 +484,21 @@ impl SyntheticDataValidator<WalletProvider> {
.get(&key)
.await
.map_err(|e| Error::msg(format!("Failed to get work info: {}", e)))?;
work_info
.map(|work_info| {
serde_json::from_str(&work_info)
.map_err(|e| Error::msg(format!("Failed to parse work info: {}", e)))
})
.transpose()

match work_info {
Some(work_info_str) => {
let work_info: WorkInfo = serde_json::from_str(&work_info_str)
.map_err(|e| Error::msg(format!("Failed to parse work info: {}", e)))?;

// Check if work_info has invalid values
if !work_info.is_valid() {
Ok(None)
} else {
Ok(Some(work_info))
}
}
None => Ok(None),
}
}

async fn get_file_name_for_work_key(
Expand Down Expand Up @@ -896,7 +906,15 @@ impl SyntheticDataValidator<WalletProvider> {
let pool_id = self.pool_id;
async move {
match validator.get_work_info(pool_id, &work_key).await {
Ok(work_info) => Some((work_key, work_info)),
Ok(work_info) => {
match !work_info.is_valid() {
true => {
error!("Got zero/default work info for key {}, likely not yet in contract", work_key);
None
}
false => Some((work_key, work_info)),
Comment thread
JannikSt marked this conversation as resolved.
}
}
Err(e) => {
error!("Failed to get work info for {}: {}", work_key, e);
None
Expand Down Expand Up @@ -1247,6 +1265,7 @@ impl SyntheticDataValidator<WalletProvider> {
}

pub async fn process_group_status_check(&self, group: ToplocGroup) -> Result<(), Error> {
println!("processing group status check: {:?}", group);
let toploc_config = self
.find_matching_toploc_config(&group.prefix)
.ok_or(Error::msg(format!(
Expand All @@ -1258,6 +1277,8 @@ impl SyntheticDataValidator<WalletProvider> {
.get_group_file_validation_status(&group.group_file_name)
.await?;
let toploc_config_name = toploc_config.name();
println!("toploc config name: {:?}", toploc_config_name);
println!("status: {:?}", status);

if let Some(metrics) = &self.metrics {
metrics.record_group_validation_status(
Expand All @@ -1282,6 +1303,9 @@ impl SyntheticDataValidator<WalletProvider> {

let mut toploc_nodes_to_invalidate = Vec::new();
let mut nodes_with_wrong_work_unit_claims = Vec::new();
println!("node_work_units: {:?}", node_work_units);
println!("total_claimed_units: {:?}", total_claimed_units);
println!("toploc status: {:?}", status.status);

if status.status == ValidationResult::Reject {
let rejected_nodes = self.handle_group_toploc_rejection(&group, &status).await?;
Expand All @@ -1299,6 +1323,15 @@ impl SyntheticDataValidator<WalletProvider> {
nodes_with_wrong_work_unit_claims.extend(wrong_claim_nodes);
}

println!(
"toploc_nodes_to_invalidate: {:?}",
toploc_nodes_to_invalidate
);
println!(
"nodes_with_wrong_work_unit_claims: {:?}",
nodes_with_wrong_work_unit_claims
);

for work_key in &group.sorted_work_keys {
if let Some(work_info) = self.get_work_info_from_redis(work_key).await? {
let node_id_str = work_info.node_id.to_string();
Expand Down
24 changes: 18 additions & 6 deletions crates/validator/src/validators/synthetic_data/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ async fn test_build_validation_plan() -> Result<(), Error> {
];

let work_info = WorkInfo {
node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
node_id: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
work_units: U256::from(1000),
..Default::default()
};
for work_key in work_keys.clone() {
Expand Down Expand Up @@ -337,7 +339,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> {

const FILE_SHA: &str = "c257e3d3fe866a00df1285f8bbbe601fed6b85229d983bbbb75e19a068346641";
const GROUP_ID: &str = "3450756714426841564";
const NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000000";
const NODE_ADDRESS: &str = "0xA1DDe6E4d2F127960e7C61f90a8b354Bc306bd2a";

let mock_storage = MockStorageProvider::new();
mock_storage
Expand Down Expand Up @@ -372,7 +374,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> {
format!("/statusgroup/dataset/samplingn-{}-1-0.parquet", GROUP_ID).as_str(),
)
.with_status(200)
.with_body(r#"{"status": "accept"}"#)
.with_body(r#"{"status": "accept", "input_flops": 1, "output_flops": 1000}"#)
.create();

let storage_provider = Arc::new(mock_storage);
Expand Down Expand Up @@ -403,6 +405,8 @@ async fn test_group_e2e_accept() -> Result<(), Error> {

let work_info = WorkInfo {
node_id: Address::from_str(NODE_ADDRESS).unwrap(),
work_units: U256::from(1000),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
..Default::default()
};
for work_key in work_keys.clone() {
Expand Down Expand Up @@ -475,12 +479,12 @@ async fn test_group_e2e_work_unit_mismatch() -> Result<(), Error> {
..Default::default()
};

const HONEST_NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000001";
const HONEST_NODE_ADDRESS: &str = "0x182555b0Ab39EE313f22c07dbe88D950385b1f69";
const HONEST_FILE_SHA: &str =
"c257e3d3fe866a00df1285f8bbbe601fed6b85229d983bbbb75e19a068346641";
const EXCESSIVE_FILE_SHA: &str =
"88e4672c19e5a10bff2e23d223f8bfc38ae1425feaa18db9480e631a4fd98edf";
const EXCESSIVE_NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000002";
const EXCESSIVE_NODE_ADDRESS: &str = "0x182555b0Ab39EE313f22c07dbe88D950385b1f68";
const GROUP_ID: &str = "3456714426841564";

let mock_storage = MockStorageProvider::new();
Expand Down Expand Up @@ -562,11 +566,13 @@ async fn test_group_e2e_work_unit_mismatch() -> Result<(), Error> {
let work_info_1 = WorkInfo {
node_id: Address::from_str(HONEST_NODE_ADDRESS).unwrap(),
work_units: U256::from(EXPECTED_WORK_UNITS),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
..Default::default()
};
let work_info_2 = WorkInfo {
node_id: Address::from_str(EXCESSIVE_NODE_ADDRESS).unwrap(),
work_units: U256::from(EXCESSIVE_WORK_UNITS),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f67").unwrap(),
..Default::default()
};

Expand Down Expand Up @@ -781,6 +787,8 @@ async fn test_incomplete_group_recovery() -> Result<(), Error> {
// Add work info for only the first file (making the group incomplete)
let work_info = WorkInfo {
node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
work_units: U256::from(1000),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
..Default::default()
};
validator
Expand Down Expand Up @@ -880,6 +888,8 @@ async fn test_expired_incomplete_group_soft_invalidation() -> Result<(), Error>
// Add work info for only the first file (making the group incomplete)
let work_info = WorkInfo {
node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
work_units: U256::from(1000),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
..Default::default()
};
validator
Expand Down Expand Up @@ -981,7 +991,9 @@ async fn test_incomplete_group_status_tracking() -> Result<(), Error> {

// Add work info for only 1 of 3 expected files
let work_info = WorkInfo {
node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
node_id: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
work_units: U256::from(1000),
provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(),
..Default::default()
};
validator
Expand Down