Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): divide compression algorithm support in three categories #150

Merged
merged 5 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 55 additions & 12 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,65 @@ pub struct Config {
pub idle_timeout: Duration,
}

/// Preference.
#[derive(Debug, Clone, Copy, Default, Deserialize)]
pub enum Preference {
/// This is preferred, implies [`Preference::Supported`].
Preferred,

/// This is just supported.
#[default]
Supported,

/// Must not be used.
Disabled,
}

/// Options for the specific algorithm.
#[derive(Debug, Deserialize, Clone)]
pub struct Algorithm {
/// Preference when deciding which algorithm to use in
/// communication between nodes.
pub preference: Preference,
}

/// Options for the compression algorithms.
#[derive(Debug, Deserialize, Clone)]
pub struct CompressionAlgorithms {
/// LZ4 compression algorithm.
#[serde(default = "CompressionAlgorithms::lz4_default")]
pub lz4: Algorithm,
}

impl CompressionAlgorithms {
const fn lz4_default() -> Algorithm {
Algorithm {
preference: Preference::Supported,
}
}
}

impl Default for CompressionAlgorithms {
fn default() -> Self {
Self {
lz4: Self::lz4_default(),
}
}
}

/// Compression settings.
#[derive(Debug, Default, Deserialize, Clone)]
pub struct CompressionConfig {
/// Compression algorithm.
/// Compression algorithms.
///
/// Example:
/// ```toml
/// algorithms.lz4 = { preference = "Preferred" }
/// ```
///
/// Preferred implies supported.
#[serde(default)]
pub algorithm: CompressionAlgorithm,
}

/// Compression algorithms.
#[derive(Debug, Default, PartialEq, Eq, Deserialize, Clone)]
pub enum CompressionAlgorithm {
/// LZ4 with default compression level.
Lz4,
/// Compression disabled.
#[default]
None,
pub algorithms: CompressionAlgorithms,
}

fn default_ping_interval() -> Duration {
Expand Down
29 changes: 22 additions & 7 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use elfo_core::{

use crate::{
codec::format::{NetworkAddr, NetworkEnvelope, NetworkEnvelopePayload},
config::{self, CompressionAlgorithm, Transport},
config::{self, Transport},
node_map::{NodeInfo, NodeMap},
protocol::{internode, DataConnectionFailed, GroupInfo, HandleConnection},
socket::{self, ReadError, Socket},
Expand Down Expand Up @@ -135,12 +135,21 @@ impl Discovery {
Ok(())
}

fn get_compression(&self) -> socket::Compression {
use socket::Algorithms as Algos;

let mut compression = socket::Compression::empty();
let cfg = &self.cfg.compression.algorithms;

compression.toggle(Algos::LZ4, cfg.lz4.preference);

compression
}

fn get_capabilities(&self) -> socket::Capabilities {
let mut capabilities = socket::Capabilities::empty();
if self.cfg.compression.algorithm == CompressionAlgorithm::Lz4 {
capabilities |= socket::Capabilities::LZ4;
}
capabilities
let compression = self.get_compression();

socket::Capabilities::new(compression)
}

fn on_update_config(&mut self) {
Expand Down Expand Up @@ -206,6 +215,7 @@ impl Discovery {
info!(
message = "listening for connections",
addr = %transport,
capabilities = %capabilities,
);

self.ctx.attach(Stream::from_futures03(stream));
Expand Down Expand Up @@ -243,7 +253,12 @@ impl Discovery {

self.ctx.attach(Stream::once(async move {
loop {
debug!(message = "connecting to peer", addr = %transport, role = ?role);
debug!(
message = "connecting to peer",
addr = %transport,
role = ?role,
capabilities = %capabilities,
);

match socket::connect(&transport, node_no, launch_id, capabilities).await {
Ok(socket) => {
Expand Down
157 changes: 157 additions & 0 deletions elfo-network/src/socket/capabilities/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Layouts are specified from highest to lowest bits.

use std::fmt;

use crate::config::Preference;

bitflags::bitflags! {
/// Set of algorithms. 24 bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Algorithms: u32 {
const LZ4 = 1;
// NB: Shift by 2: `const ZSTD = 1 << 2;`.
}
}

/// Compression capabilities.
///
/// Layout:
/// ```text
/// 22 2
/// ┌────────────┬─────┐
/// │ Reserved │ Lz4 │
/// └────────────┴─────┘
/// ```
///
/// Each mentioned algorithm here occupies two bits for a reason, here's the
/// layout of those bits:
/// ```text
/// 1 1
/// ┌───────────┬───────────┐
/// │ Preferred │ Supported │
/// └───────────┴───────────┘
/// ```
///
/// 1. Preferred - the compression algorithm is preferred, implies `Supported`.
/// 2. Supported - the compression algorithm is supported.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Compression(u32);

impl Compression {
pub(crate) const fn empty() -> Self {
Self::new(Algorithms::empty(), Algorithms::empty())
}

pub(super) const fn from_bits_unchecked(v: u32) -> Self {
Self(v)
}

pub(crate) const fn from_bits_truncate(v: u32) -> Self {
let supported = Algorithms::from_bits_truncate(v);
let preferred = Algorithms::from_bits_truncate(v >> 1);

Self::new(supported, preferred)
}

pub(crate) const fn new(supported: Algorithms, preferred: Algorithms) -> Self {
let preferred = preferred.bits();
// Preferred implies supported.
let supported = supported.bits() | preferred;

// 0 1 0 1 | Supported
// 1 0 1 0 | Preferred
// -------
// 1 1 1 1
let joined = supported | (preferred << 1);

Self(joined)
}

pub(crate) fn toggle(&mut self, algos: Algorithms, pref: Preference) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, It's not a "toggle", because this code does not work as expected for toggle(Preferred); toggle(Disabled) sequence.

let preferred = self.preferred();
let supported = self.supported();

*self = match pref {
Preference::Preferred => Self::new(supported, preferred | algos),
Preference::Supported => Self::new(supported | algos, preferred),
Preference::Disabled => *self,
};
}

pub(crate) const fn intersection(self, rhs: Self) -> Self {
let we_prefer = self.preferred();
let we_support = self.supported();

let they_prefer = rhs.preferred();
let they_support = rhs.supported();

// Let's see what we both support.
let both_support = we_support.intersection(they_support);
// And if we both prefer something.
let both_prefer = we_prefer.intersection(they_prefer);

let preferred = if both_prefer.is_empty() {
// if we prefer something that is supported by us and
// the remote node, then it's a deal.
we_prefer.intersection(both_support)
} else {
// We both prefer something!
both_prefer
};

Self::new(both_support, preferred)
}
}

impl Compression {
pub(crate) const fn bits(self) -> u32 {
self.0
}

pub(crate) const fn supported(self) -> Algorithms {
// `preferred` bits would be discarded.
Algorithms::from_bits_truncate(self.0)
}

pub(crate) const fn preferred(self) -> Algorithms {
// `supported` bits would be discarded.
Algorithms::from_bits_truncate(self.0 >> 1)
}
}

impl fmt::Display for Compression {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn write_array(
hide: Option<Algorithms>,
algos: Algorithms,
f: &mut fmt::Formatter<'_>,
) -> fmt::Result {
write!(f, "[")?;
let mut need_comma = false;
for (name, _) in algos
.iter_names()
.filter(|(_, algo)| hide.map_or(true, |hide| hide.contains(*algo)))
{
if need_comma {
write!(f, ", ")?;
}

f.write_str(name)?;
need_comma = true;
}

write!(f, "]")
}

let preferred = self.preferred();
let supported = self.supported();

write!(f, "(preferred: ")?;
write_array(None, preferred, f)?;
write!(f, ", supported: ")?;
// Don't show preferred in supported, more compact
// output.
write_array(Some(preferred), supported, f)?;
write!(f, ")")
}
}
Loading
Loading