diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..226dec96 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[target.wasm32-unknown-unknown] +runner = "wasm-bindgen-test-runner" +rustflags = ['--cfg', 'getrandom_backend="wasm_js"'] diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f1df4405..2f00e212 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -279,3 +279,40 @@ jobs: - uses: actions/checkout@v4 - run: pip install --user codespell[toml] - run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md + + wasm_build: + name: Build & test wasm32 + runs-on: ubuntu-latest + env: + RUSTFLAGS: '--cfg getrandom_backend="wasm_js"' + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Node.js + uses: actions/setup-node@v4 + with: + node-version: 20 + + - name: Install stable toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Add wasm target + run: rustup target add wasm32-unknown-unknown + + - name: Install wasm-tools + uses: bytecodealliance/actions/wasm-tools/setup@v1 + + - name: Install wasm-pack + uses: taiki-e/install-action@v2 + with: + tool: wasm-bindgen,wasm-pack + + - name: wasm32 build + run: cargo build --target wasm32-unknown-unknown --no-default-features + + # If the Wasm file contains any 'import "env"' declarations, then + # some non-Wasm-compatible code made it into the final code. + - name: Ensure no 'import "env"' in wasm + run: | + ! wasm-tools print --skeleton target/wasm32-unknown-unknown/debug/iroh_docs.wasm | grep 'import "env"' \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b64429dc..26792051 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,8 +350,7 @@ dependencies = [ [[package]] name = "bao-tree" version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff16d65e48353db458be63ee395c03028f24564fd48668389bd65fd945f5ac36" +source = "git+https://github.com/n0-computer/bao-tree?branch=main#109ce557ec3feafb29a4702a0d1d195b1d52a706" dependencies = [ "blake3", "bytes", @@ -1816,7 +1815,7 @@ dependencies = [ "iroh-quinn-proto", "iroh-quinn-udp", "iroh-relay", - "n0-future", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "n0-snafu", "n0-watcher", "nested_enum_utils", @@ -1870,13 +1869,13 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.96.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7a58699e59c2deb116df7420bc6755bf5f445603f71998f274a99f1985a5bc5" +source = "git+https://github.com/n0-computer/iroh-blobs?branch=Frando%2Fwasm-nopatch#c1243d2ba54edacd2c4633bc121640afde93aabf" dependencies = [ "anyhow", "arrayvec", "bao-tree", "bytes", + "cfg_aliases", "chrono", "data-encoding", "derive_more 2.0.1", @@ -1889,8 +1888,8 @@ dependencies = [ "iroh-metrics", "iroh-quinn", "iroh-tickets", - "irpc", - "n0-future", + "irpc 0.10.0 (git+https://github.com/n0-computer/irpc?branch=main)", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "n0-snafu", "nested_enum_utils", "postcard", @@ -1904,7 +1903,6 @@ dependencies = [ "smallvec", "snafu", "tokio", - "tokio-util", "tracing", ] @@ -1916,6 +1914,7 @@ dependencies = [ "async-channel", "blake3", "bytes", + "cfg_aliases", "data-encoding", "derive_more 2.0.1", "ed25519-dalek", @@ -1929,8 +1928,8 @@ dependencies = [ "iroh-metrics", "iroh-quinn", "iroh-tickets", - "irpc", - "n0-future", + "irpc 0.10.0 
(git+https://github.com/n0-computer/irpc?branch=main)", + "n0-future 0.3.0 (git+https://github.com/n0-computer/n0-future?branch=Frando%2Ftime-serde)", "nested_enum_utils", "num_enum", "parking_lot", @@ -1975,8 +1974,8 @@ dependencies = [ "iroh", "iroh-base", "iroh-metrics", - "irpc", - "n0-future", + "irpc 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "n0-snafu", "nested_enum_utils", "postcard", @@ -2113,7 +2112,7 @@ dependencies = [ "iroh-quinn", "iroh-quinn-proto", "lru 0.16.1", - "n0-future", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "n0-snafu", "nested_enum_utils", "num_enum", @@ -2171,13 +2170,28 @@ name = "irpc" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52cf44fdb253f2a3e22e5ecfa8efa466929f8b7cdd4fc0f958f655406e8cdab6" +dependencies = [ + "futures-util", + "irpc-derive 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "irpc" +version = "0.10.0" +source = "git+https://github.com/n0-computer/irpc?branch=main#83c9dcbfe000f016e9a77419472a8b717036d5c8" dependencies = [ "anyhow", "futures-buffered", "futures-util", "iroh-quinn", - "irpc-derive", - "n0-future", + "irpc-derive 0.8.0 (git+https://github.com/n0-computer/irpc?branch=main)", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "postcard", "rcgen", "rustls", @@ -2200,6 +2214,16 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "irpc-derive" +version = "0.8.0" +source = "git+https://github.com/n0-computer/irpc?branch=main#83c9dcbfe000f016e9a77419472a8b717036d5c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -2417,6 +2441,26 @@ dependencies = [ "web-time", ] +[[package]] +name = "n0-future" +version = "0.3.0" +source = "git+https://github.com/n0-computer/n0-future?branch=Frando%2Ftime-serde#2044fc5b6bad8237cf288c968dc5fcf74cc617fa" +dependencies = [ + "cfg_aliases", + "derive_more 1.0.0", + "futures-buffered", + "futures-lite", + "futures-util", + "js-sys", + "pin-project", + "send_wrapper", + "tokio", + "tokio-util", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-time", +] + [[package]] name = "n0-snafu" version = "0.2.2" @@ -2437,7 +2481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34c65e127e06e5a2781b28df6a33ea474a7bddc0ac0cfea888bd20c79a1b6516" dependencies = [ "derive_more 2.0.1", - "n0-future", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "snafu", ] @@ -2531,7 +2575,7 @@ dependencies = [ "iroh-quinn-udp", "js-sys", "libc", - "n0-future", + "n0-future 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "n0-watcher", "nested_enum_utils", "netdev", @@ -2884,9 +2928,9 @@ dependencies = [ [[package]] name = "positioned-io" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8078ce4d22da5e8f57324d985cc9befe40c49ab0507a192d6be9e59584495c9" +checksum = "d4ec4b80060f033312b99b6874025d9503d2af87aef2dd4c516e253fbfcdada7" dependencies = [ "libc", "winapi", @@ -3222,9 +3266,9 @@ dependencies = [ [[package]] name = "reflink-copy" -version = "0.1.26" +version = "0.1.28" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "78c81d000a2c524133cc00d2f92f019d399e57906c3b7119271a2495354fe895" +checksum = "23bbed272e39c47a095a5242218a67412a220006842558b03fe2935e8f3d7b92" dependencies = [ "cfg-if", "libc", @@ -4203,7 +4247,6 @@ dependencies = [ "io-uring", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "slab", @@ -4281,12 +4324,10 @@ checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", "futures-util", "hashbrown 0.15.4", "pin-project-lite", - "slab", "tokio", ] @@ -4750,6 +4791,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" dependencies = [ "js-sys", + "serde", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index ed468b7f..98b9b566 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,20 +31,20 @@ futures-buffered = "0.2.4" futures-lite = "2.3.0" futures-util = { version = "0.3.25" } hex = "0.4" -iroh = { version = "0.94" } +iroh = { version = "0.94", default-features = false } iroh-tickets = { version = "0.1"} -iroh-blobs = { version = "0.96" } -iroh-gossip = { version = "0.94", features = ["net"] } +iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "Frando/wasm-nopatch", default-features = false } +iroh-gossip = { version = "0.94", features = ["net"], default-features = false } iroh-metrics = { version = "0.36", default-features = false } -irpc = { version = "0.10.0" } -n0-future = "0.3" +irpc = { git = "https://github.com/n0-computer/irpc", branch = "main", default-features = false } +n0-future = { version = "0.3", features = ["serde"], git = "https://github.com/n0-computer/n0-future", branch = "Frando/time-serde" } num_enum = "0.7" postcard = { version = "1", default-features = false, features = [ "alloc", "use-std", "experimental-derive", ] } -quinn = { package = "iroh-quinn", version = "0.14.0" } +quinn = { package = "iroh-quinn", version = "0.14.0", optional = true } rand = "0.9.2" redb = { version = "2.6.3" } self_cell = "1.0.3" @@ -74,8 +74,10 @@ tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } tracing-test = "0.2.5" [features] -default = ["metrics"] +default = ["metrics", "rpc", "fs-store"] metrics = ["iroh-metrics/metrics", "iroh/metrics"] +rpc = ["dep:quinn", "irpc/rpc", "iroh-blobs/rpc"] +fs-store = ["iroh-blobs/fs-store"] [package.metadata.docs.rs] all-features = true @@ -93,3 +95,6 @@ missing_debug_implementations = "warn" # do. To enable for a crate set `#![cfg_attr(iroh_docsrs, # feature(doc_cfg))]` in the crate. unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] } + +[build-dependencies] +cfg_aliases = "0.2.1" diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..7aae5682 --- /dev/null +++ b/build.rs @@ -0,0 +1,9 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! 
{ + // Convenience aliases + wasm_browser: { all(target_family = "wasm", target_os = "unknown") }, + } +} diff --git a/src/actor.rs b/src/actor.rs index 7b9f9241..a2357b5e 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -4,7 +4,6 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, sync::Arc, - thread::JoinHandle, time::Duration, }; @@ -13,8 +12,11 @@ use bytes::Bytes; use futures_util::FutureExt; use iroh_blobs::Hash; use irpc::channel::mpsc; +use n0_future::task::JoinSet; use serde::{Deserialize, Serialize}; -use tokio::{sync::oneshot, task::JoinSet}; +use tokio::sync::oneshot; +#[cfg(wasm_browser)] +use tracing::Instrument; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -226,7 +228,11 @@ struct OpenReplica { #[derive(Debug, Clone)] pub struct SyncHandle { tx: async_channel::Sender, - join_handle: Arc>>, + #[cfg(wasm_browser)] + #[allow(unused)] + join_handle: Arc>>, + #[cfg(not(wasm_browser))] + join_handle: Arc>>, metrics: Arc, } @@ -270,17 +276,23 @@ impl SyncHandle { tasks: Default::default(), metrics: metrics.clone(), }; + + let span = error_span!("sync", %me); + #[cfg(wasm_browser)] + let join_handle = n0_future::task::spawn(actor.run_async().instrument(span)); + + #[cfg(not(wasm_browser))] let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) .spawn(move || { - let span = error_span!("sync", %me); let _enter = span.enter(); - if let Err(err) = actor.run() { + if let Err(err) = actor.run_in_thread() { error!("Sync actor closed with error: {err:?}"); } }) .expect("failed to spawn thread"); + let join_handle = Arc::new(Some(join_handle)); SyncHandle { tx: action_tx, @@ -594,14 +606,26 @@ impl SyncHandle { impl Drop for SyncHandle { fn drop(&mut self) { // this means we're dropping the last reference + #[allow(unused)] if let Some(handle) = Arc::get_mut(&mut self.join_handle) { - // this call is the reason tx can not be a tokio mpsc channel. - // we have no control about where drop is called, yet tokio send_blocking panics - // when called from inside a tokio runtime. - self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); - let handle = handle.take().expect("this can only run once"); - if let Err(err) = handle.join() { - warn!(?err, "Failed to join sync actor"); + #[cfg(wasm_browser)] + { + let tx = self.tx.clone(); + n0_future::task::spawn(async move { + tx.send(Action::Shutdown { reply: None }).await.ok(); + }); + } + #[cfg(not(wasm_browser))] + { + // this call is the reason tx cannot be a tokio mpsc channel. + // we have no control over where drop is called, yet tokio send_blocking panics + // when called from inside a tokio runtime. + self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); + let handle = handle.take().expect("this can only run once"); + + if let Err(err) = handle.join() { + warn!(?err, "Failed to join sync actor"); + } } } } @@ -617,7 +641,8 @@ struct Actor { } impl Actor { - fn run(self) -> Result<()> { + #[cfg(not(wasm_browser))] + fn run_in_thread(self) -> Result<()> { let rt = tokio::runtime::Builder::new_current_thread() .enable_time() .build()?; @@ -628,7 +653,7 @@ impl Actor { async fn run_async(mut self) { let reply = loop { - let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY); tokio::pin!(timeout); let action = tokio::select!
{ _ = &mut timeout => { @@ -764,37 +789,46 @@ impl Actor { hash, len, reply, - } => send_reply_with(reply, self, move |this| { - let author = get_author(&mut this.store, &author)?; - let mut replica = this.states.replica(namespace, &mut this.store)?; - replica.insert(&key, &author, hash, len)?; - this.metrics.new_entries_local.inc(); - this.metrics.new_entries_local_size.inc_by(len); - Ok(()) - }), + } => { + send_reply_with_async(reply, self, async move |this| { + let author = get_author(&mut this.store, &author)?; + let mut replica = this.states.replica(namespace, &mut this.store)?; + replica.insert(&key, &author, hash, len).await?; + this.metrics.new_entries_local.inc(); + this.metrics.new_entries_local_size.inc_by(len); + Ok(()) + }) + .await + } ReplicaAction::DeletePrefix { author, key, reply } => { - send_reply_with(reply, self, |this| { + send_reply_with_async(reply, self, async |this| { let author = get_author(&mut this.store, &author)?; let mut replica = this.states.replica(namespace, &mut this.store)?; - let res = replica.delete_prefix(&key, &author)?; + let res = replica.delete_prefix(&key, &author).await?; Ok(res) }) + .await } ReplicaAction::InsertRemote { entry, from, content_status, reply, - } => send_reply_with(reply, self, move |this| { - let mut replica = this - .states - .replica_if_syncing(&namespace, &mut this.store)?; - let len = entry.content_len(); - replica.insert_remote_entry(entry, from, content_status)?; - this.metrics.new_entries_remote.inc(); - this.metrics.new_entries_remote_size.inc_by(len); - Ok(()) - }), + } => { + send_reply_with_async(reply, self, async move |this| { + let mut replica = this + .states + .replica_if_syncing(&namespace, &mut this.store)?; + let len = entry.content_len(); + replica + .insert_remote_entry(entry, from, content_status) + .await?; + this.metrics.new_entries_remote.inc(); + this.metrics.new_entries_remote_size.inc_by(len); + Ok(()) + }) + .await + } ReplicaAction::SyncInitialMessage { reply } => { send_reply_with(reply, self, move |this| { @@ -1049,6 +1083,14 @@ fn send_reply_with( sender.send(f(this)).map_err(send_reply_error) } +async fn send_reply_with_async( + sender: oneshot::Sender>, + this: &mut Actor, + f: impl AsyncFnOnce(&mut Actor) -> Result, +) -> Result<(), SendReplyError> { + sender.send(f(this).await).map_err(send_reply_error) +} + fn send_reply_error(_err: T) -> SendReplyError { SendReplyError } diff --git a/src/api.rs b/src/api.rs index fc5e9c07..b0b3ac45 100644 --- a/src/api.rs +++ b/src/api.rs @@ -4,7 +4,6 @@ use std::{ future::Future, - net::SocketAddr, path::Path, pin::Pin, sync::{ @@ -14,18 +13,14 @@ use std::{ task::{ready, Poll}, }; -use anyhow::{Context, Result}; +use anyhow::Result; use bytes::Bytes; use iroh::EndpointAddr; use iroh_blobs::{ api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress}, Hash, }; -use irpc::rpc::Handler; -use n0_future::{ - task::{self, AbortOnDropHandle}, - FutureExt, Stream, StreamExt, -}; +use n0_future::{FutureExt, Stream, StreamExt}; use self::{ actor::RpcActor, @@ -67,19 +62,25 @@ impl DocsApi { } /// Connect to a remote docs service - pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result { + #[cfg(feature = "rpc")] + pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Result { Ok(DocsApi { inner: Client::quinn(endpoint, addr), }) } /// Listen for incoming RPC connections - pub fn listen(&self, endpoint: quinn::Endpoint) -> Result> { + #[cfg(feature = "rpc")] + pub fn listen( + &self, + endpoint: 
quinn::Endpoint, + ) -> Result> { + use anyhow::Context; let local = self .inner .as_local() .context("cannot listen on remote API")?; - let handler: Handler = Arc::new(move |msg, _rx, tx| { + let handler: irpc::rpc::Handler = Arc::new(move |msg, _rx, tx| { let local = local.clone(); Box::pin(async move { match msg { @@ -114,8 +115,8 @@ impl DocsApi { } }) }); - let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler)); - Ok(AbortOnDropHandle::new(join_handle)) + let join_handle = n0_future::task::spawn(irpc::rpc::listen(endpoint, handler)); + Ok(n0_future::task::AbortOnDropHandle::new(join_handle)) } /// Creates a new document author. diff --git a/src/api/actor.rs b/src/api/actor.rs index 5a8fce4b..9d365c10 100644 --- a/src/api/actor.rs +++ b/src/api/actor.rs @@ -429,7 +429,7 @@ impl RpcActor { return; } }; - tokio::task::spawn(async move { + n0_future::task::spawn(async move { loop { tokio::select! { msg = stream.next() => { diff --git a/src/api/protocol.rs b/src/api/protocol.rs index 1575474a..ca77fea9 100644 --- a/src/api/protocol.rs +++ b/src/api/protocol.rs @@ -304,7 +304,7 @@ pub struct AuthorDeleteResponse; // Use the macro to generate both the DocsProtocol and DocsMessage enums // plus implement Channels for each type -#[rpc_requests(message = DocsMessage)] +#[rpc_requests(message = DocsMessage, rpc_feature = "rpc")] #[derive(Serialize, Deserialize, Debug)] pub enum DocsProtocol { #[rpc(tx = oneshot::Sender>)] diff --git a/src/engine.rs b/src/engine.rs index 06efe0ae..3a3833da 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2,24 +2,20 @@ //! //! [`crate::Replica`] is also called documents here. -use std::{ - path::PathBuf, - str::FromStr, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use futures_lite::{Stream, StreamExt}; use iroh::{Endpoint, EndpointAddr, PublicKey}; use iroh_blobs::{ api::{blobs::BlobStatus, downloader::Downloader, Store}, - store::fs::options::{ProtectCb, ProtectOutcome}, + store::{ProtectCb, ProtectOutcome}, Hash, }; use iroh_gossip::net::Gossip; +use n0_future::task::AbortOnDropHandle; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; -use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, error_span, Instrument}; use self::live::{LiveActor, ToLiveActor}; @@ -128,7 +124,7 @@ impl Engine { live_actor_tx.clone(), sync.metrics().clone(), ); - let actor_handle = tokio::task::spawn( + let actor_handle = n0_future::task::spawn( async move { if let Err(err) = actor.run().await { error!("sync actor failed: {err:?}"); @@ -355,7 +351,8 @@ pub enum DefaultAuthorStorage { /// Memory storage. Mem, /// File based persistent storage. - Persistent(PathBuf), + #[cfg(feature = "fs-store")] + Persistent(std::path::PathBuf), } impl DefaultAuthorStorage { @@ -373,7 +370,11 @@ impl DefaultAuthorStorage { docs_store.import_author(author).await?; Ok(author_id) } + #[cfg(feature = "fs-store")] Self::Persistent(ref path) => { + use std::str::FromStr; + + use anyhow::Context; if path.exists() { let data = tokio::fs::read_to_string(path).await.with_context(|| { format!( @@ -407,12 +408,14 @@ impl DefaultAuthorStorage { } /// Save a new default author. - pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> { + pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> { match self { Self::Mem => { // persistence is not possible for the mem storage so this is a noop. 
} + #[cfg(feature = "fs-store")] Self::Persistent(ref path) => { + use anyhow::Context; tokio::fs::write(path, author_id.to_string()) .await .with_context(|| { @@ -483,7 +486,7 @@ impl ProtectCallbackHandler { /// in any doc. /// /// [`Builder::protect_handler`]: crate::protocol::Builder::protect_handler - /// [`GcConfig`]: iroh_blobs::store::fs::options::GcConfig + /// [`GcConfig`]: iroh_blobs::store::GcConfig pub fn new() -> (Self, ProtectCb) { let (tx, rx) = mpsc::channel(4); let cb = ProtectCallbackSender(tx).into_cb(); diff --git a/src/engine/gossip.rs b/src/engine/gossip.rs index bb892287..bfaf0324 100644 --- a/src/engine/gossip.rs +++ b/src/engine/gossip.rs @@ -9,10 +9,8 @@ use iroh_gossip::{ api::{Event, GossipReceiver, GossipSender, JoinOptions}, net::Gossip, }; -use tokio::{ - sync::mpsc, - task::{AbortHandle, JoinSet}, -}; +use n0_future::task::{AbortHandle, JoinSet}; +use tokio::sync::mpsc; use tracing::{debug, instrument, warn}; use super::live::{Op, ToLiveActor}; diff --git a/src/engine/live.rs b/src/engine/live.rs index 9a4b11f1..b8355344 100644 --- a/src/engine/live.rs +++ b/src/engine/live.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, - time::SystemTime, }; use anyhow::{Context, Result}; @@ -20,11 +19,9 @@ use iroh_blobs::{ Hash, HashAndFormat, }; use iroh_gossip::net::Gossip; +use n0_future::{task::JoinSet, time::SystemTime}; use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{self, mpsc, oneshot}, - task::JoinSet, -}; +use tokio::sync::{self, mpsc, oneshot}; use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; // use super::gossip::{GossipActor, ToGossipActor}; diff --git a/src/engine/state.rs b/src/engine/state.rs index a6ff6666..c246fc47 100644 --- a/src/engine/state.rs +++ b/src/engine/state.rs @@ -1,10 +1,8 @@ -use std::{ - collections::BTreeMap, - time::{Instant, SystemTime}, -}; +use std::collections::BTreeMap; use anyhow::Result; use iroh::EndpointId; +use n0_future::time::{Instant, SystemTime}; use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; @@ -36,16 +34,14 @@ pub enum Origin { } /// The state we're in for a node and a namespace -#[derive(Debug, Clone)] +#[derive(Default, Debug, Clone)] pub enum SyncState { + #[default] Idle, - Running { start: SystemTime, origin: Origin }, -} - -impl Default for SyncState { - fn default() -> Self { - Self::Idle - } + Running { + start: SystemTime, + origin: Origin, + }, } /// Contains an entry for each active (syncing) namespace, and in there an entry for each node we diff --git a/src/net.rs b/src/net.rs index 1b40ee28..8f0ae61b 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,11 +1,9 @@ //! 
Network implementation of the iroh-docs protocol -use std::{ - future::Future, - time::{Duration, Instant}, -}; +use std::{future::Future, time::Duration}; use iroh::{Endpoint, EndpointAddr, PublicKey}; +use n0_future::time::Instant; use serde::{Deserialize, Serialize}; use tracing::{debug, error_span, trace, Instrument}; diff --git a/src/net/codec.rs b/src/net/codec.rs index 81dbc0ef..c2ed08b5 100644 --- a/src/net/codec.rs +++ b/src/net/codec.rs @@ -322,6 +322,7 @@ mod tests { let alice_replica_id = alice_replica.id(); alice_replica .hash_and_insert("hello bob", &author, "from alice") + .await .unwrap(); let mut bob_store = store::Store::memory(); @@ -329,6 +330,7 @@ mod tests { let bob_replica_id = bob_replica.id(); bob_replica .hash_and_insert("hello alice", &author, "from bob") + .await .unwrap(); assert_eq!( @@ -429,6 +431,7 @@ mod tests { #[tokio::test] #[traced_test] + #[cfg(feature = "fs-store")] async fn test_sync_many_authors_fs() -> Result<()> { let tmpdir = tempfile::tempdir()?; let alice_store = store::fs::Store::persistent(tmpdir.path().join("a.db"))?; @@ -438,9 +441,9 @@ mod tests { type Message = (AuthorId, Vec, Hash); - fn insert_messages( + async fn insert_messages( mut rng: impl CryptoRng, - replica: &mut crate::sync::Replica, + replica: &mut crate::sync::Replica<'_>, num_authors: usize, msgs_per_author: usize, key_value_fn: impl Fn(&AuthorId, usize) -> (String, String), @@ -453,7 +456,10 @@ mod tests { for i in 0..msgs_per_author { for author in authors.iter() { let (key, value) = key_value_fn(&author.id(), i); - let hash = replica.hash_and_insert(key.clone(), author, value).unwrap(); + let hash = replica + .hash_and_insert(key.clone(), author, value) + .await + .unwrap(); res.push((author.id(), key.as_bytes().to_vec(), hash)); } } @@ -509,7 +515,8 @@ mod tests { format!("from alice by {author}: {i}"), ) }, - ); + ) + .await; all_messages.extend_from_slice(&alice_messages); let mut bob_replica = bob_store.new_replica(namespace.clone()).unwrap(); @@ -524,7 +531,8 @@ mod tests { format!("from bob by {author}: {i}"), ) }, - ); + ) + .await; all_messages.extend_from_slice(&bob_messages); all_messages.sort(); @@ -622,6 +630,7 @@ mod tests { #[tokio::test] #[traced_test] + #[cfg(feature = "fs-store")] async fn test_sync_timestamps_fs() -> Result<()> { let tmpdir = tempfile::tempdir()?; let alice_store = store::fs::Store::persistent(tmpdir.path().join("a.db"))?; @@ -646,10 +655,12 @@ mod tests { // Insert into alice let hash_alice = alice_replica .hash_and_insert(&key, &author, &value_alice) + .await .unwrap(); // Insert into bob let hash_bob = bob_replica .hash_and_insert(&key, &author, &value_bob) + .await .unwrap(); assert_eq!( diff --git a/src/protocol.rs b/src/protocol.rs index 74cc4497..8d369a76 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,6 +1,6 @@ //! [`ProtocolHandler`] implementation for the docs [`Engine`]. -use std::{path::PathBuf, sync::Arc}; +use std::sync::Arc; use anyhow::Result; use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint}; @@ -13,6 +13,14 @@ use crate::{ store::Store, }; +#[derive(Default, Debug)] +enum Storage { + #[default] + Memory, + #[cfg(feature = "fs-store")] + Persistent(std::path::PathBuf), +} + /// Docs protocol. #[derive(Debug, Clone)] pub struct Docs { @@ -28,9 +36,10 @@ impl Docs { /// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage /// in the given directory. 
- pub fn persistent(path: PathBuf) -> Builder { + #[cfg(feature = "fs-store")] + pub fn persistent(path: std::path::PathBuf) -> Builder { Builder { - path: Some(path), + storage: Storage::Persistent(path), protect_cb: None, } } @@ -75,7 +84,7 @@ impl ProtocolHandler for Docs { /// Builder for the docs protocol. #[derive(Debug, Default)] pub struct Builder { - path: Option, + storage: Storage, protect_cb: Option, } @@ -95,13 +104,17 @@ impl Builder { blobs: BlobsStore, gossip: Gossip, ) -> anyhow::Result { - let replica_store = match self.path { - Some(ref path) => Store::persistent(path.join("docs.redb"))?, - None => Store::memory(), + let replica_store = match &self.storage { + Storage::Memory => Store::memory(), + #[cfg(feature = "fs-store")] + Storage::Persistent(path) => Store::persistent(path.join("docs.redb"))?, }; - let author_store = match self.path { - Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")), - None => DefaultAuthorStorage::Mem, + let author_store = match &self.storage { + Storage::Memory => DefaultAuthorStorage::Mem, + #[cfg(feature = "fs-store")] + Storage::Persistent(path) => { + DefaultAuthorStorage::Persistent(path.join("default-author")) + } }; let downloader = blobs.downloader(&endpoint); let engine = Engine::spawn( diff --git a/src/ranger.rs b/src/ranger.rs index c520eca1..6d5ef87c 100644 --- a/src/ranger.rs +++ b/src/ranger.rs @@ -1,7 +1,7 @@ //! Implementation of Set Reconciliation based on //! "Range-Based Set Reconciliation" by Aljoscha Meyer. -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use n0_future::StreamExt; use serde::{Deserialize, Serialize}; @@ -288,6 +288,7 @@ pub trait Store: Sized { /// /// This will remove just the entry with the given key, but will not perform prefix deletion. #[cfg(test)] + #[allow(unused)] fn entry_remove(&mut self, key: &E::Key) -> Result, Self::Error>; /// Remove all entries whose key starts with a prefix and for which the `predicate` callback @@ -330,11 +331,8 @@ pub trait Store: Sized { ) -> Result>, Self::Error> where F: Fn(&Self, &E, ContentStatus) -> bool, - F2: FnMut(&Self, E, ContentStatus), - F3: for<'a> Fn( - &'a E, - ) - -> Pin + Send + 'a>>, + F2: AsyncFnMut(&Self, E, ContentStatus), + F3: for<'a> AsyncFn(&'a E) -> ContentStatus, { let mut out = Vec::new(); @@ -397,7 +395,7 @@ pub trait Store: Sized { // TODO: Get rid of the clone? let outcome = self.put(entry.clone())?; if let InsertOutcome::Inserted { ..
} = outcome { - on_insert_cb(self, entry, content_status); + on_insert_cb(self, entry, content_status).await; } } } @@ -1419,8 +1417,8 @@ mod tests { &Default::default(), msg, &bob_validate_cb, - |_, _, _| (), - |_| Box::pin(async move { ContentStatus::Complete }), + async |_, _, _| (), + async |_| ContentStatus::Complete, ) .await .unwrap() @@ -1431,8 +1429,8 @@ mod tests { &Default::default(), msg, &alice_validate_cb, - |_, _, _| (), - |_| Box::pin(async move { ContentStatus::Complete }), + async |_, _, _| (), + async |_| ContentStatus::Complete, ) .await .unwrap(); diff --git a/src/store/fs.rs b/src/store/fs.rs index 6a0f6c54..b68d3e12 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -6,15 +6,14 @@ use std::{ iter::{Chain, Flatten}, num::NonZeroU64, ops::Bound, - path::Path, }; use anyhow::{anyhow, Result}; use ed25519_dalek::{SignatureError, VerifyingKey}; use iroh_blobs::Hash; use rand::CryptoRng; -use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable}; -use tracing::{info, warn}; +use redb::{Database, ReadableMultimapTable, ReadableTable}; +use tracing::warn; use super::{ pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore, @@ -98,16 +97,17 @@ impl Store { /// Create or open a store from a `path` to a database file. /// /// The file will be created if it does not exist, otherwise it will be opened. - pub fn persistent(path: impl AsRef) -> Result { + #[cfg(feature = "fs-store")] + pub fn persistent(path: impl AsRef) -> Result { let mut db = match Database::create(&path) { Ok(db) => db, - Err(DatabaseError::UpgradeRequired(1)) => return Err( + Err(redb::DatabaseError::UpgradeRequired(1)) => return Err( anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.") ), Err(err) => return Err(err.into()), }; match db.upgrade() { - Ok(true) => info!("Database was upgraded to redb v3 compatible format"), + Ok(true) => tracing::info!("Database was upgraded to redb v3 compatible format"), Ok(false) => {} Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"), } @@ -984,13 +984,13 @@ fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry { SignedEntry::new(entry_signature, entry) } -#[cfg(test)] +#[cfg(all(test, feature = "fs-store"))] mod tests { - use super::{tables::LATEST_PER_AUTHOR_TABLE, *}; + use super::*; use crate::ranger::Store as _; - #[test] - fn test_ranges() -> Result<()> { + #[tokio::test] + async fn test_ranges() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let mut store = Store::persistent(dbfile.path())?; @@ -1001,8 +1001,8 @@ mod tests { // test author prefix relation for all-255 keys let key1 = vec![255, 255]; let key2 = vec![255, 255, 255]; - replica.hash_and_insert(&key1, &author, b"v1")?; - replica.hash_and_insert(&key2, &author, b"v2")?; + replica.hash_and_insert(&key1, &author, b"v1").await?; + replica.hash_and_insert(&key2, &author, b"v2").await?; let res = store .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))? 
.collect::>>()?; @@ -1094,7 +1094,7 @@ mod tests { } fn copy_and_modify( - source: &Path, + source: &std::path::Path, modify: impl Fn(&redb::WriteTransaction) -> Result<()>, ) -> Result { let dbfile = tempfile::NamedTempFile::new()?; @@ -1107,8 +1107,10 @@ mod tests { Ok(dbfile) } - #[test] - fn test_migration_001_populate_latest_table() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_migration_001_populate_latest_table() -> Result<()> { + use super::tables::LATEST_PER_AUTHOR_TABLE; let dbfile = tempfile::NamedTempFile::new()?; let namespace = NamespaceSecret::new(&mut rand::rng()); @@ -1118,9 +1120,9 @@ mod tests { let author1 = store.new_author(&mut rand::rng())?; let author2 = store.new_author(&mut rand::rng())?; let mut replica = store.new_replica(namespace.clone())?; - replica.hash_and_insert(b"k1", &author1, b"v1")?; - replica.hash_and_insert(b"k2", &author2, b"v1")?; - replica.hash_and_insert(b"k3", &author1, b"v1")?; + replica.hash_and_insert(b"k1", &author1, b"v1").await?; + replica.hash_and_insert(b"k2", &author2, b"v1").await?; + replica.hash_and_insert(b"k3", &author1, b"v1").await?; let expected = store .get_latest_for_each_author(namespace.id())? @@ -1151,6 +1153,7 @@ mod tests { } #[test] + #[cfg(feature = "fs-store")] fn test_migration_004_populate_by_key_index() -> Result<()> { use redb::ReadableTableMetadata; let dbfile = tempfile::NamedTempFile::new()?; diff --git a/src/store/fs/tables.rs b/src/store/fs/tables.rs index d683d038..34744162 100644 --- a/src/store/fs/tables.rs +++ b/src/store/fs/tables.rs @@ -1,9 +1,8 @@ #![allow(missing_docs)] // Table Definitions -use std::time::Instant; - use bytes::Bytes; +use n0_future::time::Instant; use redb::{ MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, Table, TableDefinition, WriteTransaction, @@ -61,7 +60,7 @@ pub type RecordsByKeyIdOwned = ([u8; 32], Bytes, [u8; 32]); /// Value: `(u64, [u8; 32])` # ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. pub const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = MultimapTableDefinition::new("sync-peers-1"); -/// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the +/// Number of seconds elapsed since [`n0_future::time::SystemTime::UNIX_EPOCH`]. Used to register the /// last time a peer was useful in a document. 
// NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch, // which should be more than enough diff --git a/src/sync.rs b/src/sync.rs index ddb140b6..35c91c71 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -11,12 +11,13 @@ use std::{ fmt::Debug, ops::{Deref, DerefMut}, sync::Arc, - time::{Duration, SystemTime}, + time::Duration, }; use bytes::{Bytes, BytesMut}; use ed25519_dalek::{Signature, SignatureError}; use iroh_blobs::Hash; +use n0_future::{time::SystemTime, IterExt}; use serde::{Deserialize, Serialize}; pub use crate::heads::AuthorHeads; @@ -129,16 +130,22 @@ impl Subscribers { pub fn unsubscribe(&mut self, sender: &async_channel::Sender) { self.0.retain(|s| !same_channel(s, sender)); } - pub fn send(&mut self, event: Event) { - self.0 - .retain(|sender| sender.send_blocking(event.clone()).is_ok()) + pub async fn send(&mut self, event: Event) { + self.0 = std::mem::take(&mut self.0) + .into_iter() + .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx)) + .join_all() + .await + .into_iter() + .flatten() + .collect(); } pub fn len(&self) -> usize { self.0.len() } - pub fn send_with(&mut self, f: impl FnOnce() -> Event) { + pub async fn send_with(&mut self, f: impl FnOnce() -> Event) { if !self.0.is_empty() { - self.send(f()) + self.send(f()).await } } } @@ -361,7 +368,7 @@ where /// /// Returns the number of entries removed as a consequence of this insertion, /// or an error either if the entry failed to validate or if a store operation failed. - pub fn insert( + pub async fn insert( &mut self, key: impl AsRef<[u8]>, author: &Author, @@ -377,7 +384,7 @@ where let entry = Entry::new(id, record); let secret = self.secret_key()?; let signed_entry = entry.sign(secret, author); - self.insert_entry(signed_entry, InsertOrigin::Local) + self.insert_entry(signed_entry, InsertOrigin::Local).await } /// Delete entries that match the given `author` and key `prefix`. @@ -386,7 +393,7 @@ where /// entries whose key starts with or is equal to the given `prefix`. /// /// Returns the number of entries deleted. - pub fn delete_prefix( + pub async fn delete_prefix( &mut self, prefix: impl AsRef<[u8]>, author: &Author, @@ -395,7 +402,7 @@ where let id = RecordIdentifier::new(self.id(), author.id(), prefix); let entry = Entry::new_empty(id); let signed_entry = entry.sign(self.secret_key()?, author); - self.insert_entry(signed_entry, InsertOrigin::Local) + self.insert_entry(signed_entry, InsertOrigin::Local).await } /// Insert an entry into this replica which was received from a remote peer. @@ -405,7 +412,7 @@ where /// /// Returns the number of entries removed as a consequence of this insertion, /// or an error if the entry failed to validate or if a store operation failed. - pub fn insert_remote_entry( + pub async fn insert_remote_entry( &mut self, entry: SignedEntry, received_from: PeerIdBytes, @@ -417,13 +424,13 @@ where from: received_from, remote_content_status: content_status, }; - self.insert_entry(entry, origin) + self.insert_entry(entry, origin).await } /// Insert a signed entry into the database. /// /// Returns the number of entries removed as a consequence of this insertion. - fn insert_entry( + async fn insert_entry( &mut self, entry: SignedEntry, origin: InsertOrigin, @@ -462,7 +469,7 @@ where } }; - self.info.subscribers.send(insert_event); + self.info.subscribers.send(insert_event).await; Ok(removed_count) } @@ -471,7 +478,7 @@ where /// /// This does not store the content, just the record of it. /// Returns the calculated hash. 
- pub fn hash_and_insert( + pub async fn hash_and_insert( &mut self, key: impl AsRef<[u8]>, author: &Author, @@ -480,7 +487,7 @@ where self.info.ensure_open()?; let len = data.as_ref().len() as u64; let hash = Hash::new(data); - self.insert(key, author, hash, len)?; + self.insert(key, author, hash, len).await?; Ok(hash) } @@ -537,29 +544,29 @@ where validate_entry(now, store, my_namespace, entry, &origin).is_ok() }, // on_insert callback: is called when an entry was actually inserted in the store - |_store, entry, content_status| { + async |_store, entry, content_status| { // We use `send_with` to only clone the entry if we have active subscriptions. - self.info.subscribers.send_with(|| { - let should_download = download_policy.matches(entry.entry()); - Event::RemoteInsert { - from: from_peer, - namespace: my_namespace, - entry: entry.clone(), - should_download, - remote_content_status: content_status, - } - }) + self.info + .subscribers + .send_with(|| { + let should_download = download_policy.matches(entry.entry()); + Event::RemoteInsert { + from: from_peer, + namespace: my_namespace, + entry: entry.clone(), + should_download, + remote_content_status: content_status, + } + }) + .await }, // content_status callback: get content status for outgoing entries - move |entry| { - let cb = cb.clone(); - Box::pin(async move { - if let Some(cb) = cb.as_ref() { - cb(entry.content_hash()).await - } else { - ContentStatus::Missing - } - }) + async move |entry| { + if let Some(cb) = cb.as_ref() { + cb(entry.content_hash()).await + } else { + ContentStatus::Missing + } }, ) .await?; @@ -1205,23 +1212,24 @@ mod tests { store::{OpenError, Query, SortBy, SortDirection, Store}, }; - #[test] - fn test_basics_memory() -> Result<()> { + #[tokio::test] + async fn test_basics_memory() -> Result<()> { let store = store::Store::memory(); - test_basics(store)?; + test_basics(store).await?; Ok(()) } - #[test] - fn test_basics_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_basics_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_basics(store)?; + test_basics(store).await?; Ok(()) } - fn test_basics(mut store: Store) -> Result<()> { + async fn test_basics(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let alice = Author::new(&mut rng); let bob = Author::new(&mut rng); @@ -1235,11 +1243,9 @@ mod tests { let mut my_replica = store.new_replica(myspace.clone())?; for i in 0..10 { - my_replica.hash_and_insert( - format!("/{i}"), - &alice, - format!("{i}: hello from alice"), - )?; + my_replica + .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice")) + .await?; } for i in 0..10 { @@ -1253,13 +1259,17 @@ mod tests { // Test multiple records for the same key let mut my_replica = store.new_replica(myspace.clone())?; - my_replica.hash_and_insert("/cool/path", &alice, "round 1")?; + my_replica + .hash_and_insert("/cool/path", &alice, "round 1") + .await?; let _entry = store .get_exact(myspace.id(), alice.id(), "/cool/path", false)? .unwrap(); // Second let mut my_replica = store.new_replica(myspace.clone())?; - my_replica.hash_and_insert("/cool/path", &alice, "round 2")?; + my_replica + .hash_and_insert("/cool/path", &alice, "round 2") + .await?; let _entry = store .get_exact(myspace.id(), alice.id(), "/cool/path", false)? 
.unwrap(); @@ -1290,7 +1300,9 @@ mod tests { // insert record from different author let mut my_replica = store.new_replica(myspace.clone())?; - let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?; + let _entry = my_replica + .hash_and_insert("/cool/path", &bob, "bob round 1") + .await?; // Get All by author let entries: Vec<_> = store @@ -1392,20 +1404,21 @@ mod tests { Ok(()) } - #[test] - fn test_content_hashes_iterator_memory() -> Result<()> { + #[tokio::test] + async fn test_content_hashes_iterator_memory() -> Result<()> { let store = store::Store::memory(); - test_content_hashes_iterator(store) + test_content_hashes_iterator(store).await } - #[test] - fn test_content_hashes_iterator_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_content_hashes_iterator_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_content_hashes_iterator(store) + test_content_hashes_iterator(store).await } - fn test_content_hashes_iterator(mut store: Store) -> Result<()> { + async fn test_content_hashes_iterator(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let mut expected = HashSet::new(); let n_replicas = 3; @@ -1417,7 +1430,7 @@ mod tests { for j in 0..n_entries { let key = format!("{j}"); let data = format!("{i}:{j}"); - let hash = replica.hash_and_insert(key, &author, data)?; + let hash = replica.hash_and_insert(key, &author, data).await?; expected.insert(hash); } } @@ -1517,23 +1530,24 @@ mod tests { } } - #[test] - fn test_timestamps_memory() -> Result<()> { + #[tokio::test] + async fn test_timestamps_memory() -> Result<()> { let store = store::Store::memory(); - test_timestamps(store)?; + test_timestamps(store).await?; Ok(()) } - #[test] - fn test_timestamps_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_timestamps_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_timestamps(store)?; + test_timestamps(store).await?; Ok(()) } - fn test_timestamps(mut store: Store) -> Result<()> { + async fn test_timestamps(mut store: Store) -> Result<()> { let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1); let namespace = NamespaceSecret::new(&mut rng); let _replica = store.new_replica(namespace.clone())?; @@ -1552,6 +1566,7 @@ mod tests { replica .insert_entry(entry.clone(), InsertOrigin::Local) + .await .unwrap(); store.close_replica(namespace.id()); let res = store @@ -1567,7 +1582,7 @@ mod tests { }; let mut replica = store.open_replica(&namespace.id())?; - let res = replica.insert_entry(entry2, InsertOrigin::Local); + let res = replica.insert_entry(entry2, InsertOrigin::Local).await; store.close_replica(namespace.id()); assert!(matches!(res, Err(InsertError::NewerEntryExists))); let res = store @@ -1588,6 +1603,7 @@ mod tests { } #[tokio::test] + #[cfg(feature = "fs-store")] async fn test_replica_sync_fs() -> Result<()> { let alice_dbfile = tempfile::NamedTempFile::new()?; let alice_store = store::fs::Store::persistent(alice_dbfile.path())?; @@ -1607,12 +1623,12 @@ mod tests { let myspace = NamespaceSecret::new(&mut rng); let mut alice = alice_store.new_replica(myspace.clone())?; for el in &alice_set { - alice.hash_and_insert(el, &author, el.as_bytes())?; + alice.hash_and_insert(el, &author, el.as_bytes()).await?; } let mut bob = bob_store.new_replica(myspace.clone())?; for el in &bob_set { - bob.hash_and_insert(el, &author, 
el.as_bytes())?; + bob.hash_and_insert(el, &author, el.as_bytes()).await?; } let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?; @@ -1641,6 +1657,7 @@ mod tests { } #[tokio::test] + #[cfg(feature = "fs-store")] async fn test_replica_timestamp_sync_fs() -> Result<()> { let alice_dbfile = tempfile::NamedTempFile::new()?; let alice_store = store::fs::Store::persistent(alice_dbfile.path())?; @@ -1664,9 +1681,9 @@ mod tests { let key = b"key"; let alice_value = b"alice"; let bob_value = b"bob"; - let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?; + let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?; // system time increased - sync should overwrite - let bob_hash = bob.hash_and_insert(key, &author, bob_value)?; + let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?; sync(&mut alice, &mut bob).await?; assert_eq!( get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?, @@ -1682,8 +1699,8 @@ mod tests { let alice_value_2 = b"alice2"; // system time increased - sync should overwrite - let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?; - let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?; + let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?; + let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?; sync(&mut alice, &mut bob).await?; assert_eq!( get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?, @@ -1698,8 +1715,8 @@ mod tests { Ok(()) } - #[test] - fn test_future_timestamp() -> Result<()> { + #[tokio::test] + async fn test_future_timestamp() -> Result<()> { let mut rng = rand::rng(); let mut store = store::Store::memory(); let author = Author::new(&mut rng); @@ -1710,7 +1727,9 @@ mod tests { let t = system_time_now(); let record = Record::from_data(b"1", t); let entry0 = SignedEntry::from_parts(&namespace, &author, key, record); - replica.insert_entry(entry0.clone(), InsertOrigin::Local)?; + replica + .insert_entry(entry0.clone(), InsertOrigin::Local) + .await?; assert_eq!( get_entry(&mut store, namespace.id(), author.id(), key)?, @@ -1721,7 +1740,9 @@ mod tests { let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000; let record = Record::from_data(b"2", t); let entry1 = SignedEntry::from_parts(&namespace, &author, key, record); - replica.insert_entry(entry1.clone(), InsertOrigin::Local)?; + replica + .insert_entry(entry1.clone(), InsertOrigin::Local) + .await?; assert_eq!( get_entry(&mut store, namespace.id(), author.id(), key)?, entry1 @@ -1731,7 +1752,9 @@ mod tests { let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT; let record = Record::from_data(b"2", t); let entry2 = SignedEntry::from_parts(&namespace, &author, key, record); - replica.insert_entry(entry2.clone(), InsertOrigin::Local)?; + replica + .insert_entry(entry2.clone(), InsertOrigin::Local) + .await?; assert_eq!( get_entry(&mut store, namespace.id(), author.id(), key)?, entry2 @@ -1741,7 +1764,7 @@ mod tests { let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000; let record = Record::from_data(b"2", t); let entry3 = SignedEntry::from_parts(&namespace, &author, key, record); - let res = replica.insert_entry(entry3, InsertOrigin::Local); + let res = replica.insert_entry(entry3, InsertOrigin::Local).await; assert!(matches!( res, Err(InsertError::Validation( @@ -1756,42 +1779,43 @@ mod tests { Ok(()) } - #[test] - fn test_insert_empty() -> Result<()> { + #[tokio::test] + async fn test_insert_empty() -> Result<()> { let mut store = 
store::Store::memory(); let mut rng = rand::rng(); let alice = Author::new(&mut rng); let myspace = NamespaceSecret::new(&mut rng); let mut replica = store.new_replica(myspace.clone())?; let hash = Hash::new(b""); - let res = replica.insert(b"foo", &alice, hash, 0); + let res = replica.insert(b"foo", &alice, hash, 0).await; assert!(matches!(res, Err(InsertError::EntryIsEmpty))); store.flush()?; Ok(()) } - #[test] - fn test_prefix_delete_memory() -> Result<()> { + #[tokio::test] + async fn test_prefix_delete_memory() -> Result<()> { let store = store::Store::memory(); - test_prefix_delete(store)?; + test_prefix_delete(store).await?; Ok(()) } - #[test] - fn test_prefix_delete_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_prefix_delete_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_prefix_delete(store)?; + test_prefix_delete(store).await?; Ok(()) } - fn test_prefix_delete(mut store: Store) -> Result<()> { + async fn test_prefix_delete(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let alice = Author::new(&mut rng); let myspace = NamespaceSecret::new(&mut rng); let mut replica = store.new_replica(myspace.clone())?; - let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?; - let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?; + let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?; + let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?; // sanity checks assert_eq!( @@ -1805,7 +1829,7 @@ mod tests { // delete let mut replica = store.new_replica(myspace.clone())?; - let deleted = replica.delete_prefix(b"foo", &alice)?; + let deleted = replica.delete_prefix(b"foo", &alice).await?; assert_eq!(deleted, 2); assert_eq!( store.get_exact(myspace.id(), alice.id(), b"foobar", false)?, @@ -1832,6 +1856,7 @@ mod tests { } #[tokio::test] + #[cfg(feature = "fs-store")] async fn test_replica_sync_delete_fs() -> Result<()> { let alice_dbfile = tempfile::NamedTempFile::new()?; let alice_store = store::fs::Store::persistent(alice_dbfile.path())?; @@ -1849,12 +1874,12 @@ mod tests { let myspace = NamespaceSecret::new(&mut rng); let mut alice = alice_store.new_replica(myspace.clone())?; for el in &alice_set { - alice.hash_and_insert(el, &author, el.as_bytes())?; + alice.hash_and_insert(el, &author, el.as_bytes()).await?; } let mut bob = bob_store.new_replica(myspace.clone())?; for el in &bob_set { - bob.hash_and_insert(el, &author, el.as_bytes())?; + bob.hash_and_insert(el, &author, el.as_bytes()).await?; } sync(&mut alice, &mut bob).await?; @@ -1866,8 +1891,9 @@ mod tests { let mut alice = alice_store.new_replica(myspace.clone())?; let mut bob = bob_store.new_replica(myspace.clone())?; - alice.delete_prefix("foo", &author)?; - bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?; + alice.delete_prefix("foo", &author).await?; + bob.hash_and_insert("fooz", &author, "fooz".as_bytes()) + .await?; sync(&mut alice, &mut bob).await?; check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?; check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?; @@ -1876,27 +1902,28 @@ mod tests { Ok(()) } - #[test] - fn test_replica_remove_memory() -> Result<()> { + #[tokio::test] + async fn test_replica_remove_memory() -> Result<()> { let alice_store = store::Store::memory(); - test_replica_remove(alice_store) + test_replica_remove(alice_store).await } - #[test] - fn test_replica_remove_fs() -> 
Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_replica_remove_fs() -> Result<()> { let alice_dbfile = tempfile::NamedTempFile::new()?; let alice_store = store::fs::Store::persistent(alice_dbfile.path())?; - test_replica_remove(alice_store) + test_replica_remove(alice_store).await } - fn test_replica_remove(mut store: Store) -> Result<()> { + async fn test_replica_remove(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let namespace = NamespaceSecret::new(&mut rng); let author = Author::new(&mut rng); let mut replica = store.new_replica(namespace.clone())?; // insert entry - let hash = replica.hash_and_insert(b"foo", &author, b"bar")?; + let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?; let res = store .get_many(namespace.id(), Query::all())? .collect::>(); @@ -1919,7 +1946,7 @@ mod tests { // may recreate replica let mut replica = store.new_replica(namespace.clone())?; - replica.insert(b"foo", &author, hash, 3)?; + replica.insert(b"foo", &author, hash, 3).await?; let res = store .get_many(namespace.id(), Query::all())? .collect::>(); @@ -1928,20 +1955,21 @@ mod tests { Ok(()) } - #[test] - fn test_replica_delete_edge_cases_memory() -> Result<()> { + #[tokio::test] + async fn test_replica_delete_edge_cases_memory() -> Result<()> { let store = store::Store::memory(); - test_replica_delete_edge_cases(store) + test_replica_delete_edge_cases(store).await } - #[test] - fn test_replica_delete_edge_cases_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_replica_delete_edge_cases_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_replica_delete_edge_cases(store) + test_replica_delete_edge_cases(store).await } - fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> { + async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let author = Author::new(&mut rng); let namespace = NamespaceSecret::new(&mut rng); @@ -1956,23 +1984,23 @@ mod tests { for suffix in edgecases { let key = [prefix, suffix].to_vec(); expected.push(key.clone()); - replica.insert(&key, &author, hash, len)?; + replica.insert(&key, &author, hash, len).await?; } assert_keys(&mut store, namespace.id(), expected); let mut replica = store.new_replica(namespace.clone())?; - replica.delete_prefix([prefix], &author)?; + replica.delete_prefix([prefix], &author).await?; assert_keys(&mut store, namespace.id(), vec![]); } let mut replica = store.new_replica(namespace.clone())?; let key = vec![1u8, 0u8]; - replica.insert(key, &author, hash, len)?; + replica.insert(key, &author, hash, len).await?; let key = vec![1u8, 1u8]; - replica.insert(key, &author, hash, len)?; + replica.insert(key, &author, hash, len).await?; let key = vec![1u8, 2u8]; - replica.insert(key, &author, hash, len)?; + replica.insert(key, &author, hash, len).await?; let prefix = vec![1u8, 1u8]; - replica.delete_prefix(prefix, &author)?; + replica.delete_prefix(prefix, &author).await?; assert_keys( &mut store, namespace.id(), @@ -1981,11 +2009,11 @@ mod tests { let mut replica = store.new_replica(namespace.clone())?; let key = vec![0u8, 255u8]; - replica.insert(key, &author, hash, len)?; + replica.insert(key, &author, hash, len).await?; let key = vec![0u8, 0u8]; - replica.insert(key, &author, hash, len)?; + replica.insert(key, &author, hash, len).await?; let prefix = vec![0u8]; - replica.delete_prefix(prefix, &author)?; + replica.delete_prefix(prefix, 
&author).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -1995,27 +2023,28 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_latest_iter_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_latest_iter_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_latest_iter(store)
+        test_latest_iter(store).await
     }

-    #[test]
-    fn test_latest_iter_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_latest_iter_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_latest_iter(store)
+        test_latest_iter(store).await
     }

-    fn test_latest_iter(mut store: Store) -> Result<()> {
+    async fn test_latest_iter(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let author0 = Author::new(&mut rng);
         let author1 = Author::new(&mut rng);
         let namespace = NamespaceSecret::new(&mut rng);
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
+        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
         let latest = store
             .get_latest_for_each_author(namespace.id())?
             .collect::<Result<Vec<_>>>()?;
@@ -2023,8 +2052,8 @@ mod tests {
         assert_eq!(latest[0].2, b"a0.1".to_vec());

         let mut replica = store.new_replica(namespace.clone())?;
-        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
-        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
+        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
+        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
         let latest = store
             .get_latest_for_each_author(namespace.id())?
             .collect::<Result<Vec<_>>>()?;
@@ -2035,24 +2064,25 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_byte_keys_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_byte_keys_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_byte_keys(store)?;
+        test_replica_byte_keys(store).await?;
         Ok(())
     }

-    #[test]
-    fn test_replica_byte_keys_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_byte_keys_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_byte_keys(store)?;
+        test_replica_byte_keys(store).await?;
         Ok(())
     }

-    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
+    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let author = Author::new(&mut rng);
         let namespace = NamespaceSecret::new(&mut rng);
@@ -2062,11 +2092,11 @@ mod tests {

         let key = vec![1u8, 0u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
         let key = vec![1u8, 2u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -2075,7 +2105,7 @@ mod tests {

         let key = vec![0u8, 255u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -2085,21 +2115,22 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_capability_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_capability_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_capability(store)
+        test_replica_capability(store).await
     }

-    #[test]
-    fn test_replica_capability_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_capability_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_capability(store)
+        test_replica_capability(store).await
     }

     #[allow(clippy::redundant_pattern_matching)]
-    fn test_replica_capability(mut store: Store) -> Result<()> {
+    async fn test_replica_capability(mut store: Store) -> Result<()> {
         let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
         let author = store.new_author(&mut rng)?;
         let namespace = NamespaceSecret::new(&mut rng);
@@ -2108,18 +2139,18 @@ mod tests {
         let capability = Capability::Read(namespace.id());
         store.import_namespace(capability)?;
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(matches!(res, Err(InsertError::ReadOnly)));

         // import write capability - insert must succeed
         let capability = Capability::Write(namespace.clone());
         store.import_namespace(capability)?;
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(matches!(res, Ok(_)));
         store.close_replica(namespace.id());
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(res.is_ok());

         // import read capability again - insert must still succeed
@@ -2127,7 +2158,7 @@ mod tests {
         store.import_namespace(capability)?;
         store.close_replica(namespace.id());
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(res.is_ok());
         store.flush()?;
         Ok(())
@@ -2140,6 +2171,7 @@ mod tests {
     }

     #[tokio::test]
+    #[cfg(feature = "fs-store")]
     async fn test_actor_capability_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
@@ -2217,7 +2249,7 @@ mod tests {
         replica1.info.subscribe(events1_sender);
         replica2.info.subscribe(events2_sender);
-        replica1.hash_and_insert(b"foo", &author, b"init")?;
+        replica1.hash_and_insert(b"foo", &author, b"init").await?;
         let from1 = replica1.sync_initial_message()?;
         let from2 = replica2
             .sync_process_message(from1, peer1, &mut state2)
             .await
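// A condensed sketch of the conversion pattern this diff applies across
// the whole test module (the `example*` names are illustrative, not part
// of the change): each store-generic test body becomes `async`, the
// in-memory variant stays unconditional, and the filesystem variant is
// gated behind the `fs-store` feature so the suite still compiles on
// wasm32-unknown-unknown, where the redb-backed store is unavailable.
#[tokio::test]
async fn example_memory() -> Result<()> {
    let store = store::Store::memory();
    example(store).await
}

#[tokio::test]
#[cfg(feature = "fs-store")]
async fn example_fs() -> Result<()> {
    let dbfile = tempfile::NamedTempFile::new()?;
    let store = store::fs::Store::persistent(dbfile.path())?;
    example(store).await
}

// The shared body is now async because `hash_and_insert` returns a future.
async fn example(mut store: Store) -> Result<()> {
    let mut rng = rand::rng();
    let author = Author::new(&mut rng);
    let namespace = NamespaceSecret::new(&mut rng);
    let mut replica = store.new_replica(namespace.clone())?;
    replica.hash_and_insert(b"key", &author, b"value").await?;
    Ok(())
}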
- replica2.hash_and_insert(b"foo", &author, b"update")?; + replica2.hash_and_insert(b"foo", &author, b"update").await?; let from2 = replica2 .sync_process_message(from1, peer1, &mut state2) .await @@ -2254,24 +2286,25 @@ mod tests { Ok(()) } - #[test] - fn test_replica_queries_mem() -> Result<()> { + #[tokio::test] + async fn test_replica_queries_mem() -> Result<()> { let store = store::Store::memory(); - test_replica_queries(store)?; + test_replica_queries(store).await?; Ok(()) } - #[test] - fn test_replica_queries_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_replica_queries_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_replica_queries(store)?; + test_replica_queries(store).await?; Ok(()) } - fn test_replica_queries(mut store: Store) -> Result<()> { + async fn test_replica_queries(mut store: Store) -> Result<()> { let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1); let namespace = NamespaceSecret::new(&mut rng); let namespace_id = namespace.id(); @@ -2287,10 +2320,10 @@ mod tests { ); let mut replica = store.new_replica(namespace.clone())?; - replica.hash_and_insert("hi/world", &a2, "a2")?; - replica.hash_and_insert("hi/world", &a1, "a1")?; - replica.hash_and_insert("hi/moon", &a2, "a1")?; - replica.hash_and_insert("hi", &a3, "a3")?; + replica.hash_and_insert("hi/world", &a2, "a2").await?; + replica.hash_and_insert("hi/world", &a1, "a1").await?; + replica.hash_and_insert("hi/moon", &a2, "a1").await?; + replica.hash_and_insert("hi", &a3, "a3").await?; struct QueryTester<'a> { store: &'a mut Store, @@ -2420,7 +2453,7 @@ mod tests { ); let mut replica = store.new_replica(namespace)?; - replica.delete_prefix("hi/world", &a2)?; + replica.delete_prefix("hi/world", &a2).await?; let mut qt = QueryTester { store: &mut store, namespace: namespace_id, @@ -2451,6 +2484,7 @@ mod tests { } #[test] + #[cfg(feature = "fs-store")] fn test_dl_policies_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let mut store = store::fs::Store::persistent(dbfile.path())?; diff --git a/tests/client.rs b/tests/client.rs index ed93a949..6ba45c6b 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -31,7 +31,7 @@ async fn test_doc_close() -> Result<()> { // dropping doc1 will close the doc if not already closed // wait a bit because the close-on-drop spawns a task for which we cannot track completion. drop(doc1); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + n0_future::time::sleep(n0_future::time::Duration::from_millis(100)).await; // operations on doc2 still succeed doc2.set_bytes(author, "foo", "bar").await?; @@ -167,6 +167,7 @@ async fn test_default_author_memory() -> Result<()> { #[tokio::test] #[traced_test] +#[cfg(feature = "fs-store")] async fn test_default_author_persist() -> TestResult<()> { let iroh_root_dir = tempfile::TempDir::new()?; let iroh_root = iroh_root_dir.path(); @@ -216,7 +217,7 @@ async fn test_default_author_persist() -> TestResult<()> { // somehow the blob store is not shutdown correctly (yet?) on macos. // so we give it some time until we find a proper fix. 
#[cfg(target_os = "macos")] - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + n0_future::time::sleep(std::time::Duration::from_secs(1)).await; tokio::fs::remove_file(iroh_root.join("default-author")).await?; drop(iroh); diff --git a/tests/gc.rs b/tests/gc.rs index df726a75..a297f4e0 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -1,3 +1,5 @@ +#![allow(unused)] + use std::{path::PathBuf, time::Duration}; use anyhow::Result; @@ -18,6 +20,7 @@ pub fn create_test_data(size: usize) -> Bytes { } /// Wrap a bao store in a node that has gc enabled. +#[cfg(feature = "fs-store")] async fn persistent_node( path: PathBuf, gc_period: Duration, @@ -35,6 +38,7 @@ async fn persistent_node( } #[tokio::test] +#[cfg(feature = "fs-store")] async fn redb_doc_import_stress() -> Result<()> { let _ = tracing_subscriber::fmt::try_init(); let dir = testdir!(); diff --git a/tests/sync.rs b/tests/sync.rs index 32304fc7..805bc051 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1,9 +1,4 @@ -use std::{ - collections::HashMap, - future::Future, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, future::Future, sync::Arc, time::Duration}; use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; @@ -20,7 +15,9 @@ use iroh_docs::{ store::{DownloadPolicy, FilterKind, Query}, AuthorId, ContentStatus, Entry, }; +use n0_future::time::Instant; use rand::{CryptoRng, Rng, SeedableRng}; +#[cfg(feature = "fs-store")] use tempfile::tempdir; use tracing::{debug, error_span, info, Instrument}; use tracing_test::traced_test; @@ -140,7 +137,7 @@ async fn sync_subscribe_no_sync() -> Result<()> { let mut sub = doc.subscribe().await?; let author = client.docs().author_create().await?; doc.set_bytes(author, b"k".to_vec(), b"v".to_vec()).await?; - let event = tokio::time::timeout(Duration::from_millis(100), sub.next()).await?; + let event = n0_future::time::timeout(Duration::from_millis(100), sub.next()).await?; assert!( matches!(event, Some(Ok(LiveEvent::InsertLocal { .. }))), "expected InsertLocal but got {event:?}" @@ -588,6 +585,7 @@ async fn test_sync_via_relay() -> Result<()> { #[tokio::test] #[traced_test] #[ignore = "flaky"] +#[cfg(feature = "fs-store")] async fn sync_restart_node() -> Result<()> { let mut rng = test_rng(b"sync_restart_node"); let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?; @@ -657,7 +655,7 @@ async fn sync_restart_node() -> Result<()> { info!(me = %id1.fmt_short(), "node1 down"); info!(me = %id1.fmt_short(), "sleep 1s"); - tokio::time::sleep(Duration::from_secs(1)).await; + n0_future::time::sleep(Duration::from_secs(1)).await; info!(me = %id2.fmt_short(), "node2 set b"); let hash_b = doc2.set_bytes(author2, "n2/b", "b").await?; @@ -843,7 +841,7 @@ async fn test_download_policies() -> Result<()> { (downloaded_a, downloaded_b) }; - let (downloaded_a, mut downloaded_b) = tokio::time::timeout(TIMEOUT, fut) + let (downloaded_a, mut downloaded_b) = n0_future::time::timeout(TIMEOUT, fut) .await .context("timeout elapsed")?; @@ -874,7 +872,7 @@ async fn sync_big() -> Result<()> { tokio::task::spawn(async move { for i in 0.. 
{ - tokio::time::sleep(Duration::from_secs(1)).await; + n0_future::time::sleep(Duration::from_secs(1)).await; info!("tick {i}"); } }); @@ -1012,7 +1010,7 @@ async fn test_list_docs_stream() -> testresult::TestResult<()> { } }; - tokio::time::timeout(Duration::from_secs(2), fut) + n0_future::time::timeout(Duration::from_secs(2), fut) .await .expect("not to timeout"); @@ -1081,7 +1079,7 @@ async fn wait_for_events( matcher: impl Fn(&LiveEvent) -> bool, ) -> anyhow::Result> { let mut res = Vec::with_capacity(count); - let sleep = tokio::time::sleep(timeout); + let sleep = n0_future::time::sleep(timeout); tokio::pin!(sleep); while res.len() < count { tokio::select! { @@ -1160,6 +1158,7 @@ impl PartialEq for (Entry, Bytes) { #[tokio::test] #[traced_test] +#[cfg(feature = "fs-store")] async fn doc_delete() -> Result<()> { let tempdir = tempdir()?; // TODO(Frando): iroh-blobs has gc only for fs store atm, change test to test both @@ -1184,7 +1183,7 @@ async fn doc_delete() -> Result<()> { // wait for gc // TODO: allow to manually trigger gc - tokio::time::sleep(Duration::from_secs(2)).await; + n0_future::time::sleep(Duration::from_secs(2)).await; let bytes = client.blobs().get_bytes(hash).await; assert!(bytes.is_err()); node.shutdown().await?; @@ -1289,7 +1288,7 @@ async fn assert_next( } items }; - let res = tokio::time::timeout(timeout, fut).await; + let res = n0_future::time::timeout(timeout, fut).await; res.expect("timeout reached") } @@ -1357,7 +1356,7 @@ async fn assert_next_unordered_with_optionals( Ok(()) }; tokio::pin!(fut); - let res = tokio::time::timeout(timeout, fut) + let res = n0_future::time::timeout(timeout, fut) .await .map_err(|_| anyhow!("Timeout reached ({timeout:?})")) .and_then(|res| res); diff --git a/tests/util.rs b/tests/util.rs index f5c844fa..8fa6ffd5 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -1,4 +1,5 @@ -#![allow(dead_code)] +#![allow(unused)] + use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, ops::Deref, @@ -6,7 +7,7 @@ use std::{ }; use iroh::{discovery::IntoDiscovery, dns::DnsResolver, EndpointId, RelayMode, SecretKey}; -use iroh_blobs::store::fs::options::{GcConfig, Options}; +use iroh_blobs::store::GcConfig; use iroh_docs::{engine::ProtectCallbackHandler, protocol::Docs}; use iroh_gossip::net::Gossip; @@ -64,7 +65,7 @@ impl Client { pub struct Builder { endpoint: iroh::endpoint::Builder, use_n0_discovery: bool, - path: Option, + storage: Storage, // node_discovery: Option>, gc_interval: Option, #[debug(skip)] @@ -97,9 +98,10 @@ impl Builder { let endpoint = builder.bind().await?; let mut router = iroh::protocol::Router::builder(endpoint.clone()); let gossip = Gossip::builder().spawn(endpoint.clone()); - let mut docs_builder = match self.path { - Some(ref path) => Docs::persistent(path.to_path_buf()), - None => Docs::memory(), + let mut docs_builder = match self.storage { + Storage::Memory => Docs::memory(), + #[cfg(feature = "fs-store")] + Storage::Persistent(ref path) => Docs::persistent(path.to_path_buf()), }; if let Some(protect_cb) = protect_cb { docs_builder = docs_builder.protect_handler(protect_cb); @@ -183,11 +185,11 @@ impl Builder { self } - fn new(path: Option) -> Self { + fn new(storage: Storage) -> Self { Self { endpoint: iroh::Endpoint::empty_builder(RelayMode::Disabled), use_n0_discovery: true, - path, + storage, gc_interval: None, bind_random_port: false, // node_discovery: None, @@ -197,30 +199,38 @@ impl Builder { } } +#[derive(Debug)] +enum Storage { + Memory, + #[cfg(feature = "fs-store")] + Persistent(PathBuf), +} + 
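// The `Storage` enum added in the next hunk replaces the Builder's old
// `path: Option<PathBuf>` field: `None` becomes `Storage::Memory`,
// `Some(path)` becomes `Storage::Persistent(path)`, and the persistent
// variant only exists when the `fs-store` feature is enabled. That turns
// "wasm builds cannot construct a filesystem store" into a compile-time
// guarantee. An illustrative (hypothetical) consumer:
fn describe(storage: &Storage) -> &'static str {
    match storage {
        // Available on every target.
        Storage::Memory => "in-memory store",
        // Compiled only when the feature (and thus the variant) exists,
        // so the match stays exhaustive on wasm builds too.
        #[cfg(feature = "fs-store")]
        Storage::Persistent(_) => "persistent store on disk",
    }
}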
@@ -197,30 +199,38 @@ impl Builder {
     }
 }

+#[derive(Debug)]
+enum Storage {
+    Memory,
+    #[cfg(feature = "fs-store")]
+    Persistent(PathBuf),
+}
+
 impl Node {
     /// Creates a new node with memory storage
     pub fn memory() -> Builder {
-        Builder::new(None)
+        Builder::new(Storage::Memory)
     }

     /// Creates a new node with persistent storage
+    #[cfg(feature = "fs-store")]
     pub fn persistent(path: impl AsRef<Path>) -> Builder {
-        let path = Some(path.as_ref().to_owned());
-        Builder::new(path)
+        Builder::new(Storage::Persistent(path.as_ref().to_owned()))
     }
 }

 impl Builder {
     /// Spawns the node
     pub async fn spawn(self) -> anyhow::Result<Node> {
-        let (store, protect_handler) = match self.path {
-            None => {
+        let (store, protect_handler) = match self.storage {
+            Storage::Memory => {
                 let store = iroh_blobs::store::mem::MemStore::new();
                 ((*store).clone(), None)
             }
-            Some(ref path) => {
+            #[cfg(feature = "fs-store")]
+            Storage::Persistent(ref path) => {
                 let db_path = path.join("blobs.db");
-                let mut opts = Options::new(path);
+                let mut opts = iroh_blobs::store::fs::options::Options::new(path);
                 let protect_handler = if let Some(interval) = self.gc_interval {
                     let (handler, cb) = ProtectCallbackHandler::new();
                     opts.gc = Some(GcConfig {