diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..226dec961 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[target.wasm32-unknown-unknown] +runner = "wasm-bindgen-test-runner" +rustflags = ['--cfg', 'getrandom_backend="wasm_js"'] diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b2a48b5e4..7a3eac76c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -277,3 +277,40 @@ jobs: - uses: actions/checkout@v5 - run: pip install --user codespell[toml] - run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md + + wasm_build: + name: Build & test wasm32 + runs-on: ubuntu-latest + env: + RUSTFLAGS: '--cfg getrandom_backend="wasm_js"' + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Node.js + uses: actions/setup-node@v4 + with: + node-version: 20 + + - name: Install stable toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Add wasm target + run: rustup target add wasm32-unknown-unknown + + - name: Install wasm-tools + uses: bytecodealliance/actions/wasm-tools/setup@v1 + + - name: Install wasm-pack + uses: taiki-e/install-action@v2 + with: + tool: wasm-bindgen,wasm-pack + + - name: wasm32 build + run: cargo build --target wasm32-unknown-unknown --no-default-features + + # If the Wasm file contains any 'import "env"' declarations, then + # some non-Wasm-compatible code made it into the final code. + - name: Ensure no 'import "env"' in wasm + run: | + ! wasm-tools print --skeleton target/wasm32-unknown-unknown/debug/iroh_blobs.wasm | grep 'import "env"' \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 76a37d681..7b6d6ad49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,9 +258,9 @@ dependencies = [ [[package]] name = "bao-tree" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff16d65e48353db458be63ee395c03028f24564fd48668389bd65fd945f5ac36" +checksum = "06384416b1825e6e04fde63262fda2dc408f5b64c02d04e0d8b70ae72c17a52b" dependencies = [ "blake3", "bytes", @@ -1606,9 +1606,9 @@ dependencies = [ [[package]] name = "iroh" -version = "0.94.0" +version = "0.95.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9428cef1eafd2eac584269986d1949e693877ac12065b401dfde69f664b07ac" +checksum = "2374ba3cdaac152dc6ada92d971f7328e6408286faab3b7350842b2ebbed4789" dependencies = [ "aead", "backon", @@ -1630,10 +1630,9 @@ dependencies = [ "iroh-quinn-proto", "iroh-quinn-udp", "iroh-relay", + "n0-error", "n0-future", - "n0-snafu", "n0-watcher", - "nested_enum_utils", "netdev", "netwatch", "pin-project", @@ -1648,7 +1647,6 @@ dependencies = [ "rustls-webpki", "serde", "smallvec", - "snafu", "strum", "swarm-discovery", "time", @@ -1664,19 +1662,17 @@ dependencies = [ [[package]] name = "iroh-base" -version = "0.94.0" +version = "0.95.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db942f6f3d6fa9b475690c6e8e6684d60591dd886bf1bdfef4c60d89d502215c" +checksum = "25a8c5fb1cc65589f0d7ab44269a76f615a8c4458356952c9b0ef1c93ea45ff8" dependencies = [ "curve25519-dalek", "data-encoding", "derive_more 2.0.1", "ed25519-dalek", - "n0-snafu", - "nested_enum_utils", + "n0-error", "rand_core 0.9.3", "serde", - "snafu", "url", "zeroize", "zeroize_derive", @@ -1692,6 +1688,7 @@ dependencies = [ "atomic_refcell", "bao-tree", "bytes", + "cfg_aliases", "chrono", "clap", "concat_const", @@ -1708,6 +1705,7 @@ dependencies = [ "iroh-test", "iroh-tickets", "irpc", + "n0-error", "n0-future", "n0-snafu", "nested_enum_utils", @@ -1728,7 +1726,6 @@ dependencies = [ "test-strategy", "testresult", "tokio", - "tokio-util", "tracing", "tracing-subscriber", "tracing-test", @@ -1750,24 +1747,24 @@ dependencies = [ [[package]] name = "iroh-metrics" -version = "0.36.1" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090161e84532a0cb78ab13e70abb882b769ec67cf5a2d2dcea39bd002e1f7172" +checksum = "79e3381da7c93c12d353230c74bba26131d1c8bf3a4d8af0fec041546454582e" dependencies = [ "iroh-metrics-derive", "itoa", + "n0-error", "postcard", "ryu", "serde", - "snafu", "tracing", ] [[package]] name = "iroh-metrics-derive" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a39de3779d200dadde3a27b9fbdb34389a2af1b85ea445afca47bf4d7672573" +checksum = "d4e12bd0763fd16062f5cc5e8db15dd52d26e75a8af4c7fb57ccee3589b344b8" dependencies = [ "heck", "proc-macro2", @@ -1832,9 +1829,9 @@ dependencies = [ [[package]] name = "iroh-relay" -version = "0.94.0" +version = "0.95.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "360e201ab1803201de9a125dd838f7a4d13e6ba3a79aeb46c7fbf023266c062e" +checksum = "43fbdf2aeffa7d6ede1a31f6570866c2199b1cee96a0b563994623795d1bac2c" dependencies = [ "blake3", "bytes", @@ -1852,9 +1849,8 @@ dependencies = [ "iroh-quinn", "iroh-quinn-proto", "lru 0.16.1", + "n0-error", "n0-future", - "n0-snafu", - "nested_enum_utils", "num_enum", "pin-project", "pkarr", @@ -1866,7 +1862,6 @@ dependencies = [ "serde", "serde_bytes", "sha1", - "snafu", "strum", "tokio", "tokio-rustls", @@ -1893,38 +1888,35 @@ dependencies = [ [[package]] name = "iroh-tickets" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7683c7819693eb8b3d61d1d45ffa92e2faeb07762eb0c3debb50ad795538d221" +checksum = "1a322053cacddeca222f0999ce3cf6aa45c64ae5ad8c8911eac9b66008ffbaa5" dependencies = [ "data-encoding", "derive_more 2.0.1", "iroh-base", - "n0-snafu", - "nested_enum_utils", + "n0-error", "postcard", "serde", - "snafu", ] [[package]] name = "irpc" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52cf44fdb253f2a3e22e5ecfa8efa466929f8b7cdd4fc0f958f655406e8cdab6" +checksum = "0bee97aaa18387c4f0aae61058195dc9f9dea3e41c0e272973fe3e9bf611563d" dependencies = [ - "anyhow", "futures-buffered", "futures-util", "iroh-quinn", "irpc-derive", + "n0-error", "n0-future", "postcard", "rcgen", "rustls", "serde", "smallvec", - "thiserror 2.0.12", "tokio", "tokio-util", "tracing", @@ -1932,9 +1924,9 @@ dependencies = [ [[package]] name = "irpc-derive" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "969df6effc474e714fb7e738eb9859aa22f40dc2280cadeab245817075c7f273" +checksum = "58148196d2230183c9679431ac99b57e172000326d664e8456fa2cd27af6505a" dependencies = [ "proc-macro2", "quote", @@ -2138,6 +2130,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "n0-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4839a11b62f1fdd75be912ee20634053c734c2240e867ded41c7f50822c549" +dependencies = [ + "derive_more 2.0.1", + "n0-error-macros", + "spez", +] + +[[package]] +name = "n0-error-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed2a7e5ca3cb5729d4a162d7bcab5b338bed299a2fee8457568d7e0a747ed89" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "n0-future" version = "0.3.0" @@ -2174,13 +2189,13 @@ dependencies = [ [[package]] name = "n0-watcher" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34c65e127e06e5a2781b28df6a33ea474a7bddc0ac0cfea888bd20c79a1b6516" +checksum = "38acf13c1ddafc60eb7316d52213467f8ccb70b6f02b65e7d97f7799b1f50be4" dependencies = [ "derive_more 2.0.1", + "n0-error", "n0-future", - "snafu", ] [[package]] @@ -2262,9 +2277,9 @@ dependencies = [ [[package]] name = "netwatch" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98d7ec7abdbfe67ee70af3f2002326491178419caea22254b9070e6ff0c83491" +checksum = "26f2acd376ef48b6c326abf3ba23c449e0cb8aa5c2511d189dd8a8a3bfac889b" dependencies = [ "atomic-waker", "bytes", @@ -2273,9 +2288,9 @@ dependencies = [ "iroh-quinn-udp", "js-sys", "libc", + "n0-error", "n0-future", "n0-watcher", - "nested_enum_utils", "netdev", "netlink-packet-core", "netlink-packet-route", @@ -2283,7 +2298,6 @@ dependencies = [ "netlink-sys", "pin-project-lite", "serde", - "snafu", "socket2 0.6.0", "time", "tokio", @@ -2548,9 +2562,9 @@ checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" [[package]] name = "portmapper" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d73aa9bd141e0ff6060fea89a5437883f3b9ceea1cda71c790b90e17d072a3b3" +checksum = "7b575f975dcf03e258b0c7ab3f81497d7124f508884c37da66a7314aa2a8d467" dependencies = [ "base64", "bytes", @@ -2561,13 +2575,12 @@ dependencies = [ "igd-next", "iroh-metrics", "libc", - "nested_enum_utils", + "n0-error", "netwatch", "num_enum", "rand 0.9.2", "serde", "smallvec", - "snafu", "socket2 0.6.0", "time", "tokio", @@ -2579,9 +2592,9 @@ dependencies = [ [[package]] name = "positioned-io" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8078ce4d22da5e8f57324d985cc9befe40c49ab0507a192d6be9e59584495c9" +checksum = "d4ec4b80060f033312b99b6874025d9503d2af87aef2dd4c516e253fbfcdada7" dependencies = [ "libc", "winapi", @@ -3436,6 +3449,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spez" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87e960f4dca2788eeb86bbdde8dd246be8948790b7618d656e68f9b720a86e8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "spin" version = "0.9.8" @@ -3814,12 +3838,10 @@ checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", "futures-util", "hashbrown", "pin-project-lite", - "slab", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index ddeb85949..77c293f68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,18 +13,17 @@ rust-version = "1.85" [dependencies] anyhow = "1.0.95" -bao-tree = { version = "0.15.1", features = ["experimental-mixed", "tokio_fsm", "validate", "serde"], default-features = false } +bao-tree = { version = "0.16", features = ["experimental-mixed", "tokio_fsm", "validate", "serde"], default-features = false } bytes = { version = "1", features = ["serde"] } derive_more = { version = "2.0.1", features = ["from", "try_from", "into", "debug", "display", "deref", "deref_mut"] } futures-lite = "2.6.0" -quinn = { package = "iroh-quinn", version = "0.14.0" } +quinn = { package = "iroh-quinn", version = "0.14.0", optional = true } n0-future = "0.3.0" n0-snafu = "0.2.2" range-collections = { version = "0.4.6", features = ["serde"] } smallvec = { version = "1", features = ["serde", "const_new"] } snafu = "0.8.5" -tokio = { version = "1.43.0", features = ["full"] } -tokio-util = { version = "0.7.13", features = ["full"] } +tokio = { version = "1.43.0", default-features = false, features = ["sync"] } tracing = "0.1.41" iroh-io = "0.6.1" rand = "0.9.2" @@ -36,15 +35,16 @@ chrono = "0.4.39" nested_enum_utils = "0.2.1" ref-cast = "1.0.24" arrayvec = "0.7.6" -iroh = "0.94" +iroh = { version = "0.95", default-features = false } self_cell = "1.1.0" genawaiter = { version = "0.99.1", features = ["futures03"] } -iroh-base = "0.94" -iroh-tickets = "0.1" -irpc = { version = "0.10.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false } -iroh-metrics = { version = "0.36" } +iroh-base = "0.95" +iroh-tickets = "0.2" +irpc = { version = "0.11.0", features = ["spans", "stream", "derive", "varint-util"], default-features = false } +iroh-metrics = { version = "0.37" } redb = { version = "2.6.3", optional = true } reflink-copy = { version = "0.1.24", optional = true } +n0-error = "0.1.0" [dev-dependencies] clap = { version = "4.5.31", features = ["derive"] } @@ -60,12 +60,24 @@ tracing-subscriber = { version = "0.3.20", features = ["fmt"] } tracing-test = "0.2.5" walkdir = "2.5.0" atomic_refcell = "0.1.13" -iroh = { version = "0.94", features = ["discovery-local-network"]} +iroh = { version = "0.95", features = ["discovery-local-network"]} async-compression = { version = "0.4.30", features = ["lz4", "tokio"] } concat_const = "0.2.0" +[build-dependencies] +cfg_aliases = "0.2.1" + [features] hide-proto-docs = [] metrics = [] -default = ["hide-proto-docs", "fs-store"] -fs-store = ["dep:redb", "dep:reflink-copy"] +default = ["hide-proto-docs", "fs-store", "rpc"] +fs-store = ["dep:redb", "dep:reflink-copy", "bao-tree/fs"] +rpc = ["dep:quinn", "irpc/rpc", "irpc/quinn_endpoint_setup"] + +[[example]] +name = "expiring-tags" +required-features = ["fs-store"] + +[[example]] +name = "random_store" +required-features = ["fs-store"] diff --git a/build.rs b/build.rs new file mode 100644 index 000000000..7aae56820 --- /dev/null +++ b/build.rs @@ -0,0 +1,9 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + // Convenience aliases + wasm_browser: { all(target_family = "wasm", target_os = "unknown") }, + } +} diff --git a/examples/compression.rs b/examples/compression.rs index eb83f91dd..686df5870 100644 --- a/examples/compression.rs +++ b/examples/compression.rs @@ -160,7 +160,7 @@ impl ProtocolHandler for CompressedBlobsProtocol { .events .client_connected(|| ClientConnected { connection_id, - endpoint_id: connection.remote_id().ok(), + endpoint_id: Some(connection.remote_id()), }) .await { diff --git a/examples/custom-protocol.rs b/examples/custom-protocol.rs index 6d782f194..76ec62d1c 100644 --- a/examples/custom-protocol.rs +++ b/examples/custom-protocol.rs @@ -177,7 +177,7 @@ impl ProtocolHandler for BlobSearch { async fn accept(&self, connection: Connection) -> std::result::Result<(), AcceptError> { let this = self.clone(); // We can get the remote's endpoint id from the connection. - let node_id = connection.remote_id()?; + let node_id = connection.remote_id(); println!("accepted connection from {node_id}"); // Our protocol is a simple request-response protocol, so we expect the diff --git a/examples/expiring-tags.rs b/examples/expiring-tags.rs index e19771e80..c8a4e1119 100644 --- a/examples/expiring-tags.rs +++ b/examples/expiring-tags.rs @@ -122,17 +122,17 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> { } async fn info_task(store: Store) -> anyhow::Result<()> { - tokio::time::sleep(Duration::from_secs(1)).await; + n0_future::time::sleep(Duration::from_secs(1)).await; loop { print_store_info(&store).await?; - tokio::time::sleep(Duration::from_secs(5)).await; + n0_future::time::sleep(Duration::from_secs(5)).await; } } async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> { loop { delete_expired_tags(&store, prefix, false).await?; - tokio::time::sleep(Duration::from_secs(5)).await; + n0_future::time::sleep(Duration::from_secs(5)).await; } } diff --git a/examples/limit.rs b/examples/limit.rs index 4a9a379ed..58a1d7635 100644 --- a/examples/limit.rs +++ b/examples/limit.rs @@ -156,7 +156,7 @@ fn throttle(delay_ms: u64) -> EventSender { ); // we could compute the delay from the size of the data to have a fixed rate. // but the size is almost always 16 KiB (16 chunks). - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + n0_future::time::sleep(std::time::Duration::from_millis(delay_ms)).await; msg.tx.send(Ok(())).await.ok(); }); } diff --git a/src/api.rs b/src/api.rs index 3abb13bdb..ec65a5c05 100644 --- a/src/api.rs +++ b/src/api.rs @@ -12,14 +12,13 @@ //! //! You can also [`connect`](Store::connect) to a remote store that is listening //! to rpc requests. -use std::{io, net::SocketAddr, ops::Deref}; +use std::{io, ops::Deref}; use bao_tree::io::EncodeError; use iroh::Endpoint; -use irpc::rpc::{listen, RemoteService}; use n0_snafu::SpanTrace; use nested_enum_utils::common_fields; -use proto::{Request, ShutdownRequest, SyncDbRequest}; +use proto::{ShutdownRequest, SyncDbRequest}; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use snafu::{Backtrace, IntoError, Snafu}; @@ -124,11 +123,12 @@ impl From for Error { impl From for ExportBaoError { fn from(e: irpc::Error) -> Self { match e { - irpc::Error::MpscRecv(e) => MpscRecvSnafu.into_error(e), - irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e), - irpc::Error::Send(e) => SendSnafu.into_error(e), - irpc::Error::Request(e) => RequestSnafu.into_error(e), - irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()), + irpc::Error::MpscRecv { source, .. } => MpscRecvSnafu.into_error(source), + irpc::Error::OneshotRecv { source, .. } => OneshotRecvSnafu.into_error(source), + irpc::Error::Send { source, .. } => SendSnafu.into_error(source), + irpc::Error::Request { source, .. } => RequestSnafu.into_error(source), + #[cfg(feature = "rpc")] + irpc::Error::Write { source, .. } => ExportBaoIoSnafu.into_error(source.into()), } } } @@ -220,6 +220,7 @@ impl From for Error { } } +#[cfg(feature = "rpc")] impl From for Error { fn from(e: irpc::rpc::WriteError) -> Self { Self::Io(e.into()) @@ -298,16 +299,21 @@ impl Store { } /// Connect to a remote store as a rpc client. - pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self { + #[cfg(feature = "rpc")] + pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self { let sender = irpc::Client::quinn(endpoint, addr); Store::from_sender(sender) } /// Listen on a quinn endpoint for incoming rpc connections. + #[cfg(feature = "rpc")] pub async fn listen(self, endpoint: quinn::Endpoint) { + use irpc::rpc::RemoteService; + + use self::proto::Request; let local = self.client.as_local().unwrap().clone(); let handler = Request::remote_handler(local); - listen::(endpoint, handler).await + irpc::rpc::listen::(endpoint, handler).await } pub async fn sync_db(&self) -> RequestResult<()> { diff --git a/src/api/downloader.rs b/src/api/downloader.rs index fffacc142..9f5bfbc2d 100644 --- a/src/api/downloader.rs +++ b/src/api/downloader.rs @@ -10,10 +10,9 @@ use anyhow::bail; use genawaiter::sync::Gen; use iroh::{Endpoint, EndpointId}; use irpc::{channel::mpsc, rpc_requests}; -use n0_future::{future, stream, BufferedStreamExt, Stream, StreamExt}; +use n0_future::{future, stream, task::JoinSet, BufferedStreamExt, Stream, StreamExt}; use rand::seq::SliceRandom; use serde::{de::Error, Deserialize, Serialize}; -use tokio::task::JoinSet; use tracing::instrument::Instrument; use super::Store; @@ -31,7 +30,7 @@ pub struct Downloader { client: irpc::Client, } -#[rpc_requests(message = SwarmMsg, alias = "Msg")] +#[rpc_requests(message = SwarmMsg, alias = "Msg", rpc_feature = "rpc")] #[derive(Debug, Serialize, Deserialize)] enum SwarmProtocol { #[rpc(tx = mpsc::Sender)] @@ -42,7 +41,7 @@ struct DownloaderActor { store: Store, pool: ConnectionPool, tasks: JoinSet<()>, - running: HashSet, + running: HashSet, } #[derive(Debug, Serialize, Deserialize)] @@ -342,7 +341,7 @@ impl Downloader { pub fn new(store: &Store, endpoint: &Endpoint) -> Self { let (tx, rx) = tokio::sync::mpsc::channel::(32); let actor = DownloaderActor::new(store.clone(), endpoint.clone()); - tokio::spawn(actor.run(rx)); + n0_future::task::spawn(actor.run(rx)); Self { client: tx.into() } } diff --git a/src/api/proto.rs b/src/api/proto.rs index b2a0eed94..80478e934 100644 --- a/src/api/proto.rs +++ b/src/api/proto.rs @@ -87,7 +87,7 @@ impl HashSpecific for CreateTagMsg { } } -#[rpc_requests(message = Command, alias = "Msg")] +#[rpc_requests(message = Command, alias = "Msg", rpc_feature = "rpc")] #[derive(Debug, Serialize, Deserialize)] pub enum Request { #[rpc(tx = mpsc::Sender>)] diff --git a/src/get.rs b/src/get.rs index 15f40ea1b..d9c59b034 100644 --- a/src/get.rs +++ b/src/get.rs @@ -18,12 +18,13 @@ //! [iroh]: https://docs.rs/iroh use std::{ fmt::{self, Debug}, - time::{Duration, Instant}, + time::Duration, }; use anyhow::Result; use bao_tree::{io::fsm::BaoContentItem, ChunkNum}; use fsm::RequestCounters; +use n0_future::time::Instant; use n0_snafu::SpanTrace; use nested_enum_utils::common_fields; use serde::{Deserialize, Serialize}; diff --git a/src/provider.rs b/src/provider.rs index 390254010..fa4150619 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -3,19 +3,13 @@ //! Note that while using this API directly is fine, the standard way //! to provide data is to just register a [`crate::BlobsProtocol`] protocol //! handler with an [`iroh::Endpoint`](iroh::protocol::Router). -use std::{ - fmt::Debug, - future::Future, - io, - time::{Duration, Instant}, -}; +use std::{fmt::Debug, future::Future, io, time::Duration}; use anyhow::Result; use bao_tree::ChunkRanges; -use iroh::endpoint::{self, VarInt}; +use iroh::endpoint::{self, ConnectionError, VarInt}; use iroh_io::{AsyncStreamReader, AsyncStreamWriter}; -use n0_future::StreamExt; -use quinn::ConnectionError; +use n0_future::{time::Instant, StreamExt}; use serde::{Deserialize, Serialize}; use snafu::Snafu; use tokio::select; @@ -298,7 +292,7 @@ pub async fn handle_connection( if let Err(cause) = progress .client_connected(|| ClientConnected { connection_id, - endpoint_id: connection.remote_id().ok(), + endpoint_id: Some(connection.remote_id()), }) .await { @@ -309,7 +303,7 @@ pub async fn handle_connection( while let Ok(pair) = StreamPair::accept(&connection, progress.clone()).await { let span = debug_span!("stream", stream_id = %pair.stream_id()); let store = store.clone(); - tokio::spawn(handle_stream(pair, store).instrument(span)); + n0_future::task::spawn(handle_stream(pair, store).instrument(span)); } progress .connection_closed(|| ConnectionClosed { connection_id }) diff --git a/src/provider/events.rs b/src/provider/events.rs index 932570e9c..7f27b2dd2 100644 --- a/src/provider/events.rs +++ b/src/provider/events.rs @@ -1,5 +1,6 @@ use std::{fmt::Debug, io, ops::Deref}; +use iroh::endpoint::VarInt; use irpc::{ channel::{mpsc, none::NoSender, oneshot}, rpc_requests, Channels, WithChannels, @@ -106,11 +107,11 @@ impl From for io::Error { } pub trait HasErrorCode { - fn code(&self) -> quinn::VarInt; + fn code(&self) -> VarInt; } impl HasErrorCode for ProgressError { - fn code(&self) -> quinn::VarInt { + fn code(&self) -> VarInt { match self { ProgressError::Limit => ERR_LIMIT, ProgressError::Permission => ERR_PERMISSION, @@ -531,7 +532,7 @@ impl EventSender { } } -#[rpc_requests(message = ProviderMessage)] +#[rpc_requests(message = ProviderMessage, rpc_feature = "rpc")] #[derive(Debug, Serialize, Deserialize)] pub enum ProviderProto { /// A new client connected to the provider. @@ -705,10 +706,15 @@ mod irpc_ext { .map_err(irpc::Error::from)?; Ok(req_tx) } + #[cfg(feature = "rpc")] irpc::Request::Remote(remote) => { let (s, _) = remote.write(msg).await?; Ok(s.into()) } + #[cfg(not(feature = "rpc"))] + irpc::Request::Remote(_) => { + unreachable!() + } } } } diff --git a/src/store/fs.rs b/src/store/fs.rs index 8bf43f3d3..53c697abc 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1561,7 +1561,7 @@ pub mod tests { let ranges = ChunkRanges::all(); let (hash, bao) = create_n0_bao(&data, &ranges)?; let obs = store.observe(hash); - let task = tokio::spawn(async move { + let task = n0_future::task::spawn(async move { obs.await_completion().await?; api::Result::Ok(()) }); diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 3b09f8daf..0502cead6 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -740,7 +740,7 @@ impl BaoFileStorageSubscriber { tokio::select! { _ = tx.closed() => { // the sender is closed, we are done - Err(irpc::channel::SendError::ReceiverClosed.into()) + Err(n0_error::e!(irpc::channel::SendError::ReceiverClosed).into()) } e = self.receiver.changed() => Ok(e?), } diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index aac43cb4a..b03304ad1 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -766,7 +766,7 @@ impl Actor { self.cmds.push_back(cmd.into()).ok(); let tx = db.begin_read().context(TransactionSnafu)?; let tables = ReadOnlyTables::new(&tx).context(TableSnafu)?; - let timeout = tokio::time::sleep(self.options.max_read_duration); + let timeout = n0_future::time::sleep(self.options.max_read_duration); pin!(timeout); let mut n = 0; while let Some(cmd) = self.cmds.extract(Command::read_only, &mut timeout).await @@ -784,7 +784,7 @@ impl Actor { let ftx = self.ds.begin_write(); let tx = db.begin_write().context(TransactionSnafu)?; let mut tables = Tables::new(&tx, &ftx).context(TableSnafu)?; - let timeout = tokio::time::sleep(self.options.max_read_duration); + let timeout = n0_future::time::sleep(self.options.max_read_duration); pin!(timeout); let mut n = 0; while let Some(cmd) = self diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs index b0b2898ea..ea5762594 100644 --- a/src/store/fs/util/entity_manager.rs +++ b/src/store/fs/util/entity_manager.rs @@ -723,7 +723,7 @@ impl EntityManager

{ options.entity_response_inbox_size, options.entity_futures_initial_capacity, ); - tokio::spawn(actor.run()); + n0_future::task::spawn(actor.run()); Self(send) } diff --git a/src/store/gc.rs b/src/store/gc.rs index ca8404c92..435c06fbf 100644 --- a/src/store/gc.rs +++ b/src/store/gc.rs @@ -221,7 +221,7 @@ pub async fn run_gc(store: Store, config: GcConfig) { let mut live = HashSet::new(); loop { live.clear(); - tokio::time::sleep(config.interval).await; + n0_future::time::sleep(config.interval).await; if let Some(ref cb) = config.add_protected { match (cb)(&mut live).await { ProtectOutcome::Continue => {} diff --git a/src/store/mem.rs b/src/store/mem.rs index 76bc0e6e4..918338efc 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -14,7 +14,6 @@ use std::{ num::NonZeroU64, ops::Deref, sync::Arc, - time::SystemTime, }; use bao_tree::{ @@ -29,13 +28,13 @@ use bao_tree::{ }; use bytes::Bytes; use irpc::channel::mpsc; -use n0_future::future::yield_now; -use range_collections::range_set::RangeSetRange; -use tokio::{ - io::AsyncReadExt, - sync::watch, +use n0_future::{ + future::yield_now, task::{JoinError, JoinSet}, + time::SystemTime, }; +use range_collections::range_set::RangeSetRange; +use tokio::sync::watch; use tracing::{error, info, instrument, trace, Instrument}; use super::util::{BaoTreeSender, PartialMemStorage}; @@ -121,7 +120,7 @@ impl MemStore { pub fn new_with_opts(opts: Options) -> Self { let (sender, receiver) = tokio::sync::mpsc::channel(32); - tokio::spawn( + n0_future::task::spawn( Actor { commands: receiver, tasks: JoinSet::new(), @@ -140,7 +139,7 @@ impl MemStore { let store = Self::from_sender(sender.into()); if let Some(gc_config) = opts.gc_config { - tokio::spawn(run_gc(store.deref().clone(), gc_config)); + n0_future::task::spawn(run_gc(store.deref().clone(), gc_config)); } store @@ -755,8 +754,18 @@ async fn import_byte_stream( import_bytes(res.into(), scope, format, tx).await } +#[cfg(wasm_browser)] +async fn import_path(cmd: ImportPathMsg) -> anyhow::Result { + let _: ImportPathRequest = cmd.inner; + Err(anyhow::anyhow!( + "import_path is not supported in the browser" + )) +} + #[instrument(skip_all, fields(path = %cmd.path.display()))] +#[cfg(not(wasm_browser))] async fn import_path(cmd: ImportPathMsg) -> anyhow::Result { + use tokio::io::AsyncReadExt; let ImportPathMsg { inner: ImportPathRequest { @@ -1069,7 +1078,7 @@ impl BaoFileStorageSubscriber { tokio::select! { _ = tx.closed() => { // the sender is closed, we are done - Err(irpc::channel::SendError::ReceiverClosed.into()) + Err(n0_error::e!(irpc::channel::SendError::ReceiverClosed).into()) } e = self.receiver.changed() => Ok(e?), } @@ -1098,7 +1107,7 @@ mod tests { let store2 = MemStore::new(); let mut or = store2.observe(hash).stream().await?; - tokio::spawn(async move { + n0_future::task::spawn(async move { while let Some(event) = or.next().await { println!("event: {event:?}"); } diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index cb46228cd..649acdcbc 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -23,10 +23,12 @@ use bao_tree::{ }; use bytes::Bytes; use irpc::channel::mpsc; -use n0_future::future::{self, yield_now}; +use n0_future::{ + future::{self, yield_now}, + task::{JoinError, JoinSet}, +}; use range_collections::range_set::RangeSetRange; use ref_cast::RefCast; -use tokio::task::{JoinError, JoinSet}; use super::util::BaoTreeSender; use crate::{ @@ -369,7 +371,7 @@ impl ReadonlyMemStore { } let (sender, receiver) = tokio::sync::mpsc::channel(1); let actor = Actor::new(receiver, entries); - tokio::spawn(actor.run()); + n0_future::task::spawn(actor.run()); let local = irpc::LocalSender::from(sender); Self { client: local.into(), diff --git a/src/store/util.rs b/src/store/util.rs index 03be152bb..03630a6fc 100644 --- a/src/store/util.rs +++ b/src/store/util.rs @@ -1,8 +1,9 @@ -use std::{borrow::Borrow, fmt, time::SystemTime}; +use std::{borrow::Borrow, fmt}; use bao_tree::io::mixed::EncodedItem; use bytes::Bytes; use derive_more::{From, Into}; +use n0_future::time::SystemTime; mod sparse_mem_file; use irpc::channel::mpsc; @@ -68,6 +69,13 @@ impl fmt::Display for Tag { impl Tag { /// Create a new tag that does not exist yet. pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self { + // On wasm, SystemTime is web_time::SystemTime, but we need a std system time + // to convert to chrono. + // TODO: Upstream to n0-future or expose SystemTimeExt on wasm + #[cfg(wasm_browser)] + let time = std::time::SystemTime::UNIX_EPOCH + + time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let now = chrono::DateTime::::from(time); let mut i = 0; loop { diff --git a/src/tests.rs b/src/tests.rs index 76af8f0a8..5460f428b 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -266,7 +266,7 @@ async fn two_nodes_observe( let mut stream = store2 .remote() .observe(conn.clone(), ObserveRequest::new(hash)); - let remote_observe_task = tokio::spawn(async move { + let remote_observe_task = n0_future::task::spawn(async move { let mut current = Bitfield::empty(); while let Some(item) = stream.next().await { current = current.combine(item?); @@ -346,7 +346,7 @@ fn event_handler( let (count_tx, count_rx) = tokio::sync::watch::channel(0usize); let (events_tx, mut events_rx) = EventSender::channel(16, EventMask::ALL_READONLY); let allowed_nodes = allowed_nodes.into_iter().collect::>(); - let task = AbortOnDropHandle::new(tokio::task::spawn(async move { + let task = AbortOnDropHandle::new(n0_future::task::spawn(async move { while let Some(event) = events_rx.recv().await { match event { ProviderMessage::ClientConnected(msg) => { @@ -360,7 +360,7 @@ fn event_handler( ProviderMessage::PushRequestReceived(mut msg) => { msg.tx.send(Ok(())).await.ok(); let count_tx = count_tx.clone(); - tokio::task::spawn(async move { + n0_future::task::spawn(async move { while let Ok(Some(update)) = msg.rx.recv().await { if let RequestUpdate::Completed(_) = update { count_tx.send_modify(|x| *x += 1); diff --git a/src/util.rs b/src/util.rs index c0acfcaad..7606d759a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -446,7 +446,7 @@ pub(crate) mod sink { self.0 .send(value) .await - .map_err(|_| irpc::channel::SendError::ReceiverClosed) + .map_err(|_| n0_error::e!(irpc::channel::SendError::ReceiverClosed)) } } diff --git a/src/util/connection_pool.rs b/src/util/connection_pool.rs index e3c2d3a1a..fd66b4531 100644 --- a/src/util/connection_pool.rs +++ b/src/util/connection_pool.rs @@ -32,7 +32,6 @@ use tokio::sync::{ mpsc::{self, error::SendError as TokioSendError}, oneshot, Notify, }; -use tokio_util::time::FutureExt as TimeFutureExt; use tracing::{debug, error, info, trace}; pub type OnConnected = @@ -194,8 +193,7 @@ impl Context { }; // Connect to the node - let state = conn_fut - .timeout(context.options.connect_timeout) + let state = n0_future::time::timeout(context.options.connect_timeout, conn_fut) .await .map_err(|_| PoolConnectError::Timeout) .and_then(|r| r); @@ -265,7 +263,7 @@ impl Context { break; } // set the idle timer - idle_timer.as_mut().set_future(tokio::time::sleep(context.options.idle_timeout)); + idle_timer.as_mut().set_future(n0_future::time::sleep(context.options.idle_timeout)); } // Idle timeout - request shutdown @@ -422,7 +420,7 @@ impl ConnectionPool { let (actor, tx) = Actor::new(endpoint, alpn, options); // Spawn the main actor - tokio::spawn(actor.run()); + n0_future::task::spawn(actor.run()); Self { tx } } @@ -563,7 +561,7 @@ mod tests { impl ProtocolHandler for Echo { async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { let conn_id = connection.stable_id(); - let id = connection.remote_id().map_err(AcceptError::from_err)?; + let id = connection.remote_id(); trace!(%id, %conn_id, "Accepting echo connection"); loop { match connection.accept_bi().await { @@ -584,7 +582,7 @@ mod tests { async fn echo_client(conn: &Connection, text: &[u8]) -> n0_snafu::Result> { let conn_id = conn.stable_id(); - let id = conn.remote_id().e()?; + let id = conn.remote_id(); trace!(%id, %conn_id, "Sending echo request"); let (mut send, mut recv) = conn.open_bi().await.e()?; send.write_all(text).await.e()?; @@ -714,7 +712,7 @@ mod tests { assert_eq!(cid1, cid2); connection_ids.insert(id, cid1); } - tokio::time::sleep(Duration::from_millis(1000)).await; + n0_future::time::sleep(Duration::from_millis(1000)).await; for id in &ids { let cid1 = *connection_ids.get(id).expect("Connection ID not found"); let (cid2, res) = client.echo(*id, msg.clone()).await??; @@ -799,9 +797,7 @@ mod tests { .bind() .await?; let on_connected = |ep: Endpoint, conn: Connection| async move { - let Ok(id) = conn.remote_id() else { - return Err(io::Error::other("unable to get endpoint id")); - }; + let id = conn.remote_id(); let Some(watcher) = ep.conn_type(id) else { return Err(io::Error::other("unable to get conn_type watcher")); }; @@ -846,7 +842,7 @@ mod tests { let conn = pool.get_or_connect(ids[0]).await?; let cid1 = conn.stable_id(); conn.close(0u32.into(), b"test"); - tokio::time::sleep(Duration::from_millis(500)).await; + n0_future::time::sleep(Duration::from_millis(500)).await; let conn = pool.get_or_connect(ids[0]).await?; let cid2 = conn.stable_id(); assert_ne!(cid1, cid2); diff --git a/tests/blobs.rs b/tests/blobs.rs index 16f626cc9..e59930a29 100644 --- a/tests/blobs.rs +++ b/tests/blobs.rs @@ -109,7 +109,7 @@ async fn blobs_smoke_fs_rpc() -> TestResult { let client = irpc::util::make_client_endpoint(unspecified, &[cert.as_ref()])?; let td = tempfile::tempdir()?; let store = FsStore::load(td.path().join("a")).await?; - tokio::spawn(store.deref().clone().listen(server.clone())); + n0_future::task::spawn(store.deref().clone().listen(server.clone())); let api = Store::connect(client, server.local_addr()?); blobs_smoke(td.path(), api.blobs()).await?; api.shutdown().await?; diff --git a/tests/tags.rs b/tests/tags.rs index 5fe929488..3df517756 100644 --- a/tests/tags.rs +++ b/tests/tags.rs @@ -154,7 +154,7 @@ async fn tags_smoke_fs_rpc() -> TestResult<()> { let client = irpc::util::make_client_endpoint(unspecified, &[cert.as_ref()])?; let td = tempfile::tempdir()?; let store = FsStore::load(td.path().join("a")).await?; - tokio::spawn(store.deref().clone().listen(server.clone())); + n0_future::task::spawn(store.deref().clone().listen(server.clone())); let api = Store::connect(client, server.local_addr()?); tags_smoke(api.tags()).await?; api.shutdown().await?;