From 1eec9cdedaa6369d1f6d2c601aa343e1999cc900 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 24 Jun 2025 16:27:58 +1000 Subject: [PATCH 1/3] check if work info returned by contract is zero --- .../src/validators/synthetic_data/mod.rs | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/crates/validator/src/validators/synthetic_data/mod.rs b/crates/validator/src/validators/synthetic_data/mod.rs index bceab470..c5539858 100644 --- a/crates/validator/src/validators/synthetic_data/mod.rs +++ b/crates/validator/src/validators/synthetic_data/mod.rs @@ -1,7 +1,7 @@ use crate::metrics::MetricsContext; use crate::store::redis::RedisStore; use crate::validators::synthetic_data::types::{InvalidationType, RejectionInfo}; -use alloy::primitives::U256; +use alloy::primitives::{Address, U256}; use anyhow::{Context as _, Error, Result}; use chrono; use futures::future; @@ -483,12 +483,21 @@ impl SyntheticDataValidator { .get(&key) .await .map_err(|e| Error::msg(format!("Failed to get work info: {}", e)))?; - work_info - .map(|work_info| { - serde_json::from_str(&work_info) - .map_err(|e| Error::msg(format!("Failed to parse work info: {}", e))) - }) - .transpose() + + match work_info { + Some(work_info_str) => { + let work_info: WorkInfo = serde_json::from_str(&work_info_str) + .map_err(|e| Error::msg(format!("Failed to parse work info: {}", e)))?; + + // Check if work_info has invalid values + if work_info.node_id == Address::ZERO || work_info.work_units == U256::ZERO { + Ok(None) + } else { + Ok(Some(work_info)) + } + } + None => Ok(None), + } } async fn get_file_name_for_work_key( @@ -896,7 +905,14 @@ impl SyntheticDataValidator { let pool_id = self.pool_id; async move { match validator.get_work_info(pool_id, &work_key).await { - Ok(work_info) => Some((work_key, work_info)), + Ok(work_info) => { + if work_info.node_id == Address::ZERO || work_info.work_units == U256::ZERO { + error!("Got zero/default work info for key {}, likely not yet in contract", work_key); + None + } else { + Some((work_key, work_info)) + } + } Err(e) => { error!("Failed to get work info for {}: {}", work_key, e); None From aeff76d73df61c54da7911468879684a9b6f73c3 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 24 Jun 2025 16:55:26 +1000 Subject: [PATCH 2/3] fix test --- .../synthetic_data_validator.rs | 8 +++++ .../src/validators/synthetic_data/mod.rs | 35 ++++++++++++++----- .../validators/synthetic_data/tests/mod.rs | 9 +++-- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/crates/shared/src/web3/contracts/implementations/work_validators/synthetic_data_validator.rs b/crates/shared/src/web3/contracts/implementations/work_validators/synthetic_data_validator.rs index 4a76d6a2..a3a2d36f 100644 --- a/crates/shared/src/web3/contracts/implementations/work_validators/synthetic_data_validator.rs +++ b/crates/shared/src/web3/contracts/implementations/work_validators/synthetic_data_validator.rs @@ -20,6 +20,14 @@ pub struct WorkInfo { pub work_units: U256, } +impl WorkInfo { + pub fn is_valid(&self) -> bool { + self.node_id != Address::ZERO + && self.work_units != U256::ZERO + && self.provider != Address::ZERO + } +} + impl SyntheticDataWorkValidator

{ pub fn new(address: Address, provider: P, abi_file_path: &str) -> Self { let instance = Contract::new(address, provider, abi_file_path); diff --git a/crates/validator/src/validators/synthetic_data/mod.rs b/crates/validator/src/validators/synthetic_data/mod.rs index c5539858..3e879165 100644 --- a/crates/validator/src/validators/synthetic_data/mod.rs +++ b/crates/validator/src/validators/synthetic_data/mod.rs @@ -1,7 +1,7 @@ use crate::metrics::MetricsContext; use crate::store::redis::RedisStore; use crate::validators::synthetic_data::types::{InvalidationType, RejectionInfo}; -use alloy::primitives::{Address, U256}; +use alloy::primitives::U256; use anyhow::{Context as _, Error, Result}; use chrono; use futures::future; @@ -414,6 +414,7 @@ impl SyntheticDataValidator { let validation_info = self.get_work_validation_info_from_redis(work_key).await?; Ok(validation_info.map(|info| info.status)) } + #[cfg(test)] async fn get_work_validation_info_from_redis( &self, @@ -483,14 +484,14 @@ impl SyntheticDataValidator { .get(&key) .await .map_err(|e| Error::msg(format!("Failed to get work info: {}", e)))?; - + match work_info { Some(work_info_str) => { let work_info: WorkInfo = serde_json::from_str(&work_info_str) .map_err(|e| Error::msg(format!("Failed to parse work info: {}", e)))?; - + // Check if work_info has invalid values - if work_info.node_id == Address::ZERO || work_info.work_units == U256::ZERO { + if !work_info.is_valid() { Ok(None) } else { Ok(Some(work_info)) @@ -906,11 +907,12 @@ impl SyntheticDataValidator { async move { match validator.get_work_info(pool_id, &work_key).await { Ok(work_info) => { - if work_info.node_id == Address::ZERO || work_info.work_units == U256::ZERO { - error!("Got zero/default work info for key {}, likely not yet in contract", work_key); - None - } else { - Some((work_key, work_info)) + match !work_info.is_valid() { + true => { + error!("Got zero/default work info for key {}, likely not yet in contract", work_key); + None + } + false => Some((work_key, work_info)), } } Err(e) => { @@ -1263,6 +1265,7 @@ impl SyntheticDataValidator { } pub async fn process_group_status_check(&self, group: ToplocGroup) -> Result<(), Error> { + println!("processing group status check: {:?}", group); let toploc_config = self .find_matching_toploc_config(&group.prefix) .ok_or(Error::msg(format!( @@ -1274,6 +1277,8 @@ impl SyntheticDataValidator { .get_group_file_validation_status(&group.group_file_name) .await?; let toploc_config_name = toploc_config.name(); + println!("toploc config name: {:?}", toploc_config_name); + println!("status: {:?}", status); if let Some(metrics) = &self.metrics { metrics.record_group_validation_status( @@ -1298,6 +1303,9 @@ impl SyntheticDataValidator { let mut toploc_nodes_to_invalidate = Vec::new(); let mut nodes_with_wrong_work_unit_claims = Vec::new(); + println!("node_work_units: {:?}", node_work_units); + println!("total_claimed_units: {:?}", total_claimed_units); + println!("toploc status: {:?}", status.status); if status.status == ValidationResult::Reject { let rejected_nodes = self.handle_group_toploc_rejection(&group, &status).await?; @@ -1315,6 +1323,15 @@ impl SyntheticDataValidator { nodes_with_wrong_work_unit_claims.extend(wrong_claim_nodes); } + println!( + "toploc_nodes_to_invalidate: {:?}", + toploc_nodes_to_invalidate + ); + println!( + "nodes_with_wrong_work_unit_claims: {:?}", + nodes_with_wrong_work_unit_claims + ); + for work_key in &group.sorted_work_keys { if let Some(work_info) = self.get_work_info_from_redis(work_key).await? { let node_id_str = work_info.node_id.to_string(); diff --git a/crates/validator/src/validators/synthetic_data/tests/mod.rs b/crates/validator/src/validators/synthetic_data/tests/mod.rs index 1de68e08..b4c5efbc 100644 --- a/crates/validator/src/validators/synthetic_data/tests/mod.rs +++ b/crates/validator/src/validators/synthetic_data/tests/mod.rs @@ -136,6 +136,7 @@ async fn test_build_validation_plan() -> Result<(), Error> { let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + work_units: U256::from(1000), ..Default::default() }; for work_key in work_keys.clone() { @@ -337,7 +338,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> { const FILE_SHA: &str = "c257e3d3fe866a00df1285f8bbbe601fed6b85229d983bbbb75e19a068346641"; const GROUP_ID: &str = "3450756714426841564"; - const NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000000"; + const NODE_ADDRESS: &str = "0xA1DDe6E4d2F127960e7C61f90a8b354Bc306bd2a"; let mock_storage = MockStorageProvider::new(); mock_storage @@ -372,7 +373,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> { format!("/statusgroup/dataset/samplingn-{}-1-0.parquet", GROUP_ID).as_str(), ) .with_status(200) - .with_body(r#"{"status": "accept"}"#) + .with_body(r#"{"status": "accept", "input_flops": 1, "output_flops": 1000}"#) .create(); let storage_provider = Arc::new(mock_storage); @@ -403,6 +404,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> { let work_info = WorkInfo { node_id: Address::from_str(NODE_ADDRESS).unwrap(), + work_units: U256::from(1000), ..Default::default() }; for work_key in work_keys.clone() { @@ -781,6 +783,7 @@ async fn test_incomplete_group_recovery() -> Result<(), Error> { // Add work info for only the first file (making the group incomplete) let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + work_units: U256::from(1000), ..Default::default() }; validator @@ -880,6 +883,7 @@ async fn test_expired_incomplete_group_soft_invalidation() -> Result<(), Error> // Add work info for only the first file (making the group incomplete) let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + work_units: U256::from(1000), ..Default::default() }; validator @@ -982,6 +986,7 @@ async fn test_incomplete_group_status_tracking() -> Result<(), Error> { // Add work info for only 1 of 3 expected files let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + work_units: U256::from(1000), ..Default::default() }; validator From 847f78a457f69f2b47362f78f65486bd62f351b2 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 24 Jun 2025 17:16:57 +1000 Subject: [PATCH 3/3] adjust work info samples --- .../src/validators/synthetic_data/tests/mod.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/validator/src/validators/synthetic_data/tests/mod.rs b/crates/validator/src/validators/synthetic_data/tests/mod.rs index b4c5efbc..806eaec5 100644 --- a/crates/validator/src/validators/synthetic_data/tests/mod.rs +++ b/crates/validator/src/validators/synthetic_data/tests/mod.rs @@ -135,7 +135,8 @@ async fn test_build_validation_plan() -> Result<(), Error> { ]; let work_info = WorkInfo { - node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + node_id: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), work_units: U256::from(1000), ..Default::default() }; @@ -405,6 +406,7 @@ async fn test_group_e2e_accept() -> Result<(), Error> { let work_info = WorkInfo { node_id: Address::from_str(NODE_ADDRESS).unwrap(), work_units: U256::from(1000), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), ..Default::default() }; for work_key in work_keys.clone() { @@ -477,12 +479,12 @@ async fn test_group_e2e_work_unit_mismatch() -> Result<(), Error> { ..Default::default() }; - const HONEST_NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000001"; + const HONEST_NODE_ADDRESS: &str = "0x182555b0Ab39EE313f22c07dbe88D950385b1f69"; const HONEST_FILE_SHA: &str = "c257e3d3fe866a00df1285f8bbbe601fed6b85229d983bbbb75e19a068346641"; const EXCESSIVE_FILE_SHA: &str = "88e4672c19e5a10bff2e23d223f8bfc38ae1425feaa18db9480e631a4fd98edf"; - const EXCESSIVE_NODE_ADDRESS: &str = "0x0000000000000000000000000000000000000002"; + const EXCESSIVE_NODE_ADDRESS: &str = "0x182555b0Ab39EE313f22c07dbe88D950385b1f68"; const GROUP_ID: &str = "3456714426841564"; let mock_storage = MockStorageProvider::new(); @@ -564,11 +566,13 @@ async fn test_group_e2e_work_unit_mismatch() -> Result<(), Error> { let work_info_1 = WorkInfo { node_id: Address::from_str(HONEST_NODE_ADDRESS).unwrap(), work_units: U256::from(EXPECTED_WORK_UNITS), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), ..Default::default() }; let work_info_2 = WorkInfo { node_id: Address::from_str(EXCESSIVE_NODE_ADDRESS).unwrap(), work_units: U256::from(EXCESSIVE_WORK_UNITS), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f67").unwrap(), ..Default::default() }; @@ -784,6 +788,7 @@ async fn test_incomplete_group_recovery() -> Result<(), Error> { let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), work_units: U256::from(1000), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), ..Default::default() }; validator @@ -884,6 +889,7 @@ async fn test_expired_incomplete_group_soft_invalidation() -> Result<(), Error> let work_info = WorkInfo { node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), work_units: U256::from(1000), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), ..Default::default() }; validator @@ -985,8 +991,9 @@ async fn test_incomplete_group_status_tracking() -> Result<(), Error> { // Add work info for only 1 of 3 expected files let work_info = WorkInfo { - node_id: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + node_id: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), work_units: U256::from(1000), + provider: Address::from_str("0x182555b0Ab39EE313f22c07dbe88D950385b1f68").unwrap(), ..Default::default() }; validator