10 changes: 1 addition & 9 deletions src/api.rs
@@ -12,7 +12,7 @@
//!
//! You can also [`connect`](Store::connect) to a remote store that is listening
//! to rpc requests.
use std::{io, net::SocketAddr, ops::Deref};
use std::{io, net::SocketAddr};

use bao_tree::io::EncodeError;
use iroh::Endpoint;
@@ -250,14 +250,6 @@ pub struct Store {
client: ApiClient,
}

impl Deref for Store {
type Target = blobs::Blobs;

fn deref(&self) -> &Self::Target {
blobs::Blobs::ref_from_sender(&self.client)
}
}

impl Store {
/// The tags API.
pub fn tags(&self) -> &Tags {
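With the `Deref<Target = blobs::Blobs>` impl removed, blob operations no longer resolve implicitly on `Store`; every call site in the hunks below is updated to go through the explicit `blobs()` accessor, mirroring the existing `tags()` accessor. A minimal before/after sketch (the `store` and `hash` bindings are illustrative):

    // Before: `Store` deref'd to `blobs::Blobs`, so blob methods resolved implicitly.
    let bytes = store.get_bytes(hash).await?;

    // After: the blobs sub-API is named explicitly.
    let bytes = store.blobs().get_bytes(hash).await?;
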
91 changes: 53 additions & 38 deletions src/api/blobs.rs
@@ -4,37 +4,56 @@
//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
//!
//! The main entry point is the [`Blobs`] struct.
use std::{
collections::BTreeMap,
future::{Future, IntoFuture},
io,
num::NonZeroU64,
path::{Path, PathBuf},
pin::Pin,
};

use std::collections::BTreeMap;
use std::future::Future;
use std::future::IntoFuture;
use std::io;
use std::num::NonZeroU64;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;

use bao_tree::io::fsm::ResponseDecoder;
use bao_tree::io::fsm::ResponseDecoderNext;
pub use bao_tree::io::mixed::EncodedItem;
use bao_tree::{
io::{
fsm::{ResponseDecoder, ResponseDecoderNext},
BaoContentItem, Leaf,
},
BaoTree, ChunkNum, ChunkRanges,
};
use bao_tree::io::BaoContentItem;
use bao_tree::io::Leaf;
use bao_tree::BaoTree;
use bao_tree::ChunkNum;
use bao_tree::ChunkRanges;
use bytes::Bytes;
use genawaiter::sync::Gen;
use iroh_io::{AsyncStreamReader, TokioStreamReader};
use irpc::channel::{mpsc, oneshot};
use n0_future::{future, stream, Stream, StreamExt};
use iroh_io::AsyncStreamReader;
use iroh_io::TokioStreamReader;
use irpc::channel::mpsc;
use irpc::channel::oneshot;
use n0_future::future;
use n0_future::stream;
use n0_future::Stream;
use n0_future::StreamExt;
use quinn::SendStream;
use range_collections::{range_set::RangeSetRange, RangeSet2};
use range_collections::range_set::RangeSetRange;
use range_collections::RangeSet2;
use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncWriteExt;
use tracing::trace;
mod reader;
pub use reader::BlobReader;

use super::proto::BatchResponse;
use super::proto::BlobStatusRequest;
use super::proto::ClearProtectedRequest;
use super::proto::CreateTempTagRequest;
use super::proto::ExportBaoRequest;
use super::proto::ExportRangesItem;
use super::proto::ImportBaoRequest;
use super::proto::ImportByteStreamRequest;
use super::proto::ImportBytesRequest;
use super::proto::ImportPathRequest;
use super::proto::ListRequest;
use super::proto::Scope;
// Public reexports from the proto module.
//
// Due to the fact that the proto module is hidden from docs by default,
@@ -45,23 +64,19 @@ pub use super::proto::{
ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
};
use super::{
proto::{
BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
},
remote::HashSeqChunk,
tags::TagInfo,
ApiClient, RequestResult, Tags,
};
use crate::{
api::proto::{BatchRequest, ImportByteStreamUpdate},
provider::events::ClientResult,
store::IROH_BLOCK_SIZE,
util::temp_tag::TempTag,
BlobFormat, Hash, HashAndFormat,
};
use super::remote::HashSeqChunk;
use super::tags::TagInfo;
use super::ApiClient;
use super::RequestResult;
use super::Tags;
use crate::api::proto::BatchRequest;
use crate::api::proto::ImportByteStreamUpdate;
use crate::provider::events::ClientResult;
use crate::store::IROH_BLOCK_SIZE;
use crate::util::temp_tag::TempTag;
use crate::BlobFormat;
use crate::Hash;
use crate::HashAndFormat;

/// Options for adding bytes.
#[derive(Debug)]
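The module doc above names [`Blobs`] as the entry point for importing, exporting, and observing blobs; the rest of this file's hunk only flattens the nested `use` trees into one import per line. A hedged round-trip sketch using the explicit accessor, assuming `add_slice` and `get_bytes` behave as in the tests below:

    // Obtain the blobs handle explicitly and round-trip a small blob.
    let blobs = store.blobs();
    let tag = blobs.add_slice(b"hello world").await?;
    let bytes = blobs.get_bytes(tag.hash).await?;
    assert_eq!(&bytes[..], b"hello world");
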
20 changes: 11 additions & 9 deletions src/api/downloader.rs
@@ -381,7 +381,7 @@ async fn split_request<'a>(
};
let first = GetRequest::blob(req.hash);
execute_get(pool, Arc::new(first), providers, store, progress).await?;
let size = store.observe(req.hash).await?.size();
let size = store.blobs().observe(req.hash).await?.size();
anyhow::ensure!(size % 32 == 0, "Size is not a multiple of 32");
let n = size / 32;
Box::new(
@@ -547,8 +547,8 @@ mod tests {
let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
let tt1 = store1.add_slice("hello world").await?;
let tt2 = store2.add_slice("hello world 2").await?;
let tt1 = store1.blobs().add_slice("hello world").await?;
let tt2 = store2.blobs().add_slice("hello world 2").await?;
let node1_addr = r1.endpoint().node_addr().initialized().await;
let node1_id = node1_addr.node_id;
let node2_addr = r2.endpoint().node_addr().initialized().await;
@@ -567,8 +567,8 @@ mod tests {
while let Some(item) = progress.next().await {
println!("Got item: {item:?}");
}
assert_eq!(store3.get_bytes(tt1.hash).await?.deref(), b"hello world");
assert_eq!(store3.get_bytes(tt2.hash).await?.deref(), b"hello world 2");
assert_eq!(store3.blobs().get_bytes(tt1.hash).await?.deref(), b"hello world");
assert_eq!(store3.blobs().get_bytes(tt2.hash).await?.deref(), b"hello world 2");
Ok(())
}

@@ -579,10 +579,11 @@
let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
let tt1 = store1.add_slice(vec![1; 10000000]).await?;
let tt2 = store2.add_slice(vec![2; 10000000]).await?;
let tt1 = store1.blobs().add_slice(vec![1; 10000000]).await?;
let tt2 = store2.blobs().add_slice(vec![2; 10000000]).await?;
let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
let root = store1
.blobs()
.add_bytes_with_opts(AddBytesOptions {
data: hs.clone().into(),
format: crate::BlobFormat::HashSeq,
@@ -648,10 +649,11 @@ mod tests {
let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
let tt1 = store1.add_slice(vec![1; 10000000]).await?;
let tt2 = store2.add_slice(vec![2; 10000000]).await?;
let tt1 = store1.blobs().add_slice(vec![1; 10000000]).await?;
let tt2 = store2.blobs().add_slice(vec![2; 10000000]).await?;
let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
let root = store1
.blobs()
.add_bytes_with_opts(AddBytesOptions {
data: hs.clone().into(),
format: crate::BlobFormat::HashSeq,
12 changes: 8 additions & 4 deletions src/api/remote.rs
@@ -416,13 +416,13 @@ impl Remote {
) -> anyhow::Result<LocalInfo> {
let request = request.into();
let root = request.hash;
let bitfield = self.store().observe(root).await?;
let bitfield = self.store().blobs().observe(root).await?;
let children = if !request.ranges.is_blob() {
let opts = ExportBaoOptions {
hash: root,
ranges: bitfield.ranges.clone(),
};
let bao = self.store().export_bao_with_opts(opts, 32);
let bao = self.store().blobs().export_bao_with_opts(opts, 32);
let mut by_index = BTreeMap::new();
let mut stream = bao.hashes_with_index();
while let Some(item) = stream.next().await {
@@ -447,7 +447,7 @@
// we don't have the hash, so we can't store the bitfield
continue;
};
let bitfield = self.store().observe(*hash).await?;
let bitfield = self.store().blobs().observe(*hash).await?;
bitfields.insert(*hash, bitfield);
hash_seq.insert(child, *hash);
}
@@ -592,6 +592,7 @@ impl Remote {
let root_ranges = request_ranges.next().expect("infinite iterator");
if !root_ranges.is_empty() {
self.store()
.blobs()
.export_bao(root, root_ranges.clone())
.write_quinn_with_progress(&mut send, &mut context, &root, 0)
.await?;
@@ -601,13 +602,14 @@
send.finish()?;
return Ok(Default::default());
}
let hash_seq = self.store().get_bytes(root).await?;
let hash_seq = self.store().blobs().get_bytes(root).await?;
let hash_seq = HashSeq::try_from(hash_seq)?;
for (child, (child_hash, child_ranges)) in
hash_seq.into_iter().zip(request_ranges).enumerate()
{
if !child_ranges.is_empty() {
self.store()
.blobs()
.export_bao(child_hash, child_ranges.clone())
.write_quinn_with_progress(
&mut send,
@@ -681,6 +683,7 @@ impl Remote {
let mut next_child = Ok(at_start_child);
let hash_seq = HashSeq::try_from(
store
.blobs()
.get_bytes(root)
.await
.map_err(|e| LocalFailureSnafu.into_error(e.into()))?,
@@ -891,6 +894,7 @@ async fn get_blob_ranges_impl(
let buffer_size = get_buffer_size(size);
trace!(%size, %buffer_size, "get blob");
let handle = store
.blobs()
.import_bao(hash, size, buffer_size)
.await
.map_err(|e| LocalFailureSnafu.into_error(e.into()))?;
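Both directions of a bao transfer now route through the same accessor: the sender exports the requested ranges onto the QUIC stream, the receiver imports them. A condensed sketch assembled from the remote.rs hunks above and the provider.rs hunks below (the `send`, `context`, `reader`, and `index` bindings are illustrative):

    // Send side: export the requested ranges and stream them out with progress.
    store
        .blobs()
        .export_bao(hash, ranges.clone())
        .write_quinn_with_progress(&mut send, &mut context, &hash, index)
        .await?;

    // Receive side: import the incoming bao-encoded data for the same hash.
    store
        .blobs()
        .import_bao_quinn(hash, ranges.clone(), &mut reader.inner)
        .await?;
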
5 changes: 3 additions & 2 deletions src/format/collection.rs
@@ -71,7 +71,7 @@ pub trait SimpleStore {

impl SimpleStore for crate::api::Store {
async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
Ok(self.get_bytes(hash).await?)
Ok(self.blobs().get_bytes(hash).await?)
}
}

@@ -190,11 +190,12 @@ impl Collection {
pub async fn store(self, db: &Store) -> anyhow::Result<TempTag> {
let (links, meta) = self.into_parts();
let meta_bytes = postcard::to_stdvec(&meta)?;
let meta_tag = db.add_bytes(meta_bytes).temp_tag().await?;
let meta_tag = db.blobs().add_bytes(meta_bytes).temp_tag().await?;
let links_bytes = std::iter::once(*meta_tag.hash())
.chain(links)
.collect::<HashSeq>();
let links_tag = db
.blobs()
.add_bytes_with_opts(AddBytesOptions {
data: links_bytes.into(),
format: BlobFormat::HashSeq,
12 changes: 6 additions & 6 deletions src/provider.rs
@@ -411,7 +411,7 @@ pub async fn handle_get(
let hash_seq = match &hash_seq {
Some(b) => b,
None => {
let bytes = store.get_bytes(hash).await?;
let bytes = store.blobs().get_bytes(hash).await?;
let hs = HashSeq::try_from(bytes)?;
hash_seq = Some(hs);
hash_seq.as_ref().unwrap()
@@ -460,7 +460,7 @@ pub async fn handle_push(
let root_ranges = request_ranges.next().expect("infinite iterator");
if !root_ranges.is_empty() {
// todo: send progress from import_bao_quinn or rename to import_bao_quinn_with_progress
store
store.blobs()
.import_bao_quinn(hash, root_ranges.clone(), &mut reader.inner)
.await?;
}
@@ -469,13 +469,13 @@
return Ok(());
}
// todo: we assume here that the hash sequence is complete. For some requests this might not be the case. We would need `LazyHashSeq` for that, but it is buggy as of now!
let hash_seq = store.get_bytes(hash).await?;
let hash_seq = store.blobs().get_bytes(hash).await?;
let hash_seq = HashSeq::try_from(hash_seq)?;
for (child_hash, child_ranges) in hash_seq.into_iter().zip(request_ranges) {
if child_ranges.is_empty() {
continue;
}
store
store.blobs()
.import_bao_quinn(child_hash, child_ranges.clone(), &mut reader.inner)
.await?;
}
@@ -490,7 +490,7 @@
ranges: ChunkRanges,
writer: &mut ProgressWriter,
) -> ExportBaoResult<()> {
store
store.blobs()
.export_bao(hash, ranges)
.write_quinn_with_progress(&mut writer.inner, &mut writer.context, &hash, index)
.await
@@ -504,7 +504,7 @@
request: ObserveRequest,
writer: &mut ProgressWriter,
) -> Result<()> {
let mut stream = store.observe(request.hash).stream().await?;
let mut stream = store.blobs().observe(request.hash).stream().await?;
let mut old = stream
.next()
.await
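Finally, `observe` appears in two shapes in this diff, both behind the accessor now: awaiting the request directly yields a one-shot bitfield snapshot, while `.stream()` yields live updates. A sketch of both patterns (bindings are illustrative):

    // One-shot: await the request for a snapshot, e.g. to read the current size.
    let size = store.blobs().observe(hash).await?.size();

    // Live: turn the same request into a stream of bitfield updates.
    let mut updates = store.blobs().observe(hash).stream().await?;
    while let Some(bitfield) = updates.next().await {
        // react to newly completed ranges in `bitfield`
    }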