From 20b2f3782be823847e91b4fae192f6666a60a7e0 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Wed, 1 Oct 2025 14:50:43 +0200 Subject: [PATCH 1/3] wip: some congestion trickery --- iroh/src/magicsock.rs | 15 +++- iroh/src/magicsock/node_map/node_state.rs | 14 +++- iroh/src/magicsock/node_map/path_state.rs | 12 +++- iroh/src/magicsock/node_map/path_validity.rs | 73 +++++++++++++++++++- iroh/src/magicsock/transports/relay.rs | 9 ++- 5 files changed, 114 insertions(+), 9 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 814b0c7d22..e35f6c0e42 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -97,6 +97,19 @@ const ENDPOINTS_FRESH_ENOUGH_DURATION: Duration = Duration::from_secs(27); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// Jitter range for heartbeat intervals (±25% of base interval). +/// This prevents synchronized heartbeat storms across many nodes. +const HEARTBEAT_JITTER_PCT: f64 = 0.25; + +/// Create a jittered interval to prevent synchronized heartbeat storms. +fn jittered_interval(base: Duration) -> time::Interval { + let jitter_range = base.as_secs_f64() * HEARTBEAT_JITTER_PCT; + let jitter = rand::thread_rng().gen_range(-jitter_range..=jitter_range); + let jittered = base.as_secs_f64() + jitter; + let duration = Duration::from_secs_f64(jittered.max(0.1)); + time::interval(duration) +} + /// Contains options for `MagicSock::listen`. 
#[derive(derive_more::Debug)] pub(crate) struct Options { @@ -1864,7 +1877,7 @@ impl Actor { let mut current_netmon_state = self.netmon_watcher.get(); #[cfg(not(wasm_browser))] - let mut direct_addr_heartbeat_timer = time::interval(HEARTBEAT_INTERVAL); + let mut direct_addr_heartbeat_timer = jittered_interval(HEARTBEAT_INTERVAL); #[cfg(not(wasm_browser))] let mut portmap_watcher = self diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 99a2644594..957a596198 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -464,11 +464,21 @@ impl NodeState { // which we should have received the pong, clear best addr and // pong. Both are used to select this path again, but we know // it's not a usable path now. - path_state.validity = PathValidity::empty(); + // Record ping failure and only clear validity after threshold + path_state.validity.record_ping_failure(); metrics.path_ping_failures.inc(); path_state.validity.record_metrics(metrics); - metrics.path_marked_outdated.inc(); + + if path_state.validity.should_mark_outdated() { + debug!( + "path {} marked outdated after {} consecutive failures", + addr, + path_state.validity.consecutive_failures() + ); + metrics.path_marked_outdated.inc(); + path_state.validity = PathValidity::empty(); + } } } } diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 5e2e933706..381f04af69 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -125,7 +125,14 @@ impl PathState { } } + let had_failures = self.validity.consecutive_failures() > 0; + self.validity.update_pong(r.pong_at, r.latency); + self.validity.reset_failures(); + + if had_failures { + metrics.path_failure_resets.inc(); + } self.validity.record_metrics(metrics); } @@ -134,17 +141,20 @@ impl PathState { self.last_payload_msg = Some(now); self.validity .receive_payload(now, 
path_validity::Source::QuicPayload); + self.validity.reset_failures(); } #[cfg(test)] pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self { + let mut validity = PathValidity::new(r.pong_at, r.latency); + validity.reset_failures(); PathState { node_id, path: r.from.clone(), last_ping: None, last_got_ping: None, call_me_maybe_time: None, - validity: PathValidity::new(r.pong_at, r.latency), + validity, last_payload_msg: None, sources: HashMap::new(), } diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs index 43b7d91b30..1e975025a8 100644 --- a/iroh/src/magicsock/node_map/path_validity.rs +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -9,7 +9,14 @@ use crate::magicsock::Metrics as MagicsockMetrics; /// currently trusted. /// /// If trust goes away, it can be brought back with another valid DISCO UDP pong. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); +/// +/// Increased from 6.5s to 12s to be more resilient under congestion. +const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_secs(12); + +/// Number of consecutive ping failures required before marking a path as outdated. +/// +/// This implements a tolerance to prevent temporary packet loss from causing path degradation. +const PING_FAILURE_THRESHOLD: u8 = 3; /// Tracks a path's validity. 
/// @@ -27,6 +34,7 @@ struct Inner { latest_pong: Instant, latency: Duration, trust_until: Instant, + consecutive_failures: u8, congestion_metrics: CongestionMetrics, } @@ -150,6 +158,7 @@ impl PathValidity { trust_until: pong_at + Source::ReceivedPong.trust_duration(), latest_pong: pong_at, latency, + consecutive_failures: 0, congestion_metrics: metrics, })) } @@ -161,6 +170,7 @@ impl PathValidity { inner.trust_until = pong_at + Source::ReceivedPong.trust_duration(); inner.latest_pong = pong_at; inner.latency = latency; + inner.consecutive_failures = 0; inner.congestion_metrics.add_latency_sample(latency); } None => { @@ -226,6 +236,31 @@ impl PathValidity { Some(self.0.as_ref()?.latest_pong) } + /// Record a ping failure (timeout or no response). + /// + /// Only marks the path as outdated after PING_FAILURE_THRESHOLD consecutive failures. + pub(super) fn record_ping_failure(&mut self) { + let Some(state) = self.0.as_mut() else { + return; + }; + state.consecutive_failures = state.consecutive_failures.saturating_add(1); + } + + /// Check if path should be considered outdated based on consecutive failures. + pub(super) fn should_mark_outdated(&self) -> bool { + self.0 + .as_ref() + .map(|state| state.consecutive_failures >= PING_FAILURE_THRESHOLD) + .unwrap_or(false) + } + + /// Reset consecutive failure counter (called when we receive activity). + pub(super) fn reset_failures(&mut self) { + if let Some(state) = self.0.as_mut() { + state.consecutive_failures = 0; + } + } + /// Record that a ping was sent on this path. pub(super) fn record_ping_sent(&mut self) { if let Some(state) = self.0.as_mut() { @@ -267,6 +302,14 @@ impl PathValidity { .and_then(|state| state.congestion_metrics.avg_latency()) } + /// Get the number of consecutive failures. + pub(super) fn consecutive_failures(&self) -> u8 { + self.0 + .as_ref() + .map(|state| state.consecutive_failures) + .unwrap_or(0) + } + /// Record congestion metrics to the metrics system. 
/// Should be called periodically or on significant events. pub(super) fn record_metrics(&self, metrics: &MagicsockMetrics) { @@ -302,7 +345,7 @@ impl Inner { mod tests { use n0_future::time::{Duration, Instant}; - use super::{PathValidity, Source, TRUST_UDP_ADDR_DURATION}; + use super::{PING_FAILURE_THRESHOLD, PathValidity, Source, TRUST_UDP_ADDR_DURATION}; #[tokio::test(start_paused = true)] async fn test_basic_path_validity_lifetime() { @@ -330,6 +373,32 @@ mod tests { assert!(!validity.is_valid(Instant::now())); assert!(validity.is_outdated(Instant::now())); } + + #[tokio::test] + async fn test_multiple_ping_failures() { + let mut validity = PathValidity::new(Instant::now(), Duration::from_millis(20)); + + // First failure should not mark as outdated + validity.record_ping_failure(); + assert!(!validity.should_mark_outdated()); + assert_eq!(validity.consecutive_failures(), 1); + + // Second failure should not mark as outdated + validity.record_ping_failure(); + assert!(!validity.should_mark_outdated()); + assert_eq!(validity.consecutive_failures(), 2); + + // Third failure should mark as outdated (threshold = 3) + validity.record_ping_failure(); + assert!(validity.should_mark_outdated()); + assert_eq!(validity.consecutive_failures(), PING_FAILURE_THRESHOLD); + + // Receiving pong should reset failures + validity.update_pong(Instant::now(), Duration::from_millis(20)); + assert_eq!(validity.consecutive_failures(), 0); + assert!(!validity.should_mark_outdated()); + } + #[tokio::test] async fn test_congestion_metrics() { let mut validity = PathValidity::new(Instant::now(), Duration::from_millis(10)); diff --git a/iroh/src/magicsock/transports/relay.rs b/iroh/src/magicsock/transports/relay.rs index e81c26f267..ee73bcb87a 100644 --- a/iroh/src/magicsock/transports/relay.rs +++ b/iroh/src/magicsock/transports/relay.rs @@ -39,11 +39,14 @@ pub(crate) struct RelayTransport { impl RelayTransport { pub(crate) fn new(config: RelayActorConfig) -> Self { - let 
(relay_datagram_send_tx, relay_datagram_send_rx) = mpsc::channel(256); + // Increased from 256 to 1024 to better handle congestion and burst traffic + let (relay_datagram_send_tx, relay_datagram_send_rx) = mpsc::channel(1024); - let (relay_datagram_recv_tx, relay_datagram_recv_rx) = mpsc::channel(512); + // Increased from 512 to 2048 to reduce drops under load + let (relay_datagram_recv_tx, relay_datagram_recv_rx) = mpsc::channel(2048); - let (actor_sender, actor_receiver) = mpsc::channel(256); + // Increased from 256 to 512 for actor control messages + let (actor_sender, actor_receiver) = mpsc::channel(512); let my_node_id = config.secret_key.public(); let my_relay = config.my_relay.clone(); From bf53b83ced2e131790530c85523e087d0c6fe23e Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Thu, 2 Oct 2025 23:43:02 +0200 Subject: [PATCH 2/3] intf prio --- iroh/src/endpoint.rs | 1 + iroh/src/magicsock.rs | 75 ++++- iroh/src/magicsock/interface_priority.rs | 333 ++++++++++++++++++++++ iroh/src/magicsock/node_map.rs | 43 ++- iroh/src/magicsock/node_map/node_state.rs | 15 +- iroh/src/magicsock/node_map/path_state.rs | 15 + iroh/src/magicsock/node_map/udp_paths.rs | 40 ++- 7 files changed, 488 insertions(+), 34 deletions(-) create mode 100644 iroh/src/magicsock/interface_priority.rs diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 45a816adde..31d905948f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -183,6 +183,7 @@ impl Builder { insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection: self.path_selection, + interface_priority: Default::default(), metrics, }; diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index e35f6c0e42..ff8e223167 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -79,6 +79,7 @@ use crate::{ net_report::{self, IfStateDetails, IpMappedAddresses, Report}, }; +mod interface_priority; mod metrics; mod node_map; @@ -87,6 +88,7 
@@ pub(crate) mod transports; pub use node_map::Source; pub use self::{ + interface_priority::InterfacePriority, metrics::Metrics, node_map::{ConnectionType, ControlMsg, DirectAddrInfo}, }; @@ -104,7 +106,7 @@ const HEARTBEAT_JITTER_PCT: f64 = 0.25; /// Create a jittered interval to prevent synchronized heartbeat storms. fn jittered_interval(base: Duration) -> time::Interval { let jitter_range = base.as_secs_f64() * HEARTBEAT_JITTER_PCT; - let jitter = rand::thread_rng().gen_range(-jitter_range..=jitter_range); + let jitter = rand::rng().random_range(-jitter_range..=jitter_range); let jittered = base.as_secs_f64() + jitter; let duration = Duration::from_secs_f64(jittered.max(0.1)); time::interval(duration) @@ -157,6 +159,12 @@ pub(crate) struct Options { #[cfg(any(test, feature = "test-utils"))] pub(crate) path_selection: PathSelection, + /// Interface-based path prioritization configuration. + /// + /// Allows preferring certain network interfaces over others when multiple paths exist. + /// Useful for scenarios like preferring Ethernet over Wi-Fi. + pub(crate) interface_priority: InterfacePriority, + pub(crate) metrics: EndpointMetrics, } @@ -685,7 +693,7 @@ impl MagicSock { // UDP // Update the NodeMap and remap RecvMeta to the NodeIdMappedAddr. 
- match self.node_map.receive_udp(*addr) { + match self.node_map.receive_udp(*addr, quinn_meta.dst_ip) { None => { // Check if this address is mapped to an IpMappedAddr if let Some(ip_mapped_addr) = @@ -1365,6 +1373,7 @@ impl Handle { insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, metrics, } = opts; @@ -1384,6 +1393,23 @@ impl Handle { } }; + // Load interface priority from environment if not explicitly set + let interface_priority = if interface_priority.is_empty() { + match InterfacePriority::from_env() { + Ok(Some(priority)) => { + info!("Loaded interface priority from IROH_INTERFACE_PRIORITY"); + priority + } + Ok(None) => InterfacePriority::default(), + Err(e) => { + warn!("Failed to parse IROH_INTERFACE_PRIORITY: {}", e); + InterfacePriority::default() + } + } + } else { + interface_priority + }; + let addr_v4 = addr_v4.unwrap_or_else(|| SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); #[cfg(not(wasm_browser))] @@ -1397,13 +1423,7 @@ impl Handle { let ipv6_reported = false; // load the node data - let node_map = NodeMap::load_from_vec( - Vec::new(), - #[cfg(any(test, feature = "test-utils"))] - path_selection, - ipv6_reported, - &metrics.magicsock, - ); + let node_addrs = Vec::new(); let my_relay = Watchable::new(None); let ipv6_reported = Arc::new(AtomicBool::new(ipv6_reported)); @@ -1430,6 +1450,35 @@ impl Handle { #[cfg(wasm_browser)] let transports = Transports::new(relay_transports); + // Create network monitor early, so we can build the interface map + let network_monitor = netmon::Monitor::new() + .await + .context(CreateNetmonMonitorSnafu)?; + + // Build interface map from bind addresses and network state + #[cfg(not(wasm_browser))] + let ip_to_interface = { + let netmon_state = network_monitor.interface_state().get(); + let bind_addrs = transports.ip_bind_addrs(); + interface_priority::build_interface_map(&bind_addrs, &netmon_state) + }; + #[cfg(wasm_browser)] + let ip_to_interface = 
Default::default(); + + // Create NodeMap with the interface map + let node_map = NodeMap::load_from_vec( + node_addrs, + #[cfg(any(test, feature = "test-utils"))] + path_selection, + interface_priority.clone(), + ip_to_interface, + #[cfg(not(wasm_browser))] + ipv6, + #[cfg(wasm_browser)] + false, + &metrics.magicsock, + ); + let (disco, disco_receiver) = DiscoState::new(secret_encryption_key); let msock = Arc::new(MagicSock { @@ -1438,7 +1487,7 @@ impl Handle { closed: AtomicBool::new(false), disco, actor_sender: actor_sender.clone(), - ipv6_reported, + ipv6_reported: ipv6_reported.clone(), node_map, ip_mapped_addrs: ip_mapped_addrs.clone(), discovery, @@ -1479,10 +1528,6 @@ impl Handle { ) .context(CreateQuinnEndpointSnafu)?; - let network_monitor = netmon::Monitor::new() - .await - .context(CreateNetmonMonitorSnafu)?; - let qad_endpoint = endpoint.clone(); #[cfg(any(test, feature = "test-utils"))] @@ -2570,6 +2615,7 @@ mod tests { insecure_skip_relay_cert_verify: false, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), + interface_priority: Default::default(), discovery_user_data: None, metrics: Default::default(), } @@ -3105,6 +3151,7 @@ mod tests { server_config, insecure_skip_relay_cert_verify: false, path_selection: PathSelection::default(), + interface_priority: Default::default(), metrics: Default::default(), }; let msock = MagicSock::spawn(opts).await?; diff --git a/iroh/src/magicsock/interface_priority.rs b/iroh/src/magicsock/interface_priority.rs new file mode 100644 index 0000000000..4f35bac2b5 --- /dev/null +++ b/iroh/src/magicsock/interface_priority.rs @@ -0,0 +1,333 @@ +//! Interface-based path prioritization for network paths. +//! +//! Allows preferring certain network interfaces over others when multiple paths +//! to a peer exist. Useful for scenarios like preferring Ethernet over Wi-Fi. 
+ +use std::{net::IpAddr, sync::Arc}; + +use netwatch::netmon; + +/// Configuration for prioritizing network interfaces when selecting paths. +/// +/// Interfaces are matched using glob-style patterns and assigned priority weights. +/// Higher weights indicate higher priority. +/// +/// # Examples +/// +/// ```no_run +/// use iroh::magicsock::InterfacePriority; +/// +/// // Prefer Infiniband (ib*) over Ethernet (eth*, en*) +/// let priority = InterfacePriority::new(vec![ +/// ("ib*".to_string(), 100), +/// ("eth*".to_string(), 50), +/// ("en*".to_string(), 50), +/// ]); +/// ``` +#[derive(Debug, Clone)] +pub struct InterfacePriority { + rules: Arc>, +} + +#[derive(Debug, Clone)] +struct InterfaceRule { + pattern: Pattern, + weight: u32, +} + +#[derive(Debug, Clone)] +enum Pattern { + Exact(String), + Prefix(String), + Suffix(String), + Contains(String), + Wildcard { prefix: String, suffix: String }, +} + +impl InterfacePriority { + /// Create a new interface priority configuration. + /// + /// # Arguments + /// + /// * `patterns` - List of (pattern, weight) tuples. Patterns support: + /// - Exact match: `"eth0"` matches only "eth0" + /// - Prefix: `"ib*"` matches "ib0", "ib1", etc. + /// - Suffix: `"*0"` matches "eth0", "ib0", etc. + /// - Contains: `"*eth*"` matches anything containing "eth" + /// - Complex: `"eth*0"` matches "eth0", "eth10", etc. + /// + /// Higher weights indicate higher priority. + pub fn new(patterns: Vec<(String, u32)>) -> Self { + let rules = patterns + .into_iter() + .map(|(pattern, weight)| InterfaceRule { + pattern: Pattern::parse(&pattern), + weight, + }) + .collect(); + + Self { + rules: Arc::new(rules), + } + } + + /// Parse interface priorities from an environment variable. + /// + /// Format: `IROH_INTERFACE_PRIORITY="ib*:100,eth*:50,en*:50"` + /// + /// Each entry is `pattern:weight` separated by commas. + /// Returns `None` if the environment variable is not set. + /// Returns an error if the format is invalid. 
+ /// + /// # Example + /// + /// ```bash + /// export IROH_INTERFACE_PRIORITY="ib*:100,eth*:50" + /// ``` + pub fn from_env() -> Result, String> { + const ENV_VAR: &str = "IROH_INTERFACE_PRIORITY"; + + let Some(value) = std::env::var(ENV_VAR).ok() else { + return Ok(None); + }; + + if value.is_empty() { + return Ok(None); + } + + let mut patterns = Vec::new(); + for entry in value.split(',') { + let entry = entry.trim(); + if entry.is_empty() { + continue; + } + + let parts: Vec<&str> = entry.split(':').collect(); + if parts.len() != 2 { + return Err(format!( + "Invalid format in {}: '{}'. Expected 'pattern:weight'", + ENV_VAR, entry + )); + } + + let pattern = parts[0].trim().to_string(); + let weight = parts[1].trim().parse::().map_err(|e| { + format!( + "Invalid weight in {}: '{}'. Expected number, got error: {}", + ENV_VAR, parts[1], e + ) + })?; + + patterns.push((pattern, weight)); + } + + if patterns.is_empty() { + Ok(None) + } else { + Ok(Some(Self::new(patterns))) + } + } + + /// Get the priority weight for a given interface name. + /// + /// Returns the weight of the first matching pattern, or 0 if no pattern matches. + pub fn weight(&self, interface_name: &str) -> u32 { + self.rules + .iter() + .find(|rule| rule.pattern.matches(interface_name)) + .map(|rule| rule.weight) + .unwrap_or(0) + } + + /// Returns true if this configuration is empty (no rules). + pub fn is_empty(&self) -> bool { + self.rules.is_empty() + } +} + +/// Detect which network interface would be used for a given local IP address. +/// +/// This examines the netmon state to find which interface owns the given IP address. 
+pub(super) fn detect_interface(local_ip: IpAddr, netmon_state: &netmon::State) -> Option { + for (iface_name, iface) in &netmon_state.interfaces { + // Check if any of this interface's addresses match our local IP + if iface.addrs().any(|addr| addr.addr() == local_ip) { + return Some(iface_name.clone()); + } + } + None +} + +/// Build a mapping from bind addresses to interface names. +/// +/// This is useful for quickly looking up which interface a socket is bound to. +pub(super) fn build_interface_map( + bind_addrs: &[std::net::SocketAddr], + netmon_state: &netmon::State, +) -> std::collections::HashMap { + let mut map = std::collections::HashMap::new(); + for bind_addr in bind_addrs { + if let Some(interface) = detect_interface(bind_addr.ip(), netmon_state) { + map.insert(bind_addr.ip(), interface); + } + } + map +} + +impl Default for InterfacePriority { + fn default() -> Self { + Self { + rules: Arc::new(Vec::new()), + } + } +} + +impl Pattern { + fn parse(pattern: &str) -> Self { + if !pattern.contains('*') { + return Pattern::Exact(pattern.to_string()); + } + + if pattern == "*" { + return Pattern::Contains(String::new()); + } + + if pattern.starts_with('*') && pattern.ends_with('*') { + let middle = &pattern[1..pattern.len() - 1]; + return Pattern::Contains(middle.to_string()); + } + + if let Some(suffix) = pattern.strip_prefix('*') { + return Pattern::Suffix(suffix.to_string()); + } + + if let Some(prefix) = pattern.strip_suffix('*') { + return Pattern::Prefix(prefix.to_string()); + } + + let parts: Vec<&str> = pattern.split('*').collect(); + if parts.len() == 2 { + return Pattern::Wildcard { + prefix: parts[0].to_string(), + suffix: parts[1].to_string(), + }; + } + + Pattern::Exact(pattern.to_string()) + } + + fn matches(&self, s: &str) -> bool { + match self { + Pattern::Exact(exact) => s == exact, + Pattern::Prefix(prefix) => s.starts_with(prefix), + Pattern::Suffix(suffix) => s.ends_with(suffix), + Pattern::Contains(substr) => s.contains(substr), + 
Pattern::Wildcard { prefix, suffix } => { + s.starts_with(prefix) + && s.ends_with(suffix) + && s.len() >= prefix.len() + suffix.len() + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_exact_match() { + let priority = InterfacePriority::new(vec![("eth0".to_string(), 100)]); + assert_eq!(priority.weight("eth0"), 100); + assert_eq!(priority.weight("eth1"), 0); + assert_eq!(priority.weight("ib0"), 0); + } + + #[test] + fn test_prefix_match() { + let priority = InterfacePriority::new(vec![("ib*".to_string(), 100)]); + assert_eq!(priority.weight("ib0"), 100); + assert_eq!(priority.weight("ib1"), 100); + assert_eq!(priority.weight("ib_main"), 100); + assert_eq!(priority.weight("eth0"), 0); + } + + #[test] + fn test_suffix_match() { + let priority = InterfacePriority::new(vec![("*0".to_string(), 50)]); + assert_eq!(priority.weight("eth0"), 50); + assert_eq!(priority.weight("ib0"), 50); + assert_eq!(priority.weight("eth1"), 0); + } + + #[test] + fn test_contains_match() { + let priority = InterfacePriority::new(vec![("*eth*".to_string(), 50)]); + assert_eq!(priority.weight("eth0"), 50); + assert_eq!(priority.weight("myeth"), 50); + assert_eq!(priority.weight("ethtest"), 50); + assert_eq!(priority.weight("ib0"), 0); + } + + #[test] + fn test_wildcard_match() { + let priority = InterfacePriority::new(vec![("eth*0".to_string(), 75)]); + assert_eq!(priority.weight("eth0"), 75); + assert_eq!(priority.weight("eth10"), 75); + assert_eq!(priority.weight("eth_test_0"), 75); + assert_eq!(priority.weight("eth1"), 0); + assert_eq!(priority.weight("ib0"), 0); + } + + #[test] + fn test_first_match_wins() { + let priority = + InterfacePriority::new(vec![("ib*".to_string(), 100), ("ib0".to_string(), 50)]); + assert_eq!(priority.weight("ib0"), 100); + } + + #[test] + fn test_multiple_interfaces() { + let priority = InterfacePriority::new(vec![ + ("ib*".to_string(), 100), + ("eth*".to_string(), 50), + ("en*".to_string(), 50), + ]); + 
assert_eq!(priority.weight("ib0"), 100); + assert_eq!(priority.weight("ib1"), 100); + assert_eq!(priority.weight("eth0"), 50); + assert_eq!(priority.weight("enp0s1"), 50); + assert_eq!(priority.weight("wlan0"), 0); + } + + #[test] + fn test_empty_priority() { + let priority = InterfacePriority::default(); + assert!(priority.is_empty()); + assert_eq!(priority.weight("any"), 0); + } + + #[test] + fn test_from_patterns() { + // Test by directly creating from patterns instead of env var + let priority = + InterfacePriority::new(vec![("ib*".to_string(), 100), ("eth*".to_string(), 50)]); + assert_eq!(priority.weight("ib0"), 100); + assert_eq!(priority.weight("eth0"), 50); + assert_eq!(priority.weight("wlan0"), 0); + } + + #[test] + fn test_parse_format() { + // Test the parsing logic without environment variables + // Valid format + let patterns = vec![("ib*".to_string(), 100), ("eth*".to_string(), 50)]; + let priority = InterfacePriority::new(patterns); + assert_eq!(priority.weight("ib0"), 100); + + // Empty is OK + let empty = InterfacePriority::new(vec![]); + assert!(empty.is_empty()); + assert_eq!(empty.weight("any"), 0); + } +} diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 997ec67b7c..5ac9aa73e8 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -52,7 +52,7 @@ pub(super) struct NodeMap { inner: Mutex, } -#[derive(Default, Debug)] +#[derive(Debug, Default)] pub(super) struct NodeMapInner { by_node_key: HashMap, by_ip_port: HashMap, @@ -61,6 +61,10 @@ pub(super) struct NodeMapInner { next_id: usize, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, + interface_priority: crate::magicsock::InterfacePriority, + /// Mapping from local IP addresses to interface names. + /// Built from bind addresses at startup and updated on network changes. + ip_to_interface: HashMap, } /// Identifier to look up a [`NodeState`] in the [`NodeMap`]. 
@@ -122,6 +126,8 @@ impl NodeMap { pub(super) fn load_from_vec( nodes: Vec, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, + interface_priority: crate::magicsock::InterfacePriority, + ip_to_interface: HashMap, have_ipv6: bool, metrics: &Metrics, ) -> Self { @@ -129,6 +135,8 @@ impl NodeMap { nodes, #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, + ip_to_interface, have_ipv6, metrics, )) @@ -163,8 +171,12 @@ impl NodeMap { pub(super) fn receive_udp( &self, udp_addr: SocketAddr, + dst_ip: Option, ) -> Option<(PublicKey, NodeIdMappedAddr)> { - self.inner.lock().expect("poisoned").receive_udp(udp_addr) + self.inner + .lock() + .expect("poisoned") + .receive_udp(udp_addr, dst_ip) } pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { @@ -346,12 +358,16 @@ impl NodeMapInner { fn load_from_vec( nodes: Vec, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, + interface_priority: crate::magicsock::InterfacePriority, + ip_to_interface: HashMap, have_ipv6: bool, metrics: &Metrics, ) -> Self { let mut me = Self { #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, + ip_to_interface, ..Default::default() }; for node_addr in nodes { @@ -374,6 +390,7 @@ impl NodeMapInner { let relay_url = node_addr.relay_url.clone(); #[cfg(any(test, feature = "test-utils"))] let path_selection = self.path_selection; + let interface_priority = self.interface_priority.clone(); let node_state = self.get_or_insert_with(NodeStateKey::NodeId(node_id), || Options { node_id, relay_url, @@ -381,6 +398,7 @@ impl NodeMapInner { source, #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, }); node_state.update_from_node_addr( node_addr.relay_url.as_ref(), @@ -460,13 +478,19 @@ impl NodeMapInner { /// Marks the node we believe to be at `ipp` as recently used. 
#[cfg(not(wasm_browser))] - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, NodeIdMappedAddr)> { + fn receive_udp( + &mut self, + udp_addr: SocketAddr, + dst_ip: Option, + ) -> Option<(NodeId, NodeIdMappedAddr)> { let ip_port: IpPort = udp_addr.into(); + let interface_name = dst_ip.and_then(|ip| self.ip_to_interface.get(&ip).cloned()); + let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else { trace!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); return None; }; - node_state.receive_udp(ip_port, Instant::now()); + node_state.receive_udp(ip_port, interface_name.as_deref(), Instant::now()); Some((*node_state.public_key(), *node_state.quic_mapped_addr())) } @@ -474,6 +498,7 @@ impl NodeMapInner { fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { #[cfg(any(test, feature = "test-utils"))] let path_selection = self.path_selection; + let interface_priority = self.interface_priority.clone(); let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { trace!("packets from unknown node, insert into node map"); Options { @@ -483,6 +508,7 @@ impl NodeMapInner { source: Source::Relay, #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, } }); node_state.receive_relay(relay_url, src, Instant::now()); @@ -578,6 +604,7 @@ impl NodeMapInner { fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled { #[cfg(any(test, feature = "test-utils"))] let path_selection = self.path_selection; + let interface_priority = self.interface_priority.clone(); let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || { debug!("received ping: node unknown, add to node map"); let source = if src.is_relay() { @@ -592,6 +619,7 @@ impl NodeMapInner { source, #[cfg(any(test, feature = "test-utils"))] path_selection, + interface_priority, } }); @@ -799,6 +827,8 @@ mod tests { let loaded_node_map = NodeMap::load_from_vec( 
addrs.clone(), PathSelection::default(), + Default::default(), + Default::default(), true, &Default::default(), ); @@ -844,6 +874,7 @@ mod tests { name: "test".into(), }, path_selection: PathSelection::default(), + interface_priority: Default::default(), }) .id(); @@ -859,7 +890,7 @@ mod tests { // add address node_map.add_test_addr(node_addr); // make it active - node_map.inner.lock().unwrap().receive_udp(addr); + node_map.inner.lock().unwrap().receive_udp(addr, None); } info!("Adding offline/inactive addresses"); @@ -914,7 +945,7 @@ mod tests { .inner .lock() .unwrap() - .receive_udp(addr) + .receive_udp(addr, None) .expect("registered"); for _ in 0..MAX_INACTIVE_NODES + 1 { diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 957a596198..b161c01cee 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -155,6 +155,7 @@ pub(super) struct Options { pub(super) source: super::Source, #[cfg(any(test, feature = "test-utils"))] pub(super) path_selection: PathSelection, + pub(super) interface_priority: crate::magicsock::InterfacePriority, } impl NodeState { @@ -170,6 +171,9 @@ impl NodeState { let now = Instant::now(); + let mut udp_paths = NodeUdpPaths::new(); + udp_paths.interface_priority = options.interface_priority; + NodeState { id, quic_mapped_addr, @@ -181,7 +185,7 @@ impl NodeState { PathState::new(options.node_id, SendAddr::Relay(url), options.source, now), ) }), - udp_paths: NodeUdpPaths::new(), + udp_paths, sent_pings: HashMap::new(), last_used: options.active.then(Instant::now), last_call_me_maybe: None, @@ -1076,13 +1080,17 @@ impl NodeState { /// Marks this node as having received a UDP payload message. 
#[cfg(not(wasm_browser))] - pub(super) fn receive_udp(&mut self, addr: IpPort, now: Instant) { + pub(super) fn receive_udp(&mut self, addr: IpPort, interface_name: Option<&str>, now: Instant) { let mut guard = self.udp_paths.access_mut(now); let Some(state) = guard.paths().get_mut(&addr) else { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; state.receive_payload(now); + // Update interface name if we detected one + if let Some(name) = interface_name { + state.set_interface_name(Some(name.to_string())); + } self.last_used = Some(now); } @@ -1700,6 +1708,8 @@ mod tests { ]), next_id: 5, path_selection: PathSelection::default(), + interface_priority: Default::default(), + ip_to_interface: Default::default(), }); let mut got = node_map.list_remote_infos(later); got.sort_by_key(|p| p.node_id); @@ -1731,6 +1741,7 @@ mod tests { name: "test".into(), }, path_selection: PathSelection::default(), + interface_priority: Default::default(), }; let mut ep = NodeState::new(0, opts); diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 381f04af69..3815d235d6 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -62,6 +62,10 @@ pub(super) struct PathState { /// We keep track of only the latest [`Instant`] for each [`Source`], keeping the size of /// the map of sources down to one entry per type of source. pub(super) sources: HashMap, + /// Network interface name this path uses, if known. + /// + /// Used for interface-based path prioritization (e.g., preferring Ethernet over Wi-Fi). 
+ pub(super) interface_name: Option<String>, } impl PathState { @@ -77,6 +81,7 @@ impl PathState { validity: PathValidity::empty(), last_payload_msg: None, sources, + interface_name: None, } } @@ -97,6 +102,7 @@ impl PathState { validity: PathValidity::empty(), last_payload_msg: Some(now), sources, + interface_name: None, } } @@ -157,9 +163,18 @@ impl PathState { validity, last_payload_msg: None, sources: HashMap::new(), + interface_name: None, } } + pub(super) fn set_interface_name(&mut self, interface_name: Option<String>) { + self.interface_name = interface_name; + } + + pub(super) fn interface_name(&self) -> Option<&str> { + self.interface_name.as_deref() + } + /// Check whether this path is considered active. /// /// Active means the path has received payload messages within the last diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 96cdc3dec2..ca039a5ac7 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -90,6 +90,8 @@ pub(super) struct NodeUdpPaths { /// /// Follows the same logic as `best` above, but doesn't include any IPv6 addresses. best_ipv4: UdpSendAddr, + /// Interface priority configuration for path selection. + pub(super) interface_priority: crate::magicsock::InterfacePriority, } pub(super) struct MutAccess<'a> { @@ -126,6 +128,7 @@ impl NodeUdpPaths { paths, best_ipv4: best, // we only use ipv4 addrs in tests best, + interface_priority: Default::default(), } } @@ -194,25 +197,41 @@ impl NodeUdpPaths { /// If we don't have any addresses, returns [`UdpSendAddr::None`]. /// /// If `have_ipv6` is false, we only search among ipv4 candidates. + /// + /// Path selection considers: + /// 1. Validity (Valid > Outdated > Unconfirmed) + /// 2. Latency (lower is better) + /// 3. Interface priority (if configured via IROH_INTERFACE_PRIORITY; only breaks latency ties) + /// 4. 
IPv6 preference (tie-breaker) fn best_addr(&self, have_ipv6: bool, now: Instant) -> UdpSendAddr { let Some((ipp, path)) = self .paths .iter() .filter(|(ipp, _)| have_ipv6 || ipp.ip.is_ipv4()) .max_by_key(|(ipp, path)| { - // We find the best by sorting on a key of type (Option<ReverseOrd<Duration>>, Option<ReverseOrd<Duration>>, bool) - // where the first is set to Some(ReverseOrd(latency)) iff path.is_valid(now) and - // the second is set to Some(ReverseOrd(latency)) if path.is_outdated(now) and - // the third is set to whether the ipp is ipv6. - // This makes max_by_key sort for the lowest valid latency first, then sort for - // the lowest outdated latency second, and if latencies are equal, it'll sort IPv6 paths first. + // Sorting key: (valid_latency, outdated_latency, interface_weight, ipv6), compared left to right + // We use Option<ReverseOrd<Duration>> for latencies to sort lowest first + // Interface weight is u32 where higher values = higher priority; it only breaks latency ties + let is_ipv6 = ipp.ip.is_ipv6(); + + // Get interface priority for this path + // Try interface_name first (if populated) otherwise fall back to flat weight of 1 + let interface_weight = if let Some(name) = path.interface_name() { + self.interface_priority.weight(name) + } else { + 1 + }; + if let Some(latency) = path.validity.latency_if_valid(now) { - (Some(ReverseOrd(latency)), None, is_ipv6) + // Valid paths: sort by latency, then interface priority + (Some(ReverseOrd(latency)), None, interface_weight, is_ipv6) } else if let Some(latency) = path.validity.latency_if_outdated(now) { - (None, Some(ReverseOrd(latency)), is_ipv6) + // Outdated paths: sort by latency, then interface priority + (None, Some(ReverseOrd(latency)), interface_weight, is_ipv6) } else { - (None, None, is_ipv6) + // Unconfirmed paths: sort by interface priority only + (None, None, interface_weight, is_ipv6) } }) else { @@ -230,9 +249,6 @@ impl NodeUdpPaths { } /// Implements the reverse [`Ord`] implementation for the wrapped type. -/// -/// Literally calls [`std::cmp::Ordering::reverse`] on the inner value's -/// ordering. 
#[derive(PartialEq, Eq)] struct ReverseOrd(N); From 65dbbad4b65308b12392bf51f0b8bddca6d62976 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Fri, 3 Oct 2025 08:46:58 +0200 Subject: [PATCH 3/3] docs --- iroh/src/lib.rs | 1 + iroh/src/magicsock/interface_priority.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 8a97b9f576..8fa3debfa1 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -275,6 +275,7 @@ pub use iroh_base::{ KeyParsingError, NodeAddr, NodeId, PublicKey, RelayUrl, RelayUrlParseError, SecretKey, }; pub use iroh_relay::{RelayMap, RelayNode, node_info}; +pub use magicsock::InterfacePriority; pub use n0_watcher::Watcher; #[cfg(any(test, feature = "test-utils"))] diff --git a/iroh/src/magicsock/interface_priority.rs b/iroh/src/magicsock/interface_priority.rs index 4f35bac2b5..6941820147 100644 --- a/iroh/src/magicsock/interface_priority.rs +++ b/iroh/src/magicsock/interface_priority.rs @@ -15,7 +15,7 @@ use netwatch::netmon; /// # Examples /// /// ```no_run -/// use iroh::magicsock::InterfacePriority; +/// use iroh::InterfacePriority; /// /// // Prefer Infiniband (ib*) over Ethernet (eth*, en*) /// let priority = InterfacePriority::new(vec![