Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l

[workspace.lints.clippy]
unused-async = "warn"

[patch.crates-io]
iroh-metrics = { git = "https://github.com/n0-computer/iroh-metrics", branch = "arqu/histograms" }
2 changes: 1 addition & 1 deletion iroh-dns-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ hickory-server = { version = "0.25.1", features = ["https-ring"] }
http = "1.0.0"
humantime = "2.2.0"
humantime-serde = "1.1.1"
iroh-metrics = { version = "0.35", features = ["service"] }
iroh-metrics = { git = "https://github.com/n0-computer/iroh-metrics", branch = "arqu/histograms", features = ["service"] }
lru = "0.13"
n0-future = "0.1.2"
n0-snafu = "0.2.2"
Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ http-body-util = "0.1.0"
hyper = { version = "1", features = ["server", "client", "http1"] }
hyper-util = "0.1.1"
iroh-base = { version = "0.92.0", path = "../iroh-base", default-features = false, features = ["key", "relay"] }
iroh-metrics = { version = "0.35", default-features = false }
iroh-metrics = { git = "https://github.com/n0-computer/iroh-metrics", branch = "arqu/histograms", default-features = false }
n0-future = "0.1.2"
num_enum = "0.7"
pin-project = "1"
Expand Down
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ futures-buffered = "0.2.11"
spki = { version = "0.7.3", features = ["std"] }

# metrics
iroh-metrics = { version = "0.35", default-features = false }
iroh-metrics = { git = "https://github.com/n0-computer/iroh-metrics", branch = "arqu/histograms", default-features = false }

# local-swarm-discovery
swarm-discovery = { version = "0.4", optional = true }
Expand Down
1 change: 1 addition & 0 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ impl Builder {
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
#[cfg(any(test, feature = "test-utils"))]
path_selection: self.path_selection,
interface_priority: Default::default(),
metrics,
};

Expand Down
1 change: 1 addition & 0 deletions iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ pub use iroh_base::{
KeyParsingError, NodeAddr, NodeId, PublicKey, RelayUrl, RelayUrlParseError, SecretKey,
};
pub use iroh_relay::{RelayMap, RelayNode, node_info};
pub use magicsock::InterfacePriority;
pub use n0_watcher::Watcher;

#[cfg(any(test, feature = "test-utils"))]
Expand Down
101 changes: 83 additions & 18 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use crate::{
net_report::{self, IfStateDetails, IpMappedAddresses, Report},
};

mod interface_priority;
mod metrics;
mod node_map;

Expand All @@ -85,6 +86,7 @@ pub(crate) mod transports;
pub use node_map::Source;

pub use self::{
interface_priority::InterfacePriority,
metrics::Metrics,
node_map::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo},
};
Expand All @@ -95,6 +97,19 @@ const ENDPOINTS_FRESH_ENOUGH_DURATION: Duration = Duration::from_secs(27);

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// Jitter range for heartbeat intervals (±25% of base interval).
///
/// Despite the `PCT` suffix this is a fraction, not a percentage: it is
/// multiplied directly with the base interval (in seconds) to obtain the
/// maximum offset in either direction, so `0.25` means ±25%.
///
/// This prevents synchronized heartbeat storms across many nodes.
const HEARTBEAT_JITTER_PCT: f64 = 0.25;

/// Builds an interval whose period is `base` shifted by a random offset.
///
/// The offset is drawn once, when this function is called, uniformly from
/// `-base * HEARTBEAT_JITTER_PCT ..= base * HEARTBEAT_JITTER_PCT`. The
/// resulting period is floored at 0.1 seconds before the interval is created.
///
/// The intent is to keep many nodes from running their heartbeats in lockstep.
fn jittered_interval(base: Duration) -> time::Interval {
    let base_secs = base.as_secs_f64();
    let max_offset = base_secs * HEARTBEAT_JITTER_PCT;
    let offset = rand::thread_rng().gen_range(-max_offset..=max_offset);

    // Never go below 0.1s, no matter how small `base` is or how the offset falls.
    let period_secs = f64::max(base_secs + offset, 0.1);

    time::interval(Duration::from_secs_f64(period_secs))
}

/// Contains options for `MagicSock::listen`.
#[derive(derive_more::Debug)]
pub(crate) struct Options {
Expand Down Expand Up @@ -145,6 +160,12 @@ pub(crate) struct Options {
#[cfg(any(test, feature = "test-utils"))]
pub(crate) path_selection: PathSelection,

/// Interface-based path prioritization configuration.
///
/// Allows preferring certain network interfaces over others when multiple paths exist.
/// Useful for scenarios like preferring Ethernet over Wi-Fi.
pub(crate) interface_priority: InterfacePriority,

pub(crate) metrics: EndpointMetrics,
}

Expand Down Expand Up @@ -675,7 +696,7 @@ impl MagicSock {
// UDP

// Update the NodeMap and remap RecvMeta to the NodeIdMappedAddr.
match self.node_map.receive_udp(*addr) {
match self.node_map.receive_udp(*addr, quinn_meta.dst_ip) {
None => {
// Check if this address is mapped to an IpMappedAddr
if let Some(ip_mapped_addr) =
Expand Down Expand Up @@ -795,7 +816,8 @@ impl MagicSock {
}
disco::Message::Pong(pong) => {
self.metrics.magicsock.recv_disco_pong.inc();
self.node_map.handle_pong(sender, src, pong);
self.node_map
.handle_pong(sender, src, pong, &self.metrics.magicsock);
}
disco::Message::CallMeMaybe(cm) => {
self.metrics.magicsock.recv_disco_call_me_maybe.inc();
Expand Down Expand Up @@ -1353,9 +1375,27 @@ impl Handle {
insecure_skip_relay_cert_verify,
#[cfg(any(test, feature = "test-utils"))]
path_selection,
interface_priority,
metrics,
} = opts;

// Load interface priority from environment if not explicitly set
let interface_priority = if interface_priority.is_empty() {
match InterfacePriority::from_env() {
Ok(Some(priority)) => {
info!("Loaded interface priority from IROH_INTERFACE_PRIORITY");
priority
}
Ok(None) => InterfacePriority::default(),
Err(e) => {
warn!("Failed to parse IROH_INTERFACE_PRIORITY: {}", e);
InterfacePriority::default()
}
}
} else {
interface_priority
};

let addr_v4 = addr_v4.unwrap_or_else(|| SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));

#[cfg(not(wasm_browser))]
Expand All @@ -1369,14 +1409,7 @@ impl Handle {
let ipv6_reported = false;

// load the node data
let node_map = node_map.unwrap_or_default();
let node_map = NodeMap::load_from_vec(
node_map,
#[cfg(any(test, feature = "test-utils"))]
path_selection,
ipv6_reported,
&metrics.magicsock,
);
let node_addrs = node_map.unwrap_or_default();

let my_relay = Watchable::new(None);
let ipv6_reported = Arc::new(AtomicBool::new(ipv6_reported));
Expand All @@ -1403,6 +1436,35 @@ impl Handle {
#[cfg(wasm_browser)]
let transports = Transports::new(relay_transports);

// Create network monitor early, so we can build the interface map
let network_monitor = netmon::Monitor::new()
.await
.context(CreateNetmonMonitorSnafu)?;

// Build interface map from bind addresses and network state
#[cfg(not(wasm_browser))]
let ip_to_interface = {
let netmon_state = network_monitor.interface_state().get();
let bind_addrs = transports.ip_bind_addrs();
interface_priority::build_interface_map(&bind_addrs, &netmon_state)
};
#[cfg(wasm_browser)]
let ip_to_interface = Default::default();

// Create NodeMap with the interface map
let node_map = NodeMap::load_from_vec(
node_addrs,
#[cfg(any(test, feature = "test-utils"))]
path_selection,
interface_priority.clone(),
ip_to_interface,
#[cfg(not(wasm_browser))]
ipv6,
#[cfg(wasm_browser)]
false,
&metrics.magicsock,
);

let (disco, disco_receiver) = DiscoState::new(secret_encryption_key);

let msock = Arc::new(MagicSock {
Expand All @@ -1411,7 +1473,7 @@ impl Handle {
closed: AtomicBool::new(false),
disco,
actor_sender: actor_sender.clone(),
ipv6_reported,
ipv6_reported: ipv6_reported.clone(),
node_map,
ip_mapped_addrs: ip_mapped_addrs.clone(),
discovery,
Expand Down Expand Up @@ -1453,10 +1515,6 @@ impl Handle {
)
.context(CreateQuinnEndpointSnafu)?;

let network_monitor = netmon::Monitor::new()
.await
.context(CreateNetmonMonitorSnafu)?;

let qad_endpoint = endpoint.clone();

#[cfg(any(test, feature = "test-utils"))]
Expand Down Expand Up @@ -1502,6 +1560,7 @@ impl Handle {
);

let netmon_watcher = network_monitor.interface_state();

let actor = Actor {
msg_receiver: actor_receiver,
msock: msock.clone(),
Expand Down Expand Up @@ -1849,7 +1908,7 @@ impl Actor {
let mut current_netmon_state = self.netmon_watcher.get();

#[cfg(not(wasm_browser))]
let mut direct_addr_heartbeat_timer = time::interval(HEARTBEAT_INTERVAL);
let mut direct_addr_heartbeat_timer = jittered_interval(HEARTBEAT_INTERVAL);

#[cfg(not(wasm_browser))]
let mut portmap_watcher = self
Expand Down Expand Up @@ -2063,7 +2122,9 @@ impl Actor {
async fn handle_actor_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::EndpointPingExpired(id, txid) => {
self.msock.node_map.notify_ping_timeout(id, txid);
self.msock
.node_map
.notify_ping_timeout(id, txid, &self.msock.metrics.magicsock);
}
ActorMessage::NetworkChange => {
self.network_monitor.network_change().await.ok();
Expand Down Expand Up @@ -2255,7 +2316,9 @@ impl Actor {
/// This is called when connectivity changes enough that we no longer trust the old routes.
#[instrument(skip_all)]
fn reset_endpoint_states(&mut self) {
self.msock.node_map.reset_node_states()
self.msock
.node_map
.reset_node_states(&self.msock.metrics.magicsock)
}
}

Expand Down Expand Up @@ -2571,6 +2634,7 @@ mod tests {
insecure_skip_relay_cert_verify: false,
#[cfg(any(test, feature = "test-utils"))]
path_selection: PathSelection::default(),
interface_priority: Default::default(),
discovery_user_data: None,
metrics: Default::default(),
}
Expand Down Expand Up @@ -3103,6 +3167,7 @@ mod tests {
server_config,
insecure_skip_relay_cert_verify: false,
path_selection: PathSelection::default(),
interface_priority: Default::default(),
metrics: Default::default(),
};
let msock = MagicSock::spawn(opts).await?;
Expand Down
Loading
Loading