Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions durable-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub mod errors;
pub mod key;
mod merkle_layer;
mod merkle_worker;
#[cfg_attr(not(test), expect(dead_code, reason = "Incomplete"))]
pub(crate) mod persistence_layer;
pub mod persistence_layer;
pub mod registry;
pub mod repo;
4 changes: 4 additions & 0 deletions durable-storage/src/persistence_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl<'d> HashedData<'d> {
let hash = Hash::hash_bytes(value);
Self { hash, value }
}

pub fn hash(&self) -> Hash {
self.hash
}
}

#[derive(BorrowDecode, Encode)]
Expand Down
173 changes: 100 additions & 73 deletions src/riscv/lib/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
mod chunked_io;

use std::io;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;

use bincode::Decode;
use bincode::Encode;
Expand All @@ -17,12 +15,21 @@ use bincode::error::EncodeError;
use octez_riscv_data::hash::Hash;
use octez_riscv_data::hash::HashError;
use octez_riscv_data::serialisation;
use octez_riscv_durable_storage::commit::CommitId;
use octez_riscv_durable_storage::errors::Error;
use octez_riscv_durable_storage::errors::OperationalError;
use octez_riscv_durable_storage::persistence_layer::HashedData;
use octez_riscv_durable_storage::persistence_layer::PersistenceLayer;
use octez_riscv_durable_storage::repo::DirectoryManager;
use thiserror::Error;

const CHUNK_SIZE: usize = 4096;

#[derive(Error, Debug)]
pub enum StorageError {
#[error("Persistence layer error: {0}")]
PersistenceLayerError(#[from] octez_riscv_durable_storage::errors::Error),

#[error("IO error: {0}")]
IoError(#[from] io::Error),

Expand All @@ -38,136 +45,139 @@ pub enum StorageError {
#[error("Hashing error")]
HashError(#[from] HashError),

#[error("Data for hash {0} not found")]
NotFound(String),
#[error("Data not found")]
NotFound,

#[error("Committed chunk {0} not found")]
ChunkNotFound(String),
}

#[derive(Debug, PartialEq)]
impl From<OperationalError> for StorageError {
fn from(value: OperationalError) -> Self {
match value {
OperationalError::CommitNotFound => StorageError::NotFound,
_ => StorageError::PersistenceLayerError(Error::Operational(value)),
}
}
}

#[derive(Debug)]
pub struct Store {
path: Box<Path>,
persistence_layer: PersistenceLayer,
}

impl Store {
/// Initialise a store. Either create a new directory if `path` does not
/// exist or initialise in an existing directory.
/// Throws `StorageError::InvalidRepo` if `path` is a file.
pub fn init(path: impl AsRef<Path>) -> Result<Self, StorageError> {
let path = path.as_ref().to_path_buf();
if !path.exists() {
std::fs::create_dir(&path)?;
} else if path.metadata()?.is_file() {
return Err(StorageError::InvalidRepo);
}
pub fn init(dir_manager: &DirectoryManager) -> Result<Self, StorageError> {
Ok(Store {
path: path.into_boxed_path(),
persistence_layer: PersistenceLayer::new(&dir_manager)?,
})
}

fn file_name_of_hash(hash: &Hash) -> String {
hex::encode(hash)
}

fn path_of_hash(&self, hash: &Hash) -> PathBuf {
self.path.join(Self::file_name_of_hash(hash))
}

fn write_data_if_new(&self, file_name: PathBuf, data: &[u8]) -> Result<(), StorageError> {
match std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(file_name)
{
Ok(mut f) => f.write_all(data)?,
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => (),
Err(e) => return Err(StorageError::IoError(e)),
}
Ok(())
}

/// Store data and return its hash. The data is written to disk only if
/// previously unseen.
pub fn store(&self, data: &[u8]) -> Result<Hash, StorageError> {
let hash = Hash::hash_bytes(data);
let file_name = self.path_of_hash(&hash);
self.write_data_if_new(file_name, data)?;
Ok(hash)
let hashed_data = HashedData::from_value(data);
self.persistence_layer.blob_set(&hashed_data)?;
Ok(hashed_data.hash())
}

/// Load data corresponding to `hash`, if found.
pub fn load(&self, hash: &Hash) -> Result<Vec<u8>, StorageError> {
let file_name = self.path_of_hash(hash);
std::fs::read(file_name).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
StorageError::NotFound(hex::encode(hash))
} else {
StorageError::IoError(e)
}
})
Ok(self
.persistence_layer
.blob_get(hash)?
.as_ref()
.into_iter()
.copied()
.collect::<Vec<_>>())
}

/// Copy the data corresponding to `hash` to `path`.
pub fn copy(&self, hash: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> {
let source_path = self.path_of_hash(hash);
let target_path = path.as_ref().join(Self::file_name_of_hash(hash));
std::fs::copy(source_path, target_path)?;
/// Copy a piece of data from this `Store` to another.
pub fn copy(&self, other: &Self, hash: &Hash) -> Result<(), StorageError> {
let bytes = self.load(hash)?;
other.store(&bytes)?;
Ok(())
}
}

#[derive(Debug, PartialEq)]
impl From<PersistenceLayer> for Store {
fn from(persistence_layer: PersistenceLayer) -> Self {
Self { persistence_layer }
}
}

pub struct Repo {
backend: Store,
backend: DirectoryManager,
}

impl Repo {
/// Load or create new repo at `path`.
///
/// Returns `Err(StorageError::InvalidRepo)` if the `path` is a file not a directory.
pub fn load(path: impl AsRef<Path>) -> Result<Repo, StorageError> {
if path.as_ref().is_file() {
return Err(StorageError::InvalidRepo);
};
Ok(Repo {
backend: Store::init(path)?,
backend: DirectoryManager::new(path.as_ref())?,
})
}

pub fn close(self) {}

/// Create a new commit for `bytes` and return the commit id.
/// Create a new commit for `bytes` and return the commit id.
pub fn commit(&mut self, bytes: &[u8]) -> Result<Hash, StorageError> {
let store = Store::init(&self.backend)?;

let mut commit = Vec::with_capacity(bytes.len().div_ceil(CHUNK_SIZE) * Hash::DIGEST_SIZE);

for chunk in bytes.chunks(CHUNK_SIZE) {
let chunk_hash = self.backend.store(chunk)?;
let chunk_hash = store.store(chunk)?;
commit.push(chunk_hash);
}

// A commit contains the list of all chunks needed to reconstruct `data`.
let commit_bytes = serialisation::serialise(&commit)?;
self.backend.store(&commit_bytes)
let commit_id = CommitId::from(store.store(&commit_bytes)?);

store.persistence_layer.commit(&self.backend, &commit_id)?;

Ok(*commit_id.as_hash())
}

/// Commit something serialisable and return the commit ID.
pub fn commit_serialised(&mut self, subject: &impl Encode) -> Result<Hash, StorageError> {
let store = Store::init(&self.backend)?;

let chunk_hashes = {
let mut writer = chunked_io::ChunkWriter::new(&mut self.backend);
let mut writer = chunked_io::ChunkWriter::new(&store);
serialisation::serialise_into(subject, &mut writer)?;
writer.finalise()?
};

// A commit contains the list of all chunks needed to reconstruct the underlying data.
let commit_bytes = serialisation::serialise(&chunk_hashes)?;
self.backend.store(&commit_bytes)
let commit_id = CommitId::from(store.store(&commit_bytes)?);

store.persistence_layer.commit(&self.backend, &commit_id)?;

Ok(*commit_id.as_hash())
}

/// Checkout the bytes committed under `id`, if the commit exists.
pub fn checkout(&self, id: &Hash) -> Result<Vec<u8>, StorageError> {
let bytes = self.backend.load(id)?;
let store = Store::from(PersistenceLayer::checkout(
&self.backend,
&CommitId::from(*id),
)?);

let bytes = store.load(id)?;
let commit: Vec<Hash> = serialisation::deserialise(&bytes)?;
let mut bytes = Vec::new();
for hash in commit {
let mut chunk = self.backend.load(&hash).map_err(|e| {
if let StorageError::NotFound(hash) = e {
StorageError::ChunkNotFound(hash)
let mut chunk = store.load(&hash).map_err(|e| {
if let StorageError::NotFound = e {
StorageError::ChunkNotFound(hash.to_string())
} else {
e
}
Expand All @@ -179,25 +189,42 @@ impl Repo {

/// Checkout something deserialisable from the store.
pub fn checkout_serialised<S: Decode<()>>(&self, id: &Hash) -> Result<S, StorageError> {
let mut reader = chunked_io::ChunkedReader::new(&self.backend, id)?;
let store = Store::from(PersistenceLayer::checkout(
&self.backend,
&CommitId::from(*id),
)?);

let mut reader = chunked_io::ChunkedReader::new(&store, id)?;
Ok(serialisation::deserialise_from(&mut reader)?)
}

/// A snapshot is a new repo to which only `id` has been committed.
pub fn export_snapshot(&self, id: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> {
// Only export a snapshot to a new or empty directory
let path = path.as_ref();
if !path.exists() || path.read_dir()?.next().is_none() {
std::fs::create_dir_all(path)?;
let dir_manager = if !path.exists() || path.read_dir()?.next().is_none() {
DirectoryManager::new(path)?
} else {
return Err(StorageError::InvalidRepo);
};
let bytes = self.backend.load(id)?;
let snapshot_store = Store::init(&dir_manager)?;

let store = Store::from(PersistenceLayer::checkout(
&self.backend,
&CommitId::from(*id),
)?);

let bytes = store.load(id)?;
let commit: Vec<Hash> = serialisation::deserialise(&bytes)?;
for chunk in commit {
self.backend.copy(&chunk, path)?;
store.copy(&snapshot_store, &chunk)?;
}
self.backend.copy(id, path)?;
snapshot_store.store(&bytes)?;

snapshot_store
.persistence_layer
.commit(&dir_manager, &CommitId::from(*id))?;

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/riscv/lib/src/storage/chunked_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use super::Store;

/// Simple writer that stores data in chunks of size [`CHUNK_SIZE`]
pub struct ChunkWriter<'a> {
store: &'a mut Store,
store: &'a Store,
hashes: Vec<Hash>,
buffer: Vec<u8>,
}

impl<'a> ChunkWriter<'a> {
/// Create a new writer that writes the chunks to the given [`Store`].
pub fn new(store: &'a mut Store) -> Self {
pub fn new(store: &'a Store) -> Self {
Self {
store,
hashes: Vec::new(),
Expand Down
4 changes: 2 additions & 2 deletions src/riscv/lib/tests/test_pvm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn test_repo() {
let unknown_hash: Hash = [0u8; Hash::DIGEST_SIZE].into();
assert!(matches!(
repo.checkout(&unknown_hash),
Err(StorageError::NotFound(_))
Err(StorageError::NotFound)
));

// Check that exporting a snapshot creates a new repo which contains
Expand Down Expand Up @@ -127,7 +127,7 @@ fn test_repo_serialised() {
let unknown_hash: Hash = [0u8; Hash::DIGEST_SIZE].into();
assert!(matches!(
repo.checkout_serialised::<Vec<u8>>(&unknown_hash),
Err(StorageError::NotFound(_))
Err(StorageError::NotFound)
));

// Check that exporting a snapshot creates a new repo which contains
Expand Down