Skip to content

Commit

Permalink
fix(network/socket): capabilities intersection
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jan 27, 2025
1 parent 4b74c95 commit 679cc00
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 68 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ metrics = "0.17.1"
dashmap = "6.0.1"
toml = "0.8.14"
bytesize = { version = "1.2.0", features = ["serde"] }
proptest = "1.6"
criterion = "0.5.1"

[workspace.dependencies.derive_more]
version = "1"
Expand Down
2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils" }
metrics.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
derive_more.workspace = true
criterion = "0.5.1"
criterion.workspace = true
futures = "0.3"
mimalloc = { version = "0.1.39", default-features = false }
jemallocator = "0.5.4"
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils", features = ["test-util

tokio = { workspace = true, features = ["full"] }
toml.workspace = true
proptest.workspace = true
anyhow = "1.0.40"
proptest = "1.4.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 1 addition & 0 deletions elfo-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ turmoil06 = { package = "turmoil", version = "0.6", optional = true }

[dev-dependencies]
tracing-test = "0.2.4" # TODO: actually unused?
proptest.workspace = true
8 changes: 2 additions & 6 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,7 @@ impl Discovery {
transport: None,
});

info!(
message = "listening for connections",
addr = %transport,
capabilities = %capabilities,
);
info!(message = "listening for connections", addr = %transport);

self.ctx.attach(Stream::from_futures03(stream));
}
Expand Down Expand Up @@ -254,7 +250,6 @@ impl Discovery {
message = "connecting to peer",
addr = %transport,
role = ?role,
capabilities = %capabilities,
);

match socket::connect(&transport, node_no, launch_id, capabilities).await {
Expand Down Expand Up @@ -300,6 +295,7 @@ impl Discovery {
message = "new connection established",
socket = %socket.info,
peer = %socket.peer,
capabilities = %socket.capabilities,
role = msg.role.as_str(),
);

Expand Down
73 changes: 26 additions & 47 deletions elfo-network/src/socket/capabilities/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,20 @@ impl Compression {
};
}

pub(crate) const fn intersection(self, rhs: Self) -> Self {
let we_prefer = self.preferred();
let we_support = self.supported();

let they_prefer = rhs.preferred();
let they_support = rhs.supported();

// Let's see what we both support.
let both_support = we_support.intersection(they_support);
// And if we both prefer something.
let both_prefer = we_prefer.intersection(they_prefer);

let preferred = if both_prefer.is_empty() {
// if we prefer something that is supported by us and
// the remote node, then it's a deal.
we_prefer.intersection(both_support)
} else {
// We both prefer something!
pub(crate) fn intersection(self, rhs: Self) -> Self {
let supported = self.supported() & rhs.supported();
let both_prefer = self.preferred() & rhs.preferred();
let some_prefer = (self.preferred() | rhs.preferred()) & supported;

// If both nodes prefer the same algorithms, use this set.
// Otherwise, use the set preferred by at least one node.
let preferred = if !both_prefer.is_empty() {
both_prefer
} else {
some_prefer
};

Self::new(both_support, preferred)
Self::new(supported, preferred)
}
}

Expand All @@ -121,37 +113,24 @@ impl Compression {

impl fmt::Display for Compression {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn write_array(
hide: Option<Algorithms>,
algos: Algorithms,
f: &mut fmt::Formatter<'_>,
) -> fmt::Result {
write!(f, "[")?;
let mut need_comma = false;
for (name, _) in algos
.iter_names()
.filter(|(_, algo)| hide.map_or(true, |hide| hide.contains(*algo)))
{
if need_comma {
write!(f, ", ")?;
}

f.write_str(name)?;
need_comma = true;
let mut empty = true;

// Print only preferred algorithms, because they are required to
// actually enable compression. For resolved final capabilities,
// only one of the preferred algorithms should be selected.
// Thus, it will be printed as a single value.
for (name, _) in self.preferred().iter_names() {
if !empty {
f.write_str(", ")?;
}

write!(f, "]")
write!(f, "{name}")?;
empty = false;
}

let preferred = self.preferred();
let supported = self.supported();
if empty {
f.write_str("None")?;
}

write!(f, "(preferred: ")?;
write_array(None, preferred, f)?;
write!(f, ", supported: ")?;
// Don't show preferred in supported, more compact
// output.
write_array(Some(preferred), supported, f)?;
write!(f, ")")
Ok(())
}
}
20 changes: 15 additions & 5 deletions elfo-network/src/socket/capabilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Capabilities {
Self::new(compression)
}

pub(crate) const fn intersection(self, rhs: Self) -> Self {
pub(crate) fn intersection(self, rhs: Self) -> Self {
let compr = self.compression().intersection(rhs.compression());
Self::new(compr)
}
Expand All @@ -50,26 +50,27 @@ impl Capabilities {

impl fmt::Display for Capabilities {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(compression: {})", self.compression())
write!(f, "caps(compression={})", self.compression())
}
}

#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;

use self::compression::Algorithms;
use super::*;

#[test]
fn capabilities_format_is_compatible_with_020alpha17() {
fn format_is_compatible_with_020alpha17() {
let caps = Capabilities::new(Compression::new(Algorithms::LZ4, Algorithms::empty()));
let lz4_bit = caps.bits() & (1 << 8);

assert_eq!(lz4_bit, 1 << 8);
}

#[test]
fn compression_capabilities_encoded_right_way() {
fn compression_encoded_right_way() {
#[track_caller]
fn case(create: (Algorithms, Algorithms), expect: (Algorithms, Algorithms)) {
let caps = Capabilities::new(Compression::new(create.0, create.1));
Expand Down Expand Up @@ -104,4 +105,13 @@ mod tests {
(Algorithms::empty(), Algorithms::empty()),
);
}

proptest! {
#[test]
fn intersection_is_commutative(lhs in prop::num::u32::ANY, rhs in prop::num::u32::ANY) {
let lhs = Capabilities::from_bits_truncate(lhs);
let rhs = Capabilities::from_bits_truncate(rhs);
prop_assert_eq!(lhs.intersection(rhs), rhs.intersection(lhs));
}
}
}
12 changes: 9 additions & 3 deletions elfo-network/src/socket/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,15 @@ pub(super) async fn handshake(
let version = this_node_handshake
.version
.min(other_node_handshake.version);
let capabilities = this_node_handshake
.capabilities
.intersection(other_node_handshake.capabilities);

// Agree on the capabilities that both nodes support.
let this_caps = this_node_handshake.capabilities;
let that_caps = other_node_handshake.capabilities;
let capabilities = this_caps.intersection(that_caps);

// The intersection must be commutative. In other words, both nodes
// must finally agree on the same capabilities.
assert_eq!(that_caps.intersection(this_caps), capabilities);

Ok(Handshake {
version,
Expand Down
5 changes: 3 additions & 2 deletions elfo-network/src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ pub(crate) use self::capabilities::{
Capabilities,
};

mod capabilities;
mod handshake;
mod idleness;
mod raw;

mod capabilities;

pub(crate) struct Socket {
pub(crate) info: raw::SocketInfo,
pub(crate) peer: Peer,
pub(crate) capabilities: Capabilities,
pub(crate) read: ReadHalf,
pub(crate) write: WriteHalf,
pub(crate) idle: IdleTracker,
Expand Down Expand Up @@ -63,6 +63,7 @@ impl Socket {
Self {
info: raw.info,
peer: Peer::new(handshake.node_no, handshake.launch_id),
capabilities: handshake.capabilities,
read: ReadHalf::new(framed_read, raw.read, idle_track),
write: WriteHalf::new(framed_write, raw.write),
idle: idle_tracker,
Expand Down
4 changes: 2 additions & 2 deletions elfo-telemeter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ elfo-configurer = { path = "../elfo-configurer" }

tokio = { workspace = true, features = ["rt-multi-thread"] }
toml.workspace = true
criterion = "0.5.1"
proptest = "1.4"
criterion.workspace = true
proptest.workspace = true
2 changes: 1 addition & 1 deletion elfo-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ quanta = "0.12"
crossbeam-utils = "0.8"

[dev-dependencies]
criterion = "0.5.1"
criterion.workspace = true

0 comments on commit 679cc00

Please sign in to comment.