Skip to content

Commit

Permalink
Improves the remote protocol a bit, converts test main/client to remo…
Browse files Browse the repository at this point in the history
…te benchmark. (#4)

* Prefix network packets with their length for less allocations during read.

* Implement more conversions for dests.

* Create benchmark harness for remote protocol.
  • Loading branch information
dtzxporter authored May 31, 2024
1 parent 4b8564d commit 771be02
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 10 deletions.
29 changes: 25 additions & 4 deletions hydra-test-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use hydra::Message;
use hydra::Node;
use hydra::NodeOptions;
use hydra::Pid;
use hydra::Process;

#[hydra::main]
Expand All @@ -23,9 +24,29 @@ async fn main() {

println!("Nodes: {:?}", Node::list());

loop {
let message: Message<()> = Process::receive().await;

println!("Got message: {:?}", message);
for i in 0..8 {
Process::spawn(async move {
let pid = Process::current();
let mut pid2: Option<Pid> = None;

loop {
if let Some(pid2) = pid2 {
Process::send(pid2, pid);
} else {
Process::send(
(format!("bench-receive{}", i), ("hydra-test-main", address)),
pid,
);
}

let pidd = Process::receive::<Pid>().await;

if let Message::User(pidd) = pidd {
pid2 = Some(pidd);
}
}
});
}

Process::sleep(Duration::from_secs(1000)).await;
}
1 change: 1 addition & 0 deletions hydra-test-main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ publish = false
hydra = { path = "../hydra", default-features = false, features = ["tracing", "macros"] }

serde.workspace = true
tracing.workspace = true
45 changes: 44 additions & 1 deletion hydra-test-main/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;

use hydra::Message;
use hydra::Node;
use hydra::NodeOptions;
use hydra::Pid;
use hydra::Process;

static COUNTER: AtomicU64 = AtomicU64::new(0);

#[hydra::main]
async fn main() {
Node::start(
Expand All @@ -14,5 +21,41 @@ async fn main() {
.broadcast_address("127.0.0.1:1337".parse::<SocketAddr>().unwrap()),
);

Process::sleep(Duration::from_secs(1000)).await;
for i in 0..8 {
Process::spawn(async move {
Process::register(Process::current(), format!("bench-receive{}", i))
.expect("Failed to register process!");

let pid = Process::current();

loop {
let Message::User(pid2) = Process::receive::<Pid>().await else {
panic!()
};

COUNTER.fetch_add(1, Ordering::Relaxed);

Process::send(pid2, pid);
}
});
}

let mut start = Instant::now();

loop {
Process::sleep(Duration::from_secs(1)).await;

let elapsed = start.elapsed();
let count = COUNTER.load(Ordering::Relaxed);

if count == 0 {
tracing::info!("Waiting for first message...");
start = Instant::now();
continue;
}

let ops = count / elapsed.as_secs().max(1);

tracing::info!("Msg/s: {}", ops);
}
}
18 changes: 18 additions & 0 deletions hydra/src/dest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ impl From<String> for Dest {
}
}

impl<T> From<(String, T)> for Dest
where
T: Into<Node>,
{
fn from(value: (String, T)) -> Self {
Self::Named(value.0.into(), value.1.into())
}
}

impl<T> From<(&'static str, T)> for Dest
where
T: Into<Node>,
Expand Down Expand Up @@ -152,6 +161,15 @@ impl From<&'static str> for Dests {
}
}

impl<T> From<(String, T)> for Dests
where
T: Into<Node>,
{
fn from(value: (String, T)) -> Self {
Self::Dest(Dest::Named(value.0.into(), value.1.into()))
}
}

impl<T> From<(&'static str, T)> for Dests
where
T: Into<Node>,
Expand Down
29 changes: 24 additions & 5 deletions hydra/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const FRAME_CONFIG: Configuration<LittleEndian, Fixint> = config::standard()
.with_fixed_int_encoding()
.with_little_endian();

/// The size of the marker.
const MARKER_LENGTH: usize = std::mem::size_of::<u32>();

/// A frame value for the codec.
#[derive(Debug, Encode, Decode)]
pub enum Frame {
Expand Down Expand Up @@ -134,8 +137,13 @@ impl Encoder<Frame> for Codec {
type Error = Error;

fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
bincode::encode_into_std_write(item, &mut dst.writer(), FRAME_CONFIG)
dst.put_u32_le(0);

let size = bincode::encode_into_std_write(item, &mut dst.writer(), FRAME_CONFIG)
.map_err(|e| Error::new(ErrorKind::InvalidInput, e))?;

dst[0..MARKER_LENGTH].copy_from_slice(&(size as u32).to_le_bytes());

Ok(())
}
}
Expand All @@ -145,15 +153,26 @@ impl Decoder for Codec {
type Error = Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
if src.len() < 4 {
return Ok(None);
}

let mut length_marker = [0u8; MARKER_LENGTH];

length_marker.copy_from_slice(&src[0..MARKER_LENGTH]);

let length = u32::from_le_bytes(length_marker) as usize;

if src.len() < MARKER_LENGTH + length {
src.reserve(MARKER_LENGTH + length - src.len());
return Ok(None);
}

let result = bincode::decode_from_slice(src, FRAME_CONFIG);
let result = bincode::decode_from_slice(&src[4..], FRAME_CONFIG);

match result {
Ok((frame, length)) => {
src.advance(length);
Ok((frame, _)) => {
src.advance(MARKER_LENGTH + length);
Ok(Some(frame))
}
Err(DecodeError::UnexpectedEnd { additional }) => {
Expand Down

0 comments on commit 771be02

Please sign in to comment.