Skip to content

Commit f652b91

Browse files
committed
chore: add additional discovery logic
1 parent db3186e commit f652b91

File tree

1 file changed

+88
-5
lines changed

1 file changed

+88
-5
lines changed

extensions/warp-ipfs/src/store/discovery.rs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Discovery {
6060
)
6161
.await;
6262

63-
task.discovery(None);
63+
task.publish_discovery();
6464

6565
let _handle = executor.spawn_abortable(task);
6666
Self {
@@ -508,6 +508,10 @@ struct DiscoveryTask {
508508

509509
broadcast_tx: broadcast::Sender<DID>,
510510

511+
discovery_publish_confirmed: bool,
512+
513+
discovery_publish_fut: Option<BoxFuture<'static, DiscoveryPublish>>,
514+
511515
waker: Option<Waker>,
512516
}
513517

@@ -537,6 +541,8 @@ impl DiscoveryTask {
537541
command_rx,
538542
connection_event,
539543
discovery_request_st,
544+
discovery_publish_confirmed: false,
545+
discovery_publish_fut: None,
540546
discovery_fut: None,
541547
waker: None,
542548
}
@@ -556,6 +562,12 @@ enum DiscoveryResponse {
556562
InvalidRequest,
557563
}
558564

565+
enum DiscoveryPublish {
566+
Dht,
567+
Rz,
568+
None,
569+
}
570+
559571
impl DiscoveryTask {
560572
#[allow(dead_code)]
561573
pub fn discovery(&mut self, ns: Option<String>) {
@@ -591,10 +603,68 @@ impl DiscoveryTask {
591603
}
592604
}
593605

606+
pub fn publish_discovery(&mut self) {
607+
let ipfs = self.ipfs.clone();
608+
let fut = match self.config {
609+
DiscoveryConfig::Shuttle { .. } => {
610+
futures::future::ready(DiscoveryPublish::None).boxed()
611+
}
612+
DiscoveryConfig::Namespace {
613+
ref namespace,
614+
ref discovery_type,
615+
} => {
616+
let namespace = namespace.clone().unwrap_or("satellite-warp".to_string());
617+
match discovery_type {
618+
DiscoveryType::DHT => async move {
619+
if let Err(e) = ipfs.dht_provide(namespace.as_bytes().to_vec()).await {
620+
tracing::error!(error = %e, "cannot provide {namespace}");
621+
return DiscoveryPublish::None;
622+
}
623+
DiscoveryPublish::Dht
624+
}
625+
.boxed(),
626+
DiscoveryType::RzPoint { addresses } => {
627+
let peers = addresses
628+
.iter()
629+
.filter_map(|addr| addr.clone().extract_peer_id())
630+
.collect::<IndexSet<_>>();
631+
632+
if peers.is_empty() {
633+
return;
634+
}
635+
636+
// We will use the first instance instead of the whole set for now
637+
let peer_id = peers.get_index(0).copied().expect("should not fail");
638+
639+
async move {
640+
if let Err(e) = ipfs
641+
.rendezvous_register_namespace(&namespace, None, peer_id)
642+
.await
643+
{
644+
tracing::error!(error = %e, "cannot provide {namespace}");
645+
return DiscoveryPublish::None;
646+
}
647+
DiscoveryPublish::Rz
648+
}
649+
.boxed()
650+
}
651+
}
652+
}
653+
DiscoveryConfig::None => futures::future::ready(DiscoveryPublish::None).boxed(),
654+
};
655+
656+
self.discovery_publish_fut = Some(fut);
657+
}
658+
594659
pub fn dht_discovery(&mut self, namespace: String) {
595660
if self.discovery_fut.is_some() {
596661
return;
597662
}
663+
664+
if !self.discovery_publish_confirmed {
665+
return;
666+
}
667+
598668
let ipfs = self.ipfs.clone();
599669
let fut = async move {
600670
let bytes = namespace.as_bytes();
@@ -613,11 +683,12 @@ impl DiscoveryTask {
613683
return;
614684
}
615685

616-
// TODO: show that we are registered so we dont repeat multiple registration to the namespace when discovering peers
686+
if !self.discovery_publish_confirmed {
687+
return;
688+
}
689+
617690
let ipfs = self.ipfs.clone();
618691
let fut = async move {
619-
ipfs.rendezvous_register_namespace(&namespace, None, rz_peer_id)
620-
.await?;
621692
let peers = ipfs
622693
.rendezvous_namespace_discovery(&namespace, None, rz_peer_id)
623694
.await?;
@@ -697,6 +768,15 @@ impl Future for DiscoveryTask {
697768
}
698769
}
699770

771+
if let Some(fut) = self.discovery_publish_fut.as_mut() {
772+
if let Poll::Ready(discovery_publish_type) = fut.poll_unpin(cx) {
773+
self.discovery_publish_fut.take();
774+
self.discovery_publish_confirmed =
775+
!matches!(discovery_publish_type, DiscoveryPublish::None);
776+
self.discovery(None);
777+
}
778+
}
779+
700780
while let Poll::Ready(Some((peer_id, request, response))) =
701781
self.discovery_request_st.poll_next_unpin(cx)
702782
{
@@ -723,7 +803,10 @@ impl Future for DiscoveryTask {
723803
pl.to_bytes().expect("valid payload")
724804
}
725805
};
726-
_ = response.send(bytes);
806+
807+
if response.send(bytes).is_err() {
808+
tracing::warn!(%peer_id, "unable to respond to peer due to request being dropped.");
809+
}
727810
}
728811

729812
if let Some(fut) = self.discovery_fut.as_mut() {

0 commit comments

Comments
 (0)