refactor: use a separate queue for inbound disco packets from relays #3309


Draft: wants to merge 10 commits into base `main`.
14 changes: 14 additions & 0 deletions iroh/src/disco.rs
@@ -24,6 +24,7 @@ use std::{
};

use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use data_encoding::HEXLOWER;
use iroh_base::{PublicKey, RelayUrl};
use serde::{Deserialize, Serialize};
@@ -102,6 +103,19 @@ pub fn source_and_box(p: &[u8]) -> Option<(PublicKey, &[u8])> {
Some((sender, sealed_box))
}

/// If `p` looks like a disco message, returns the sender's public key parsed from `p`
/// and the sealed box as a zero-copy [`Bytes`] slice.
pub fn source_and_box_bytes(p: &Bytes) -> Option<(PublicKey, Bytes)> {
if !looks_like_disco_wrapper(p) {
return None;
}

let source = &p[MAGIC_LEN..MAGIC_LEN + KEY_LEN];
let sender = PublicKey::try_from(source).ok()?;
let sealed_box = p.slice(MAGIC_LEN + KEY_LEN..);
Some((sender, sealed_box))
}
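
A minimal usage sketch of the new `Bytes`-based variant. The routing helper and the two handlers (`handle_disco`, `forward_to_quinn`) are hypothetical stand-ins, not part of this diff; only the `source_and_box_bytes` call itself matches the signature above:

```rust
use bytes::Bytes;

// Hypothetical routing helper: disco traffic is handled internally,
// everything else is passed on (e.g. to Quinn).
fn route(buf: Bytes) {
    if let Some((sender, sealed_box)) = crate::disco::source_and_box_bytes(&buf) {
        // `sealed_box` shares storage with `buf`: `Bytes::slice` is zero-copy.
        handle_disco(sender, sealed_box);
    } else {
        forward_to_quinn(buf);
    }
}
```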

/// A discovery message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
121 changes: 81 additions & 40 deletions iroh/src/magicsock.rs
@@ -1094,11 +1094,6 @@ impl MagicSock {
return None;
}

if self.handle_relay_disco_message(&dm.buf, &dm.url, dm.src) {
// DISCO messages are handled internally in the MagicSock, do not pass to Quinn.
return None;
}

let quic_mapped_addr = self.node_map.receive_relay(&dm.url, dm.src);

// Normalize local_ip
@@ -1119,32 +1114,6 @@
Some((dm.src, meta, dm.buf))
}

fn handle_relay_disco_message(
&self,
msg: &[u8],
url: &RelayUrl,
relay_node_src: PublicKey,
) -> bool {
match disco::source_and_box(msg) {
Some((source, sealed_box)) => {
if relay_node_src != source {
// TODO: return here?
warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short());
}
self.handle_disco_message(
source,
sealed_box,
DiscoMessageSource::Relay {
url: url.clone(),
key: relay_node_src,
},
);
true
}
None => false,
}
}

/// Handles a discovery message.
#[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))]
fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) {
@@ -1827,7 +1796,13 @@ impl Handle {

let mut actor_tasks = JoinSet::default();

let relay_actor = RelayActor::new(msock.clone(), relay_datagram_recv_queue, relay_protocol);
let (relay_disco_recv_tx, mut relay_disco_recv_rx) = tokio::sync::mpsc::channel(1024);
let relay_actor = RelayActor::new(
msock.clone(),
relay_datagram_recv_queue,
relay_disco_recv_tx,
relay_protocol,
);
let relay_actor_cancel_token = relay_actor.cancel_token();
actor_tasks.spawn(
async move {
@@ -1837,6 +1812,23 @@
}
.instrument(info_span!("relay-actor")),
);
actor_tasks.spawn({
let msock = msock.clone();
async move {
while let Some(message) = relay_disco_recv_rx.recv().await {
msock.handle_disco_message(
message.source,
&message.sealed_box,
DiscoMessageSource::Relay {
url: message.relay_url,
key: message.relay_remote_node_id,
},
);
}
debug!("relay-disco-recv actor closed");
Review comment (Contributor):

nit: "actor closed" is sufficient since you already have an info span with the actor name. But I don't mind if you do this either.

}
.instrument(info_span!("relay-disco-recv"))
});
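
The shape of the message carried on this new channel can be inferred from the field accesses above. A sketch of what it presumably looks like; the name `RelayDiscoMessage` and the exact definition are assumptions, since the actual type is defined elsewhere in this PR:

```rust
/// Assumed shape, inferred from the field accesses in the recv loop above.
#[derive(Debug)]
struct RelayDiscoMessage {
    /// Sender of the disco message, parsed out of the payload.
    source: PublicKey,
    /// The encrypted disco payload (zero-copy `Bytes`).
    sealed_box: Bytes,
    /// The relay the message arrived through.
    relay_url: RelayUrl,
    /// The node the relay connection is established with.
    relay_remote_node_id: PublicKey,
}
```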

#[cfg(not(wasm_browser))]
let _ = actor_tasks.spawn({
@@ -2123,15 +2115,17 @@ impl RelayDatagramSendChannelReceiver {
#[derive(Debug)]
struct RelayDatagramRecvQueue {
queue: ConcurrentQueue<RelayRecvDatagram>,
waker: AtomicWaker,
recv_waker: AtomicWaker,
send_wakers: ConcurrentQueue<Waker>,
}

impl RelayDatagramRecvQueue {
/// Creates a new, empty queue with a fixed size bound of 512 items.
fn new() -> Self {
Self {
queue: ConcurrentQueue::bounded(512),
waker: AtomicWaker::new(),
recv_waker: AtomicWaker::new(),
send_wakers: ConcurrentQueue::unbounded(),
}
}

@@ -2144,10 +2138,49 @@
item: RelayRecvDatagram,
) -> Result<(), concurrent_queue::PushError<RelayRecvDatagram>> {
self.queue.push(item).inspect(|_| {
self.waker.wake();
self.recv_waker.wake();
})
}

/// Polls for whether the queue has free slots for sending items.
///
/// If the queue has free slots, this returns [`Poll::Ready`].
/// If the queue is full, [`Poll::Pending`] is returned and the waker
/// is stored and woken once the queue has free slots.
///
/// This can be called from multiple tasks concurrently. If a slot becomes
/// available, all stored wakers will be woken simultaneously.
/// This also means that even if [`Poll::Ready`] is returned, it is not
/// guaranteed that [`Self::try_send`] will return `Ok` on the next call,
/// because another send task could have used the slot already.
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
if self.queue.is_closed() {
Poll::Ready(Err(anyhow!("Queue closed")))
} else if !self.queue.is_full() {
Poll::Ready(Ok(()))
} else {
match self.send_wakers.push(cx.waker().clone()) {
Ok(()) => Poll::Pending,
Err(concurrent_queue::PushError::Full(_)) => {
unreachable!("Send waker queue is unbounded")
}
Err(concurrent_queue::PushError::Closed(_)) => {
Poll::Ready(Err(anyhow!("Queue closed")))
}
}
}
}

async fn send_ready(&self) -> Result<()> {
std::future::poll_fn(|cx| self.poll_send_ready(cx)).await
}

fn wake_senders(&self) {
while let Ok(waker) = self.send_wakers.pop() {
waker.wake();
}
}
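
Because readiness is only a hint (another sender may claim the freed slot first, as the `poll_send_ready` docs note), a caller would typically retry in a loop. A minimal sketch of such a send loop, assuming the file's existing `anyhow` imports; the function and its name are illustrative, not part of this diff:

```rust
/// Illustrative only: sends `item`, waiting whenever the queue is full.
async fn send_datagram(
    queue: &RelayDatagramRecvQueue,
    mut item: RelayRecvDatagram,
) -> Result<()> {
    loop {
        // Wait until the queue reports a free slot (or is closed).
        queue.send_ready().await?;
        match queue.try_send(item) {
            Ok(()) => return Ok(()),
            // Another task raced us to the slot; take the item back and retry.
            Err(concurrent_queue::PushError::Full(returned)) => item = returned,
            Err(concurrent_queue::PushError::Closed(_)) => {
                return Err(anyhow!("Queue closed"));
            }
        }
    }
}
```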

/// Polls for new items in the queue.
///
/// Although this method is available from `&self`, it must not be
@@ -2162,23 +2195,31 @@
/// to be able to poll from `&self`.
fn poll_recv(&self, cx: &mut Context) -> Poll<Result<RelayRecvDatagram>> {
match self.queue.pop() {
Ok(value) => Poll::Ready(Ok(value)),
Ok(value) => {
self.wake_senders();
Poll::Ready(Ok(value))
}
Err(concurrent_queue::PopError::Empty) => {
self.waker.register(cx.waker());
self.recv_waker.register(cx.waker());

match self.queue.pop() {
Ok(value) => {
self.waker.take();
self.recv_waker.take();
self.wake_senders();
Poll::Ready(Ok(value))
}
Err(concurrent_queue::PopError::Empty) => Poll::Pending,
Err(concurrent_queue::PopError::Closed) => {
self.waker.take();
self.recv_waker.take();
self.wake_senders();
Poll::Ready(Err(anyhow!("Queue closed")))
}
}
}
Err(concurrent_queue::PopError::Closed) => Poll::Ready(Err(anyhow!("Queue closed"))),
Err(concurrent_queue::PopError::Closed) => {
self.wake_senders();
Poll::Ready(Err(anyhow!("Queue closed")))
}
}
}
}
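
For completeness, a single consumer could drive `poll_recv` with `poll_fn`. A minimal sketch; the wrapper is illustrative, and per the (truncated) docs above only one task may poll at a time:

```rust
/// Illustrative only: awaits the next datagram from the queue.
/// Per the `poll_recv` docs, only a single task may call this at a time.
async fn next_datagram(queue: &RelayDatagramRecvQueue) -> Result<RelayRecvDatagram> {
    std::future::poll_fn(|cx| queue.poll_recv(cx)).await
}
```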