diff --git a/durable-storage/src/lib.rs b/durable-storage/src/lib.rs index ba993d41407..3b79b030ed7 100644 --- a/durable-storage/src/lib.rs +++ b/durable-storage/src/lib.rs @@ -35,7 +35,6 @@ pub mod errors; pub mod key; mod merkle_layer; mod merkle_worker; -#[cfg_attr(not(test), expect(dead_code, reason = "Incomplete"))] -pub(crate) mod persistence_layer; +pub mod persistence_layer; pub mod registry; pub mod repo; diff --git a/durable-storage/src/persistence_layer.rs b/durable-storage/src/persistence_layer.rs index 1c4b3309792..d8c67c2af33 100644 --- a/durable-storage/src/persistence_layer.rs +++ b/durable-storage/src/persistence_layer.rs @@ -65,6 +65,10 @@ impl<'d> HashedData<'d> { let hash = Hash::hash_bytes(value); Self { hash, value } } + + pub fn hash(&self) -> Hash { + self.hash + } } #[derive(BorrowDecode, Encode)] diff --git a/src/riscv/lib/src/storage.rs b/src/riscv/lib/src/storage.rs index 9d2cfd10e36..5a4fa062f01 100644 --- a/src/riscv/lib/src/storage.rs +++ b/src/riscv/lib/src/storage.rs @@ -6,9 +6,7 @@ mod chunked_io; use std::io; -use std::io::Write; use std::path::Path; -use std::path::PathBuf; use bincode::Decode; use bincode::Encode; @@ -17,12 +15,21 @@ use bincode::error::EncodeError; use octez_riscv_data::hash::Hash; use octez_riscv_data::hash::HashError; use octez_riscv_data::serialisation; +use octez_riscv_durable_storage::commit::CommitId; +use octez_riscv_durable_storage::errors::Error; +use octez_riscv_durable_storage::errors::OperationalError; +use octez_riscv_durable_storage::persistence_layer::HashedData; +use octez_riscv_durable_storage::persistence_layer::PersistenceLayer; +use octez_riscv_durable_storage::repo::DirectoryManager; use thiserror::Error; const CHUNK_SIZE: usize = 4096; #[derive(Error, Debug)] pub enum StorageError { + #[error("Persistence layer error: {0}")] + PersistenceLayerError(#[from] octez_riscv_durable_storage::errors::Error), + #[error("IO error: {0}")] IoError(#[from] io::Error), @@ -38,136 +45,139 @@ pub enum 
StorageError { #[error("Hashing error")] HashError(#[from] HashError), - #[error("Data for hash {0} not found")] - NotFound(String), + #[error("Data not found")] + NotFound, #[error("Committed chunk {0} not found")] ChunkNotFound(String), } -#[derive(Debug, PartialEq)] +impl From<OperationalError> for StorageError { + fn from(value: OperationalError) -> Self { + match value { + OperationalError::CommitNotFound => StorageError::NotFound, + _ => StorageError::PersistenceLayerError(Error::Operational(value)), + } + } +} + +#[derive(Debug)] pub struct Store { - path: Box<Path>, + persistence_layer: PersistenceLayer, } impl Store { - /// Initialise a store. Either create a new directory if `path` does not - /// exist or initialise in an existing directory. - /// Throws `StorageError::InvalidRepo` if `path` is a file. - pub fn init(path: impl AsRef<Path>) -> Result<Store, StorageError> { - let path = path.as_ref().to_path_buf(); - if !path.exists() { - std::fs::create_dir(&path)?; - } else if path.metadata()?.is_file() { - return Err(StorageError::InvalidRepo); - } + pub fn init(dir_manager: &DirectoryManager) -> Result<Store, StorageError> { Ok(Store { - path: path.into_boxed_path(), + persistence_layer: PersistenceLayer::new(&dir_manager)?, }) } - fn file_name_of_hash(hash: &Hash) -> String { - hex::encode(hash) - } - - fn path_of_hash(&self, hash: &Hash) -> PathBuf { - self.path.join(Self::file_name_of_hash(hash)) - } - - fn write_data_if_new(&self, file_name: PathBuf, data: &[u8]) -> Result<(), StorageError> { - match std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(file_name) - { - Ok(mut f) => f.write_all(data)?, - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => (), - Err(e) => return Err(StorageError::IoError(e)), - } - Ok(()) - } - /// Store data and return its hash. The data is written to disk only if /// previously unseen.
pub fn store(&self, data: &[u8]) -> Result<Hash, StorageError> { - let hash = Hash::hash_bytes(data); - let file_name = self.path_of_hash(&hash); - self.write_data_if_new(file_name, data)?; - Ok(hash) + let hashed_data = HashedData::from_value(data); + self.persistence_layer.blob_set(&hashed_data)?; + Ok(hashed_data.hash()) } /// Load data corresponding to `hash`, if found. pub fn load(&self, hash: &Hash) -> Result<Vec<u8>, StorageError> { - let file_name = self.path_of_hash(hash); - std::fs::read(file_name).map_err(|e| { - if e.kind() == io::ErrorKind::NotFound { - StorageError::NotFound(hex::encode(hash)) - } else { - StorageError::IoError(e) - } - }) + Ok(self + .persistence_layer + .blob_get(hash)? + .as_ref() + .into_iter() + .copied() + .collect::<Vec<u8>>()) } - /// Copy the data corresponding to `hash` to `path`. - pub fn copy(&self, hash: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> { - let source_path = self.path_of_hash(hash); - let target_path = path.as_ref().join(Self::file_name_of_hash(hash)); - std::fs::copy(source_path, target_path)?; + /// Copy a piece of data from this `Store` to another. + pub fn copy(&self, other: &Self, hash: &Hash) -> Result<(), StorageError> { + let bytes = self.load(hash)?; + other.store(&bytes)?; Ok(()) } } -#[derive(Debug, PartialEq)] +impl From<PersistenceLayer> for Store { + fn from(persistence_layer: PersistenceLayer) -> Self { + Self { persistence_layer } + } +} + pub struct Repo { - backend: Store, + backend: DirectoryManager, } impl Repo { /// Load or create new repo at `path`. + /// + /// Returns `Err(StorageError::InvalidRepo)` if the `path` is a file not a directory. pub fn load(path: impl AsRef<Path>) -> Result<Repo, StorageError> { + if path.as_ref().is_file() { + return Err(StorageError::InvalidRepo); + }; Ok(Repo { - backend: Store::init(path)?, + backend: DirectoryManager::new(path.as_ref())?, }) } pub fn close(self) {} - /// Create a new commit for `bytes` and return the commit id. + /// Create a new commit for `bytes` and return the commit id. 
pub fn commit(&mut self, bytes: &[u8]) -> Result<Hash, StorageError> { + let store = Store::init(&self.backend)?; + let mut commit = Vec::with_capacity(bytes.len().div_ceil(CHUNK_SIZE) * Hash::DIGEST_SIZE); for chunk in bytes.chunks(CHUNK_SIZE) { - let chunk_hash = self.backend.store(chunk)?; + let chunk_hash = store.store(chunk)?; commit.push(chunk_hash); } // A commit contains the list of all chunks needed to reconstruct `data`. let commit_bytes = serialisation::serialise(&commit)?; - self.backend.store(&commit_bytes) + let commit_id = CommitId::from(store.store(&commit_bytes)?); + + store.persistence_layer.commit(&self.backend, &commit_id)?; + + Ok(*commit_id.as_hash()) } /// Commit something serialisable and return the commit ID. pub fn commit_serialised(&mut self, subject: &impl Encode) -> Result<Hash, StorageError> { + let store = Store::init(&self.backend)?; + let chunk_hashes = { - let mut writer = chunked_io::ChunkWriter::new(&mut self.backend); + let mut writer = chunked_io::ChunkWriter::new(&store); serialisation::serialise_into(subject, &mut writer)?; writer.finalise()? }; // A commit contains the list of all chunks needed to reconstruct the underlying data. let commit_bytes = serialisation::serialise(&chunk_hashes)?; - self.backend.store(&commit_bytes) + let commit_id = CommitId::from(store.store(&commit_bytes)?); + + store.persistence_layer.commit(&self.backend, &commit_id)?; + + Ok(*commit_id.as_hash()) } /// Checkout the bytes committed under `id`, if the commit exists. 
pub fn checkout(&self, id: &Hash) -> Result<Vec<u8>, StorageError> { - let bytes = self.backend.load(id)?; + let store = Store::from(PersistenceLayer::checkout( + &self.backend, + &CommitId::from(*id), + )?); + + let bytes = store.load(id)?; let commit: Vec<Hash> = serialisation::deserialise(&bytes)?; let mut bytes = Vec::new(); for hash in commit { - let mut chunk = self.backend.load(&hash).map_err(|e| { - if let StorageError::NotFound(hash) = e { - StorageError::ChunkNotFound(hash) + let mut chunk = store.load(&hash).map_err(|e| { + if let StorageError::NotFound = e { + StorageError::ChunkNotFound(hash.to_string()) } else { e } @@ -179,7 +189,12 @@ impl Repo { /// Checkout something deserialisable from the store. pub fn checkout_serialised<T: Decode<()>>(&self, id: &Hash) -> Result<T, StorageError> { - let mut reader = chunked_io::ChunkedReader::new(&self.backend, id)?; + let store = Store::from(PersistenceLayer::checkout( + &self.backend, + &CommitId::from(*id), + )?); + + let mut reader = chunked_io::ChunkedReader::new(&store, id)?; Ok(serialisation::deserialise_from(&mut reader)?) } @@ -187,17 +202,29 @@ impl Repo { pub fn export_snapshot(&self, id: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> { // Only export a snapshot to a new or empty directory let path = path.as_ref(); - if !path.exists() || path.read_dir()?.next().is_none() { - std::fs::create_dir_all(path)?; + let dir_manager = if !path.exists() || path.read_dir()?.next().is_none() { + DirectoryManager::new(path)? 
} else { return Err(StorageError::InvalidRepo); }; - let bytes = self.backend.load(id)?; + let snapshot_store = Store::init(&dir_manager)?; + + let store = Store::from(PersistenceLayer::checkout( + &self.backend, + &CommitId::from(*id), + )?); + + let bytes = store.load(id)?; let commit: Vec<Hash> = serialisation::deserialise(&bytes)?; for chunk in commit { - self.backend.copy(&chunk, path)?; + store.copy(&snapshot_store, &chunk)?; } - self.backend.copy(id, path)?; + snapshot_store.store(&bytes)?; + + snapshot_store + .persistence_layer + .commit(&dir_manager, &CommitId::from(*id))?; + Ok(()) } } diff --git a/src/riscv/lib/src/storage/chunked_io.rs b/src/riscv/lib/src/storage/chunked_io.rs index a475f207388..996bce3532b 100644 --- a/src/riscv/lib/src/storage/chunked_io.rs +++ b/src/riscv/lib/src/storage/chunked_io.rs @@ -16,14 +16,14 @@ use super::Store; /// Simple writer that stores data in chunks of size [`CHUNK_SIZE`] pub struct ChunkWriter<'a> { - store: &'a mut Store, + store: &'a Store, hashes: Vec<Hash>, buffer: Vec<u8>, } impl<'a> ChunkWriter<'a> { /// Create a new writer that writes the chunks to the given [`Store`]. 
- pub fn new(store: &'a mut Store) -> Self { + pub fn new(store: &'a Store) -> Self { Self { store, hashes: Vec::new(), diff --git a/src/riscv/lib/tests/test_pvm_storage.rs b/src/riscv/lib/tests/test_pvm_storage.rs index c3276f8b85b..56c95eaa2de 100644 --- a/src/riscv/lib/tests/test_pvm_storage.rs +++ b/src/riscv/lib/tests/test_pvm_storage.rs @@ -62,7 +62,7 @@ fn test_repo() { let unknown_hash: Hash = [0u8; Hash::DIGEST_SIZE].into(); assert!(matches!( repo.checkout(&unknown_hash), - Err(StorageError::NotFound(_)) + Err(StorageError::NotFound) )); // Check that exporting a snapshot creates a new repo which contains @@ -127,7 +127,7 @@ fn test_repo_serialised() { let unknown_hash: Hash = [0u8; Hash::DIGEST_SIZE].into(); assert!(matches!( repo.checkout_serialised::<Vec<u8>>(&unknown_hash), - Err(StorageError::NotFound(_)) + Err(StorageError::NotFound) )); // Check that exporting a snapshot creates a new repo which contains