diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index ecfa9692..d362708e 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -312,34 +312,13 @@ impl LogStoreWrite for LogManager { .get_tx_by_seq_number(tx_seq)? .ok_or_else(|| anyhow!("finalize_tx with tx missing: tx_seq={}", tx_seq))?; - self.padding_rear_data(&tx)?; - - let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size); - // TODO: Check completeness without loading all data in memory. - // TODO: Should we double check the tx merkle root? - if self.check_data_completed(tx.start_entry_index, tx_end_index)? { - let same_root_seq_list = self - .tx_store - .get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; - // Check if there are other same-root transaction not finalized. - if same_root_seq_list.first() == Some(&tx_seq) { - // If this is the first tx with this data root, copy and finalize all same-root txs. - self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?; - } else { - // If this is not the first tx with this data root, and the first one is not finalized. - let maybe_first_seq = same_root_seq_list.first().cloned(); - if let Some(first_seq) = maybe_first_seq { - if !self.check_tx_completed(first_seq)? { - self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?; - } - } - } - - self.tx_store.finalize_tx(tx_seq)?; - Ok(()) - } else { - bail!("finalize tx with data missing: tx_seq={}", tx_seq) - } + self.finalize_tx_common(&tx).map_err(|err| { + anyhow!( + "finalize tx with data missing: tx_seq={} err={}", + tx.seq, + err + ) + }) } fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result { @@ -358,36 +337,15 @@ impl LogStoreWrite for LogManager { return Ok(false); } - self.padding_rear_data(&tx)?; - - // TODO: Check completeness without loading all data in memory. - // TODO: Should we double check the tx merkle root? - let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size); - if self.check_data_completed(tx.start_entry_index, tx_end_index)? { - let same_root_seq_list = self - .tx_store - .get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; - // Check if there are other same-root transaction not finalized. - - if same_root_seq_list.first() == Some(&tx_seq) { - self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?; - } else { - // If this is not the first tx with this data root, copy and finalize the first one. - let maybe_first_seq = same_root_seq_list.first().cloned(); - if let Some(first_seq) = maybe_first_seq { - if !self.check_tx_completed(first_seq)? { - self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?; - } - } - } - - self.tx_store.finalize_tx(tx_seq)?; - - metrics::FINALIZE_TX_WITH_HASH.update_since(start_time); - Ok(true) - } else { - bail!("finalize tx hash with data missing: tx_seq={}", tx_seq) - } + self.finalize_tx_common(&tx).map_err(|err| { + anyhow!( + "finalize tx hash with data missing: tx_seq={} err={}", + tx.seq, + err + ) + })?; + metrics::FINALIZE_TX_WITH_HASH.update_since(start_time); + Ok(true) } fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> { @@ -597,6 +555,16 @@ impl LogStoreRead for LogManager { Ok(seq_list.first().cloned()) } + fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result> { + let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?; + for (index, tx_seq) in seq_list.iter().enumerate() { + if !self.tx_store.check_tx_pruned(*tx_seq)? { + return Ok(seq_list[index..].to_vec()); + } + } + Ok(Vec::new()) + } + fn get_chunk_with_proof_by_tx_and_index( &self, tx_seq: u64, @@ -1234,6 +1202,36 @@ impl LogManager { Ok(()) } + fn finalize_tx_common(&self, tx: &Transaction) -> Result<()> { + let tx_seq = tx.seq; + self.padding_rear_data(tx)?; + + let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size); + // TODO: Check completeness without loading all data in memory. + // TODO: Should we double check the tx merkle root? + if self.check_data_completed(tx.start_entry_index, tx_end_index)? { + let same_root_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; + // Check if there are other same-root transaction not finalized. + if same_root_seq_list.first() == Some(&tx_seq) { + // If this is the first tx with this data root, copy and finalize all same-root txs. + self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?; + } else { + // If this is not the first tx with this data root, and the first one is not finalized. + let maybe_first_seq = same_root_seq_list.first().cloned(); + if let Some(first_seq) = maybe_first_seq { + if !self.check_tx_completed(first_seq)? { + self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?; + } + } + } + + self.tx_store.finalize_tx(tx_seq)?; + Ok(()) + } else { + bail!("data missing") + } + } + fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec) -> Result<()> { let start_time = Instant::now(); diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 06b24b86..6dc1e417 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -42,6 +42,9 @@ pub trait LogStoreRead: LogStoreChunkRead { need_available: bool, ) -> Result>; + /// Return seq list filtered to unpruned entries. + fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result>; + /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. fn get_tx_by_data_root( diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index ee2b3c41..1f86f656 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -199,11 +199,33 @@ impl TransactionStore { #[instrument(skip(self))] pub fn prune_tx(&self, tx_seq: u64) -> Result<()> { - Ok(self.data_kvdb.put( + let tx = self + .get_tx_by_seq_number(tx_seq)? + .ok_or_else(|| anyhow!("prune_tx with tx missing: tx_seq={}", tx_seq))?; + + let mut flow_db_tx = self.flow_kvdb.transaction(); + let mut data_db_tx = self.data_kvdb.transaction(); + data_db_tx.put( COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TxStatus::Pruned.into()], - )?) + ); + + let mut tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; + tx_seq_list.retain(|seq| *seq != tx_seq); + if tx_seq_list.is_empty() { + flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, tx.data_merkle_root.as_bytes()); + } else { + flow_db_tx.put( + COL_TX_DATA_ROOT_INDEX, + tx.data_merkle_root.as_bytes(), + &tx_seq_list.as_ssz_bytes(), + ); + } + + self.data_kvdb.write(data_db_tx)?; + self.flow_kvdb.write(flow_db_tx)?; + Ok(()) } pub fn get_tx_status(&self, tx_seq: u64) -> Result> {