Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 56 additions & 58 deletions node/storage/src/log_store/log_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,34 +312,13 @@ impl LogStoreWrite for LogManager {
.get_tx_by_seq_number(tx_seq)?
.ok_or_else(|| anyhow!("finalize_tx with tx missing: tx_seq={}", tx_seq))?;

self.padding_rear_data(&tx)?;

let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
// TODO: Check completeness without loading all data in memory.
// TODO: Should we double check the tx merkle root?
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
let same_root_seq_list = self
.tx_store
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.
if same_root_seq_list.first() == Some(&tx_seq) {
// If this is the first tx with this data root, copy and finalize all same-root txs.
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
} else {
// If this is not the first tx with this data root, and the first one is not finalized.
let maybe_first_seq = same_root_seq_list.first().cloned();
if let Some(first_seq) = maybe_first_seq {
if !self.check_tx_completed(first_seq)? {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
}
}
}

self.tx_store.finalize_tx(tx_seq)?;
Ok(())
} else {
bail!("finalize tx with data missing: tx_seq={}", tx_seq)
}
self.finalize_tx_common(&tx).map_err(|err| {
anyhow!(
"finalize tx with data missing: tx_seq={} err={}",
tx.seq,
err
)
})
}

fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
Expand All @@ -358,36 +337,15 @@ impl LogStoreWrite for LogManager {
return Ok(false);
}

self.padding_rear_data(&tx)?;

// TODO: Check completeness without loading all data in memory.
// TODO: Should we double check the tx merkle root?
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
let same_root_seq_list = self
.tx_store
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.

if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
} else {
// If this is not the first tx with this data root, copy and finalize the first one.
let maybe_first_seq = same_root_seq_list.first().cloned();
if let Some(first_seq) = maybe_first_seq {
if !self.check_tx_completed(first_seq)? {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
}
}
}

self.tx_store.finalize_tx(tx_seq)?;

metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true)
} else {
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
}
self.finalize_tx_common(&tx).map_err(|err| {
anyhow!(
"finalize tx hash with data missing: tx_seq={} err={}",
tx.seq,
err
)
})?;
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true)
}

fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
Expand Down Expand Up @@ -597,6 +555,16 @@ impl LogStoreRead for LogManager {
Ok(seq_list.first().cloned())
}

fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Vec<u64>> {
    // Look up every tx seq recorded for this data root, then drop the
    // leading run of pruned txs: everything from the first unpruned entry
    // onward is returned as-is.
    let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
    let mut first_unpruned = None;
    for (index, tx_seq) in seq_list.iter().enumerate() {
        if !self.tx_store.check_tx_pruned(*tx_seq)? {
            first_unpruned = Some(index);
            break;
        }
    }
    match first_unpruned {
        // At least one unpruned tx: keep it and everything after it.
        Some(index) => Ok(seq_list[index..].to_vec()),
        // All entries pruned (or list empty): nothing to return.
        None => Ok(Vec::new()),
    }
}

fn get_chunk_with_proof_by_tx_and_index(
&self,
tx_seq: u64,
Expand Down Expand Up @@ -1234,6 +1202,36 @@ impl LogManager {
Ok(())
}

fn finalize_tx_common(&self, tx: &Transaction) -> Result<()> {
    let tx_seq = tx.seq;
    // Pad out the tail of the tx's flow range before completeness checks.
    self.padding_rear_data(tx)?;

    // TODO: Check completeness without loading all data in memory.
    // TODO: Should we double check the tx merkle root?
    let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
    if !self.check_data_completed(tx.start_entry_index, tx_end_index)? {
        bail!("data missing")
    }

    let same_root_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
    // Handle other not-yet-finalized txs that share this data root.
    match same_root_seq_list.first() {
        Some(first_seq) if *first_seq == tx_seq => {
            // This is the first tx with this root: copy data to and
            // finalize every later same-root tx.
            self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
        }
        Some(first_seq) => {
            // Not the first tx with this root: if the first one is not
            // finalized yet, copy from this tx to the whole list.
            if !self.check_tx_completed(*first_seq)? {
                self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
            }
        }
        None => {}
    }

    self.tx_store.finalize_tx(tx_seq)?;
    Ok(())
}

fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();

Expand Down
3 changes: 3 additions & 0 deletions node/storage/src/log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub trait LogStoreRead: LogStoreChunkRead {
need_available: bool,
) -> Result<Option<u64>>;

    /// Return the tx seq list for a data root, skipping any leading pruned entries
    /// (entries after the first unpruned tx are returned as-is).
fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>>;

/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
fn get_tx_by_data_root(
Expand Down
26 changes: 24 additions & 2 deletions node/storage/src/log_store/tx_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,33 @@ impl TransactionStore {

#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.data_kvdb.put(
let tx = self
.get_tx_by_seq_number(tx_seq)?
.ok_or_else(|| anyhow!("prune_tx with tx missing: tx_seq={}", tx_seq))?;

let mut flow_db_tx = self.flow_kvdb.transaction();
let mut data_db_tx = self.data_kvdb.transaction();
data_db_tx.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TxStatus::Pruned.into()],
)?)
);

let mut tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
tx_seq_list.retain(|seq| *seq != tx_seq);
if tx_seq_list.is_empty() {
flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, tx.data_merkle_root.as_bytes());
} else {
flow_db_tx.put(
COL_TX_DATA_ROOT_INDEX,
tx.data_merkle_root.as_bytes(),
&tx_seq_list.as_ssz_bytes(),
);
}

self.data_kvdb.write(data_db_tx)?;
self.flow_kvdb.write(flow_db_tx)?;
Ok(())
}

pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
Expand Down
Loading