Skip to content

Commit 857f0dc

Browse files
authored
feat!: Add Docs<S> which wraps the Engine<S> (#18)
* Add Docs<S> which wraps the Engine<S> The ProtocolHandler is now implemented only for Blobs<S>. Docs<S> has a builder that takes a Blobs<S> and Gossip<S>, but there is also a way to create a Docs directly from an Engine. * feature gate the Docs<S> and its builder * use the gossip builder API * add net feature * remove top level feature flag * Make Engine !Clone and remove a few nested arcs. * fully qualify all the things * more feature flag madness
1 parent b6c7616 commit 857f0dc

File tree

7 files changed

+148
-75
lines changed

7 files changed

+148
-75
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ futures-util = { version = "0.3.25" }
3737
hex = "0.4"
3838
iroh-base = { version = "0.29" }
3939
iroh-blobs = { version = "0.29.0", optional = true, features = ["downloader"] }
40-
iroh-gossip = { version = "0.29.0", optional = true }
40+
iroh-gossip = { version = "0.29.0", optional = true, features = ["net"] }
4141
iroh-metrics = { version = "0.29.0", default-features = false }
4242
iroh = { version = "0.29", optional = true }
4343
num_enum = "0.7"

src/engine.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,21 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
4242

4343
/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
4444
/// peers and a gossip swarm for each syncing document.
45-
#[derive(derive_more::Debug, Clone)]
45+
#[derive(derive_more::Debug)]
4646
pub struct Engine<D> {
4747
/// [`Endpoint`] used by the engine.
4848
pub endpoint: Endpoint,
4949
/// Handle to the actor thread.
5050
pub sync: SyncHandle,
5151
/// The persistent default author for this engine.
52-
pub default_author: Arc<DefaultAuthor>,
52+
pub default_author: DefaultAuthor,
5353
to_live_actor: mpsc::Sender<ToLiveActor>,
5454
#[allow(dead_code)]
55-
actor_handle: Arc<AbortOnDropHandle<()>>,
55+
actor_handle: AbortOnDropHandle<()>,
5656
#[debug("ContentStatusCallback")]
5757
content_status_cb: ContentStatusCallback,
5858
local_pool_handle: LocalPoolHandle,
5959
blob_store: D,
60-
#[cfg(feature = "rpc")]
61-
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
6260
}
6361

6462
impl<D: iroh_blobs::store::Store> Engine<D> {
@@ -116,24 +114,22 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
116114
endpoint,
117115
sync,
118116
to_live_actor: live_actor_tx,
119-
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
117+
actor_handle: AbortOnDropHandle::new(actor_handle),
120118
content_status_cb,
121-
default_author: Arc::new(default_author),
119+
default_author,
122120
local_pool_handle,
123121
blob_store: bao_store,
124-
#[cfg(feature = "rpc")]
125-
rpc_handler: Default::default(),
126122
})
127123
}
128124

129125
/// Return a callback that can be added to blobs to protect the content of
130126
/// all docs from garbage collection.
131127
pub fn protect_cb(&self) -> ProtectCb {
132-
let this = self.clone();
128+
let sync = self.sync.clone();
133129
Box::new(move |live| {
134-
let this = this.clone();
130+
let sync = sync.clone();
135131
Box::pin(async move {
136-
let doc_hashes = match this.sync.content_hashes().await {
132+
let doc_hashes = match sync.content_hashes().await {
137133
Ok(hashes) => hashes,
138134
Err(err) => {
139135
tracing::warn!("Error getting doc hashes: {}", err);
@@ -202,7 +198,7 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
202198

203199
// Create a future that sends channel senders to the respective actors.
204200
// We clone `self` so that the future does not capture any lifetimes.
205-
let this = self.clone();
201+
let this = self;
206202

207203
// Subscribe to insert events from the replica.
208204
let a = {

src/protocol.rs

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,127 @@
11
//! [`ProtocolHandler`] implementation for the docs [`Engine`].
22
3+
use std::{path::PathBuf, sync::Arc};
4+
35
use anyhow::Result;
46
use futures_lite::future::Boxed as BoxedFuture;
57
use iroh::{endpoint::Connecting, protocol::ProtocolHandler};
8+
use iroh_blobs::net_protocol::{Blobs, ProtectCb};
9+
use iroh_gossip::net::Gossip;
610

7-
use crate::engine::Engine;
11+
use crate::{
12+
engine::{DefaultAuthorStorage, Engine},
13+
store::Store,
14+
};
815

9-
impl<D: iroh_blobs::store::Store> ProtocolHandler for Engine<D> {
16+
impl<S: iroh_blobs::store::Store> ProtocolHandler for Docs<S> {
1017
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
11-
let this = self.clone();
18+
let this = self.engine.clone();
1219
Box::pin(async move { this.handle_connection(conn).await })
1320
}
1421

1522
fn shutdown(&self) -> BoxedFuture<()> {
16-
let this = self.clone();
23+
let this = self.engine.clone();
1724
Box::pin(async move {
1825
if let Err(err) = this.shutdown().await {
1926
tracing::warn!("shutdown error: {:?}", err);
2027
}
2128
})
2229
}
2330
}
31+
32+
/// Docs protocol.
33+
#[derive(Debug, Clone)]
34+
pub struct Docs<S> {
35+
engine: Arc<Engine<S>>,
36+
#[cfg(feature = "rpc")]
37+
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
38+
}
39+
40+
impl Docs<()> {
41+
/// Create a new [`Builder`] for the docs protocol, using in memory replica and author storage.
42+
pub fn memory() -> Builder {
43+
Builder::default()
44+
}
45+
46+
/// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage
47+
/// in the given directory.
48+
pub fn persistent(path: PathBuf) -> Builder {
49+
Builder { path: Some(path) }
50+
}
51+
}
52+
53+
impl<S: iroh_blobs::store::Store> Docs<S> {
54+
/// Get an in memory client to interact with the docs engine.
55+
#[cfg(feature = "rpc")]
56+
pub fn client(&self) -> &crate::rpc::client::docs::MemClient {
57+
&self
58+
.rpc_handler
59+
.get_or_init(|| crate::rpc::RpcHandler::new(self.engine.clone()))
60+
.client
61+
}
62+
63+
/// Create a new docs protocol with the given engine.
64+
///
65+
/// Note that usually you would use the [`Builder`] to create a new docs protocol.
66+
pub fn new(engine: Engine<S>) -> Self {
67+
Self {
68+
engine: Arc::new(engine),
69+
#[cfg(feature = "rpc")]
70+
rpc_handler: Default::default(),
71+
}
72+
}
73+
74+
/// Handle a docs request from the RPC server.
75+
#[cfg(feature = "rpc")]
76+
pub async fn handle_rpc_request<
77+
C: quic_rpc::server::ChannelTypes<crate::rpc::proto::RpcService>,
78+
>(
79+
self,
80+
msg: crate::rpc::proto::Request,
81+
chan: quic_rpc::server::RpcChannel<crate::rpc::proto::RpcService, C>,
82+
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
83+
crate::rpc::Handler(self.engine.clone())
84+
.handle_rpc_request(msg, chan)
85+
.await
86+
}
87+
88+
/// Get the protect callback for the docs engine.
89+
pub fn protect_cb(&self) -> ProtectCb {
90+
self.engine.protect_cb()
91+
}
92+
}
93+
94+
/// Builder for the docs protocol.
95+
#[derive(Debug, Default)]
96+
pub struct Builder {
97+
path: Option<PathBuf>,
98+
}
99+
100+
impl Builder {
101+
/// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol.
102+
pub async fn spawn<S: iroh_blobs::store::Store>(
103+
self,
104+
blobs: &Blobs<S>,
105+
gossip: &Gossip,
106+
) -> anyhow::Result<Docs<S>> {
107+
let replica_store = match self.path {
108+
Some(ref path) => Store::persistent(path.join("docs.redb"))?,
109+
None => Store::memory(),
110+
};
111+
let author_store = match self.path {
112+
Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")),
113+
None => DefaultAuthorStorage::Mem,
114+
};
115+
let engine = Engine::spawn(
116+
blobs.endpoint().clone(),
117+
gossip.clone(),
118+
replica_store,
119+
blobs.store().clone(),
120+
blobs.downloader().clone(),
121+
author_store,
122+
blobs.rt().clone(),
123+
)
124+
.await?;
125+
Ok(Docs::new(engine))
126+
}
127+
}

src/rpc.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Quic RPC implementation for docs.
22
3+
use std::{ops::Deref, sync::Arc};
4+
35
use proto::{Request, RpcService};
46
use quic_rpc::{
57
server::{ChannelTypes, RpcChannel},
@@ -17,15 +19,18 @@ mod docs_handle_request;
1719
type RpcError = serde_error::Error;
1820
type RpcResult<T> = std::result::Result<T, RpcError>;
1921

20-
impl<D: iroh_blobs::store::Store> Engine<D> {
21-
/// Get an in memory client to interact with the docs engine.
22-
pub fn client(&self) -> &client::docs::MemClient {
23-
&self
24-
.rpc_handler
25-
.get_or_init(|| RpcHandler::new(self))
26-
.client
22+
#[derive(Debug, Clone)]
23+
pub(crate) struct Handler<S>(pub(crate) Arc<Engine<S>>);
24+
25+
impl<S> Deref for Handler<S> {
26+
type Target = Engine<S>;
27+
28+
fn deref(&self) -> &Self::Target {
29+
&self.0
2730
}
31+
}
2832

33+
impl<D: iroh_blobs::store::Store> Handler<D> {
2934
/// Handle a docs request from the RPC server.
3035
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
3136
self,
@@ -80,14 +85,14 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
8085
#[derive(Debug)]
8186
pub(crate) struct RpcHandler {
8287
/// Client to hand out
83-
client: client::docs::MemClient,
88+
pub(crate) client: client::docs::MemClient,
8489
/// Handler task
8590
_handler: AbortOnDropHandle<()>,
8691
}
8792

8893
impl RpcHandler {
89-
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
90-
let engine = engine.clone();
94+
pub fn new<D: iroh_blobs::store::Store>(engine: Arc<Engine<D>>) -> Self {
95+
let engine = Handler(engine);
9196
let (listener, connector) = quic_rpc::transport::flume::channel(1);
9297
let listener = RpcServer::new(listener);
9398
let client = client::docs::MemClient::new(RpcClient::new(connector));

src/rpc/docs_handle_request.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ use super::{
2727
SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, StartSyncResponse,
2828
StatusRequest, StatusResponse,
2929
},
30-
RpcError, RpcResult,
30+
Handler, RpcError, RpcResult,
3131
};
32-
use crate::{engine::Engine, Author, DocTicket, NamespaceSecret};
32+
use crate::{Author, DocTicket, NamespaceSecret};
3333

3434
/// Capacity for the flume channels to forward sync store iterators to async RPC streams.
3535
const ITER_CHANNEL_CAP: usize = 64;
3636

37-
impl<D: iroh_blobs::store::Store> Engine<D> {
37+
impl<D: iroh_blobs::store::Store> Handler<D> {
3838
pub(super) async fn author_create(
3939
self,
4040
_req: AuthorCreateRequest,

tests/util.rs

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ use std::{
55
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
66
ops::Deref,
77
path::{Path, PathBuf},
8-
sync::Arc,
98
};
109

1110
use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode};
1211
use iroh_blobs::{
12+
net_protocol::Blobs,
1313
store::{GcConfig, Store as BlobStore},
1414
util::local_pool::LocalPool,
1515
};
16+
use iroh_docs::protocol::Docs;
17+
use iroh_gossip::net::Gossip;
1618
use nested_enum_utils::enum_conversions;
1719
use quic_rpc::transport::{Connector, Listener};
1820
use serde::{Deserialize, Serialize};
@@ -134,58 +136,24 @@ impl<S: BlobStore> Builder<S> {
134136
builder = builder.dns_resolver(dns_resolver);
135137
}
136138
let endpoint = builder.bind().await?;
137-
let addr = endpoint.node_addr().await?;
138139
let local_pool = LocalPool::single();
139140
let mut router = iroh::protocol::Router::builder(endpoint.clone());
140-
141-
// Setup blobs
142-
let downloader = iroh_blobs::downloader::Downloader::new(
143-
store.clone(),
144-
endpoint.clone(),
145-
local_pool.handle().clone(),
146-
);
147-
let blobs = iroh_blobs::net_protocol::Blobs::new(
148-
store.clone(),
149-
local_pool.handle().clone(),
150-
Default::default(),
151-
downloader.clone(),
152-
endpoint.clone(),
153-
);
154-
let gossip = iroh_gossip::net::Gossip::from_endpoint(
155-
endpoint.clone(),
156-
Default::default(),
157-
&addr.info,
158-
);
159-
let replica_store = match self.path {
160-
Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?,
161-
None => iroh_docs::store::Store::memory(),
162-
};
163-
let author_store = match self.path {
164-
Some(ref path) => {
165-
iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"))
166-
}
167-
None => iroh_docs::engine::DefaultAuthorStorage::Mem,
141+
let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint);
142+
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
143+
let builder = match self.path {
144+
Some(ref path) => Docs::persistent(path.to_path_buf()),
145+
None => Docs::memory(),
168146
};
169-
let docs = match iroh_docs::engine::Engine::spawn(
170-
endpoint,
171-
gossip.clone(),
172-
replica_store,
173-
store.clone(),
174-
downloader,
175-
author_store,
176-
local_pool.handle().clone(),
177-
)
178-
.await
179-
{
147+
let docs = match builder.spawn(&blobs, &gossip).await {
180148
Ok(docs) => docs,
181149
Err(err) => {
182150
store.shutdown().await;
183151
return Err(err);
184152
}
185153
};
186154
router = router.accept(iroh_blobs::ALPN, blobs.clone());
187-
router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone()));
188-
router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone()));
155+
router = router.accept(iroh_docs::ALPN, docs.clone());
156+
router = router.accept(iroh_gossip::ALPN, gossip.clone());
189157

190158
// Build the router
191159
let router = router.spawn().await?;

0 commit comments

Comments
 (0)