Skip to content

Commit

Permalink
refactor: remove shuttle behaviour logic and send messages directly.
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 4, 2024
1 parent ae3158c commit 85c33d2
Show file tree
Hide file tree
Showing 16 changed files with 1,647 additions and 3,205 deletions.
1 change: 1 addition & 0 deletions extensions/warp-ipfs/shuttle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
opts.enable_relay_server,
false,
&opts.listen_addr,
&[],
true,
)
.await?;
Expand Down
2 changes: 0 additions & 2 deletions extensions/warp-ipfs/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,5 @@ use rust_ipfs::libp2p::{self, swarm::behaviour::toggle::Toggle};
#[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2p::swarm::derive_prelude", to_swarm = "void::Void")]
pub struct Behaviour {
pub shuttle_identity: Toggle<crate::shuttle::identity::client::Behaviour>,
pub shuttle_message: Toggle<crate::shuttle::message::client::Behaviour>,
pub phonebook: phonebook::Behaviour,
}
116 changes: 64 additions & 52 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,28 +292,49 @@ impl WarpIpfs {
}

let (pb_tx, pb_rx) = channel(50);
let (id_sh_tx, id_sh_rx) = futures::channel::mpsc::channel(1);
let (msg_sh_tx, msg_sh_rx) = futures::channel::mpsc::channel(1);

let (enable, nodes) = match &self.inner.config.store_setting().discovery {
config::Discovery::Shuttle { addresses } => (true, addresses.clone()),
_ => Default::default(),
};

let behaviour = behaviour::Behaviour {
shuttle_identity: enable
.then(|| {
shuttle::identity::client::Behaviour::new(&keypair, None, id_sh_rx, &nodes)
})
.into(),
shuttle_message: enable
.then(|| {
shuttle::message::client::Behaviour::new(&keypair, None, msg_sh_rx, &nodes)
})
.into(),
phonebook: behaviour::phonebook::Behaviour::new(self.multipass_tx.clone(), pb_rx),
};

let mut request_response_configs = vec![
RequestResponseConfig {
protocol: protocols::EXCHANGE_PROTOCOL.as_ref().into(),
max_request_size: 8 * 1024,
max_response_size: 16 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::IDENTITY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::DISCOVERY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
];

if let config::Discovery::Shuttle { .. } = &self.inner.config.store_setting().discovery {
request_response_configs.extend([
RequestResponseConfig {
protocol: protocols::SHUTTLE_IDENTITY.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::SHUTTLE_MESSAGE.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
]);
}

tracing::info!("Starting ipfs");
let mut uninitialized = UninitializedIpfs::new()
.with_identify({
Expand All @@ -333,39 +354,7 @@ impl WarpIpfs {
..Default::default()
})
.with_relay(true)
.with_request_response(vec![
RequestResponseConfig {
protocol: protocols::EXCHANGE_PROTOCOL.as_ref().into(),
max_request_size: 8 * 1024,
max_response_size: 16 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::IDENTITY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::DISCOVERY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
// TODO: Uncomment or remove during shuttle protocol refactor
// RequestResponseConfig {
// protocol: protocols::SHUTTLE_IDENTITY.as_ref().into(),
// max_request_size: 256 * 1024,
// max_response_size: 512 * 1024,
// ..Default::default()
// },
// RequestResponseConfig {
// protocol: protocols::SHUTTLE_MESSAGE.as_ref().into(),
// max_request_size: 256 * 1024,
// max_response_size: 512 * 1024,
// ..Default::default()
// },
])
.with_request_response(request_response_configs)
.set_listening_addrs(self.inner.config.listen_on().to_vec())
.with_custom_behaviour(behaviour)
.set_keypair(&keypair)
Expand Down Expand Up @@ -535,6 +524,31 @@ impl WarpIpfs {
}
}

if let config::Discovery::Shuttle { addresses } =
self.inner.config.store_setting().discovery.clone()
{
let mut nodes = IndexSet::new();
for mut addr in addresses {
let Some(peer_id) = addr.extract_peer_id() else {
tracing::warn!("{addr} does not contain a peer id. Skipping");
continue;
};

if let Err(_e) = ipfs.add_peer((peer_id, addr)).await {
println!("{_e}");
continue;
}

nodes.insert(peer_id);
}

for node in nodes {
if let Err(_e) = ipfs.connect(node).await {
println!("{_e}");
}
}
}

if matches!(
self.inner.config.store_setting().discovery,
config::Discovery::Namespace {
Expand Down Expand Up @@ -630,7 +644,6 @@ impl WarpIpfs {
self.multipass_tx.clone(),
&phonebook,
&discovery,
id_sh_tx,
&span,
)
.await?;
Expand All @@ -654,7 +667,6 @@ impl WarpIpfs {
&filestore,
self.raygun_tx.clone(),
&identity_store,
msg_sh_tx,
)
.await;

Expand Down
3 changes: 0 additions & 3 deletions extensions/warp-ipfs/src/shuttle/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use warp::crypto::DID;

use crate::store::document::identity::IdentityDocument;

pub mod client;
pub mod protocol;
#[cfg(not(target_arch = "wasm32"))]
pub mod server;

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct IdentityDag {
Expand Down
Loading

0 comments on commit 85c33d2

Please sign in to comment.