Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions chain-state/src/da_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,31 @@ async fn check_data_logs(chain_state: Arc<ChainState>, from: u64, to: u64) -> Re
}

async fn check_data_upload(chain_state: Arc<ChainState>, l: u64, r: u64) -> Result<()> {
// Fetch epoch info to validate window
let current_epoch = chain_state.da_signers.epoch_number().call().await?.as_u64();
let epoch_window_size = chain_state.da_entrance.epoch_window_size().call().await?.as_u64();

let filter: ethers::types::Filter = chain_state
.da_entrance
.data_upload_filter()
.from_block(l)
.to_block(r)
.address(chain_state.da_entrance.address().into())
.filter;

for log in chain_state.provider.get_logs(&filter).await? {
match DataUploadFilter::decode_log(&RawLog {
topics: log.topics,
data: log.data.to_vec(),
}) {
Ok(event) => {
let epoch = event.epoch.as_u64();

// Skip if epoch is too old (already pruned or will be soon)
if epoch + epoch_window_size < current_epoch {
continue;
}

let quorum_id = event.quorum_id.as_u64();
let maybe_blob_status = chain_state
.db
Expand Down Expand Up @@ -140,20 +151,31 @@ async fn check_data_upload(chain_state: Arc<ChainState>, l: u64, r: u64) -> Resu
}

async fn check_data_verified(chain_state: Arc<ChainState>, l: u64, r: u64) -> Result<()> {
// Fetch epoch info to validate window
let current_epoch = chain_state.da_signers.epoch_number().call().await?.as_u64();
let epoch_window_size = chain_state.da_entrance.epoch_window_size().call().await?.as_u64();

let filter: ethers::types::Filter = chain_state
.da_entrance
.erasure_commitment_verified_filter()
.from_block(l)
.to_block(r)
.address(chain_state.da_entrance.address().into())
.filter;

for log in chain_state.provider.get_logs(&filter).await? {
match ErasureCommitmentVerifiedFilter::decode_log(&RawLog {
topics: log.topics,
data: log.data.to_vec(),
}) {
Ok(event) => {
let epoch = event.epoch.as_u64();

// Skip if epoch is too old
if epoch + epoch_window_size < current_epoch {
continue;
}

let quorum_id = event.quorum_id.as_u64();
let maybe_blob_status = chain_state
.db
Expand Down
11 changes: 11 additions & 0 deletions grpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ impl SignerService {
req: &SignRequest,
storage_root: [u8; 32],
) -> Result<(), Status> {
// --- Added Epoch Validation ---
let current_epoch = self.chain_state.da_signers.epoch_number().call().await
.map_err(|e| Status::new(Code::Internal, e.to_string()))?.as_u64();
let epoch_window_size = self.chain_state.da_entrance.epoch_window_size().call().await
.map_err(|e| Status::new(Code::Internal, e.to_string()))?.as_u64();

if req.epoch + epoch_window_size < current_epoch {
return Err(Status::new(Code::InvalidArgument, "epoch too old"));
}
// ------------------------------

let maybe_blob_status = self
.db
.read()
Expand Down
11 changes: 11 additions & 0 deletions storage/src/blob_status_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub trait BlobStatusDB {
quorum_id: u64,
storage_root: [u8; 32],
) -> Result<Option<BlobStatus>>;
// Added prune method to trait
async fn prune(&self, epoch: u64) -> Result<()>;
}

fn get_blob_key(epoch: u64, quorum_id: u64, storage_root: [u8; 32]) -> Vec<u8> {
Expand Down Expand Up @@ -80,4 +82,13 @@ impl BlobStatusDB for Storage {
}
Ok(None)
}

// Added implementation of prune
async fn prune(&self, epoch: u64) -> Result<()> {
let prefix: Vec<u8> = epoch.to_be_bytes().to_vec();
let mut tx = self.db.transaction();
tx.delete_prefix(COL_BLOB_STATUS, &prefix);
self.db.write(tx)?;
Ok(())
}
}
9 changes: 8 additions & 1 deletion storage/src/slice_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeSet, iter::once};

use crate::COL_SLICE;
// Added COL_BLOB_STATUS here
use crate::{COL_SLICE, COL_BLOB_STATUS};

use super::Storage;
use anyhow::{bail, Result};
Expand Down Expand Up @@ -241,11 +242,17 @@ impl SliceDB for Storage {
let blob_prefix: Vec<u8> = once(BLOB_PREFIX).chain(epoch.to_be_bytes()).collect();
let slice_prefix: Vec<u8> = once(SLICE_PREFIX).chain(epoch.to_be_bytes()).collect();
let data_prefix: Vec<u8> = once(DATA_PREFIX).chain(epoch.to_be_bytes()).collect();

// Added this line to create the status prefix
let blob_status_prefix: Vec<u8> = epoch.to_be_bytes().to_vec();

let mut tx = self.db.transaction();
tx.delete_prefix(COL_SLICE, &blob_prefix);
tx.delete_prefix(COL_SLICE, &slice_prefix);
tx.delete_prefix(COL_SLICE, &data_prefix);

// Added this line to prune COL_BLOB_STATUS
tx.delete_prefix(COL_BLOB_STATUS, &blob_status_prefix);

self.db.write(tx)?;
Ok(())
Expand Down