From 679cc00cac53f6cd86c0f4844c10e3b6a561517d Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Mon, 27 Jan 2025 15:53:19 +0400 Subject: [PATCH] fix(network/socket): capabilities intersection --- Cargo.toml | 2 + benches/Cargo.toml | 2 +- elfo-core/Cargo.toml | 2 +- elfo-network/Cargo.toml | 1 + elfo-network/src/discovery/mod.rs | 8 +- .../src/socket/capabilities/compression.rs | 73 +++++++------------ elfo-network/src/socket/capabilities/mod.rs | 20 +++-- elfo-network/src/socket/handshake.rs | 12 ++- elfo-network/src/socket/mod.rs | 5 +- elfo-telemeter/Cargo.toml | 4 +- elfo-utils/Cargo.toml | 2 +- 11 files changed, 63 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 64435c50..76585b71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,8 @@ metrics = "0.17.1" dashmap = "6.0.1" toml = "0.8.14" bytesize = { version = "1.2.0", features = ["serde"] } +proptest = "1.6" +criterion = "0.5.1" [workspace.dependencies.derive_more] version = "1" diff --git a/benches/Cargo.toml b/benches/Cargo.toml index b85fa242..3eb733c2 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -17,7 +17,7 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils" } metrics.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } derive_more.workspace = true -criterion = "0.5.1" +criterion.workspace = true futures = "0.3" mimalloc = { version = "0.1.39", default-features = false } jemallocator = "0.5.4" diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml index 7340fd4d..6c739876 100644 --- a/elfo-core/Cargo.toml +++ b/elfo-core/Cargo.toml @@ -61,8 +61,8 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils", features = ["test-util tokio = { workspace = true, features = ["full"] } toml.workspace = true +proptest.workspace = true anyhow = "1.0.40" -proptest = "1.4.0" [package.metadata.docs.rs] all-features = true diff --git a/elfo-network/Cargo.toml b/elfo-network/Cargo.toml index a1ed28f9..48a85d3a 100644 --- a/elfo-network/Cargo.toml +++ b/elfo-network/Cargo.toml @@ -41,3 +41,4 @@ turmoil06 = { package = "turmoil", version = "0.6", optional = true } [dev-dependencies] tracing-test = "0.2.4" # TODO: actually unused? +proptest.workspace = true diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index 59f31dc5..68eaffb7 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -209,11 +209,7 @@ impl Discovery { transport: None, }); - info!( - message = "listening for connections", - addr = %transport, - capabilities = %capabilities, - ); + info!(message = "listening for connections", addr = %transport); self.ctx.attach(Stream::from_futures03(stream)); } @@ -254,7 +250,6 @@ impl Discovery { message = "connecting to peer", addr = %transport, role = ?role, - capabilities = %capabilities, ); match socket::connect(&transport, node_no, launch_id, capabilities).await { @@ -300,6 +295,7 @@ impl Discovery { message = "new connection established", socket = %socket.info, peer = %socket.peer, + capabilities = %socket.capabilities, role = msg.role.as_str(), ); diff --git a/elfo-network/src/socket/capabilities/compression.rs b/elfo-network/src/socket/capabilities/compression.rs index fa261e65..e4ac88fa 100644 --- a/elfo-network/src/socket/capabilities/compression.rs +++ b/elfo-network/src/socket/capabilities/compression.rs @@ -78,28 +78,20 @@ impl Compression { }; } - pub(crate) const fn intersection(self, rhs: Self) -> Self { - let we_prefer = self.preferred(); - let we_support = self.supported(); - - let they_prefer = rhs.preferred(); - let they_support = rhs.supported(); - - // Let's see what we both support. - let both_support = we_support.intersection(they_support); - // And if we both prefer something. - let both_prefer = we_prefer.intersection(they_prefer); - - let preferred = if both_prefer.is_empty() { - // if we prefer something that is supported by us and - // the remote node, then it's a deal. - we_prefer.intersection(both_support) - } else { - // We both prefer something! + pub(crate) fn intersection(self, rhs: Self) -> Self { + let supported = self.supported() & rhs.supported(); + let both_prefer = self.preferred() & rhs.preferred(); + let some_prefer = (self.preferred() | rhs.preferred()) & supported; + + // If both nodes prefer the same algorithms, use this set. + // Otherwise, use the set preferred by at least one node. + let preferred = if !both_prefer.is_empty() { both_prefer + } else { + some_prefer }; - Self::new(both_support, preferred) + Self::new(supported, preferred) } } @@ -121,37 +113,24 @@ impl Compression { impl fmt::Display for Compression { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fn write_array( - hide: Option, - algos: Algorithms, - f: &mut fmt::Formatter<'_>, - ) -> fmt::Result { - write!(f, "[")?; - let mut need_comma = false; - for (name, _) in algos - .iter_names() - .filter(|(_, algo)| hide.map_or(true, |hide| hide.contains(*algo))) - { - if need_comma { - write!(f, ", ")?; - } - - f.write_str(name)?; - need_comma = true; + let mut empty = true; + + // Print only preferred algorithms, because they are required to + // actually enable compression. For resolved final capabilities, + // only one of the preferred algorithms should be selected. + // Thus, it will be printed as a single value. + for (name, _) in self.preferred().iter_names() { + if !empty { + f.write_str(", ")?; } - - write!(f, "]") + write!(f, "{name}")?; + empty = false; } - let preferred = self.preferred(); - let supported = self.supported(); + if empty { + f.write_str("None")?; + } - write!(f, "(preferred: ")?; - write_array(None, preferred, f)?; - write!(f, ", supported: ")?; - // Don't show preferred in supported, more compact - // output. - write_array(Some(preferred), supported, f)?; - write!(f, ")") + Ok(()) } } diff --git a/elfo-network/src/socket/capabilities/mod.rs b/elfo-network/src/socket/capabilities/mod.rs index cb22f358..f42a0c87 100644 --- a/elfo-network/src/socket/capabilities/mod.rs +++ b/elfo-network/src/socket/capabilities/mod.rs @@ -32,7 +32,7 @@ impl Capabilities { Self::new(compression) } - pub(crate) const fn intersection(self, rhs: Self) -> Self { + pub(crate) fn intersection(self, rhs: Self) -> Self { let compr = self.compression().intersection(rhs.compression()); Self::new(compr) } @@ -50,18 +50,19 @@ impl Capabilities { impl fmt::Display for Capabilities { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "(compression: {})", self.compression()) + write!(f, "caps(compression={})", self.compression()) } } #[cfg(test)] mod tests { - use super::*; + use proptest::prelude::*; use self::compression::Algorithms; + use super::*; #[test] - fn capabilities_format_is_compatible_with_020alpha17() { + fn format_is_compatible_with_020alpha17() { let caps = Capabilities::new(Compression::new(Algorithms::LZ4, Algorithms::empty())); let lz4_bit = caps.bits() & (1 << 8); @@ -69,7 +70,7 @@ mod tests { } #[test] - fn compression_capabilities_encoded_right_way() { + fn compression_encoded_right_way() { #[track_caller] fn case(create: (Algorithms, Algorithms), expect: (Algorithms, Algorithms)) { let caps = Capabilities::new(Compression::new(create.0, create.1)); @@ -104,4 +105,13 @@ mod tests { (Algorithms::empty(), Algorithms::empty()), ); } + + proptest! { + #[test] + fn intersection_is_commutative(lhs in prop::num::u32::ANY, rhs in prop::num::u32::ANY) { + let lhs = Capabilities::from_bits_truncate(lhs); + let rhs = Capabilities::from_bits_truncate(rhs); + prop_assert_eq!(lhs.intersection(rhs), rhs.intersection(lhs)); + } + } } diff --git a/elfo-network/src/socket/handshake.rs b/elfo-network/src/socket/handshake.rs index 6baf0e3d..ed6c6ffe 100644 --- a/elfo-network/src/socket/handshake.rs +++ b/elfo-network/src/socket/handshake.rs @@ -96,9 +96,15 @@ pub(super) async fn handshake( let version = this_node_handshake .version .min(other_node_handshake.version); - let capabilities = this_node_handshake - .capabilities - .intersection(other_node_handshake.capabilities); + + // Agree on the capabilities that both nodes support. + let this_caps = this_node_handshake.capabilities; + let that_caps = other_node_handshake.capabilities; + let capabilities = this_caps.intersection(that_caps); + + // The intersection must be commutative. In other words, both nodes + // must finally agree on the same capabilities. + assert_eq!(that_caps.intersection(this_caps), capabilities); Ok(Handshake { version, diff --git a/elfo-network/src/socket/mod.rs b/elfo-network/src/socket/mod.rs index 93b224b4..99d86000 100644 --- a/elfo-network/src/socket/mod.rs +++ b/elfo-network/src/socket/mod.rs @@ -25,15 +25,15 @@ pub(crate) use self::capabilities::{ Capabilities, }; +mod capabilities; mod handshake; mod idleness; mod raw; -mod capabilities; - pub(crate) struct Socket { pub(crate) info: raw::SocketInfo, pub(crate) peer: Peer, + pub(crate) capabilities: Capabilities, pub(crate) read: ReadHalf, pub(crate) write: WriteHalf, pub(crate) idle: IdleTracker, @@ -63,6 +63,7 @@ impl Socket { Self { info: raw.info, peer: Peer::new(handshake.node_no, handshake.launch_id), + capabilities: handshake.capabilities, read: ReadHalf::new(framed_read, raw.read, idle_track), write: WriteHalf::new(framed_write, raw.write), idle: idle_tracker, diff --git a/elfo-telemeter/Cargo.toml b/elfo-telemeter/Cargo.toml index 3922160c..e8512d79 100644 --- a/elfo-telemeter/Cargo.toml +++ b/elfo-telemeter/Cargo.toml @@ -46,5 +46,5 @@ elfo-configurer = { path = "../elfo-configurer" } tokio = { workspace = true, features = ["rt-multi-thread"] } toml.workspace = true -criterion = "0.5.1" -proptest = "1.4" +criterion.workspace = true +proptest.workspace = true diff --git a/elfo-utils/Cargo.toml b/elfo-utils/Cargo.toml index 03f726e6..8d5bf711 100644 --- a/elfo-utils/Cargo.toml +++ b/elfo-utils/Cargo.toml @@ -26,4 +26,4 @@ quanta = "0.12" crossbeam-utils = "0.8" [dev-dependencies] -criterion = "0.5.1" +criterion.workspace = true