Skip to content

net: framed: property testing FrameWrite is cancel safe #939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hyperactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ rustls-pemfile = "1.0.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
serde_with = { version = "3", features = ["hex", "json"] }
serde_yaml = "0.9.25"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
Expand All @@ -69,6 +70,7 @@ unicode-ident = "1.0.12"

[dev-dependencies]
maplit = "1.0"
proptest = "1.5"
timed_test = { version = "0.0.0", path = "../timed_test" }
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
20 changes: 13 additions & 7 deletions hyperactor/benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use hyperactor::reference::ProcId;
use hyperactor::reference::WorldId;
use serde::Deserialize;
use serde::Serialize;
use serde_multipart::Part;
use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::select;
Expand Down Expand Up @@ -208,12 +209,17 @@ async fn channel_ping_pong(
message_size: usize,
num_iter: usize,
) -> Duration {
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
.await
.unwrap();
let (server_addr, mut server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
.await
.unwrap();
#[derive(Clone, Debug, Named, Serialize, Deserialize)]
struct Message(Part);

let (client_addr, mut client_rx) =
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
.await
.unwrap();
let (server_addr, mut server_rx) =
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
.await
.unwrap();

let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
tokio::spawn(async move {
Expand All @@ -227,7 +233,7 @@ async fn channel_ping_pong(
let client_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
tokio::spawn(async move {
let server_tx = channel::dial(server_addr)?;
let message = Bytes::from(vec![0u8; message_size]);
let message = Message(Part::from(vec![0u8; message_size]));
for _ in 0..num_iter {
server_tx.post(message.clone() /*cheap */);
client_rx.recv().await?;
Expand Down
26 changes: 17 additions & 9 deletions hyperactor/example/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@
*/

use bytes::Bytes;
use hyperactor::Named;
use hyperactor::channel;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelRx;
use hyperactor::channel::ChannelTransport;
use hyperactor::channel::Rx;
use hyperactor::channel::Tx;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::Duration;
use tokio::time::Instant;

#[derive(Clone, Debug, Named, Serialize, Deserialize)]
struct Message(serde_multipart::Part);

async fn server(
mut server_rx: ChannelRx<Bytes>,
mut server_rx: ChannelRx<Message>,
client_addr: ChannelAddr,
) -> Result<(), anyhow::Error> {
let client_tx = channel::dial(client_addr)?;
Expand All @@ -35,19 +41,21 @@ async fn main() -> Result<(), anyhow::Error> {
let transport = ChannelTransport::Tcp;
// let transport = ChannelTransport::Local;
let message_size = 1_000_000;
let num_iter = 100;
let num_iter = 1000;

let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
.await
.unwrap();
let (server_addr, server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
let (client_addr, mut client_rx) =
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
.await
.unwrap();
let (server_addr, server_rx) = channel::serve::<Message>(ChannelAddr::any(transport.clone()))
.await
.unwrap();

let _server_handle = tokio::spawn(server(server_rx, client_addr));

let server_tx = channel::dial(server_addr)?;
let message = Bytes::from(vec![0u8; message_size]);

let message = Message(serde_multipart::Part::from(vec![0u8; message_size]));

for _ in 0..10 {
// Warmup
Expand All @@ -62,10 +70,10 @@ async fn main() -> Result<(), anyhow::Error> {

let start = Instant::now();
for _ in 0..num_iter {
total_bytes_sent += message.len();
total_bytes_sent += message.0.len();
let start = Instant::now();
server_tx.post(message.clone() /*cheap */);
total_bytes_received += client_rx.recv().await?.len();
total_bytes_received += client_rx.recv().await?.0.len();
latencies.push(start.elapsed());
}
let elapsed = start.elapsed();
Expand Down
4 changes: 4 additions & 0 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub enum ChannelError {
#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),

/// Data encoding errors.
#[error(transparent)]
Data(#[from] crate::data::Error),

/// Some other error.
#[error(transparent)]
Other(#[from] anyhow::Error),
Expand Down
Loading
Loading