Skip to content

Commit d1b3582

Browse files
pablorfb-metafacebook-github-bot
authored andcommitted
Mailbox send/recv (#940)
Summary: Measures: - [Throughput] How much data can be processed per sec ? - [Parallelism] How long does it take to process a burst of X 1KB msg/sec ? # Mailbox Send/Receive | Payload Size | Throughput (GiB/s) | Throughput Change % | Time Min (ms) | Time Median (ms) | Time Max (ms) | |---------------|--------------------|---------------------|---------------|------------------|---------------| | 10,000 | 7.7868 | 0.00% | 0.0011907 | 0.0011960 | 0.0012014 | | 100,000 | 7.4149 | -4.77% | 0.012444 | 0.012560 | 0.012677 | | 1,000,000 | 2.3116 | -70.30% | 0.39953 | 0.40289 | 0.40668 | | 10,000,000 | 1.7613 | -77.37% | 5.2436 | 5.2877 | 5.3270 | | 100,000,000 | 1.6510 | -78.78% | 56.264 | 56.411 | 56.573 | | 1,000,000,000 | 1.6439 | -78.88% | 565.78 | 566.54 | 567.21 | # Mailbox Message Rate | Rate | Time Min (ms) | Time Median (ms) | Time Max (ms) | |---------|---------------|------------------|---------------| | 100 mps | 0.23001 | 0.23246 | 0.23559 | | 1000 mps| 2.1806 | 2.1931 | 2.2075 | | 5000 mps| 10.634 | 10.788 | 10.963 | Differential Revision: D80209682
1 parent 3ebb329 commit d1b3582

File tree

1 file changed

+137
-8
lines changed

1 file changed

+137
-8
lines changed

hyperactor/benches/main.rs

Lines changed: 137 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ use hyperactor::channel::Rx;
2626
use hyperactor::channel::Tx;
2727
use hyperactor::channel::dial;
2828
use hyperactor::channel::serve;
29+
use hyperactor::mailbox::Mailbox;
30+
use hyperactor::mailbox::PortSender;
31+
use hyperactor::mailbox::monitored_return_handle;
32+
use hyperactor::reference::ActorId;
33+
use hyperactor::reference::ProcId;
34+
use hyperactor::reference::WorldId;
2935
use serde::Deserialize;
3036
use serde::Serialize;
3137
use tokio::runtime;
@@ -56,6 +62,7 @@ impl Message {
5662
}
5763
}
5864

65+
// CHANNEL
5966
// Benchmark message sizes
6067
fn bench_message_sizes(c: &mut Criterion) {
6168
let transports = vec![
@@ -148,13 +155,12 @@ fn bench_message_rates(c: &mut Criterion) {
148155
}
149156

150157
let handle = tokio::spawn(async move {
151-
select! {
152-
_ = return_receiver => {},
153-
_ = tokio::time::sleep(Duration::from_millis(5000)) => {
154-
panic!("Did not get ack within timeout");
155-
156-
}
157-
}
158+
_ = tokio::time::timeout(
159+
Duration::from_millis(5000),
160+
return_receiver,
161+
)
162+
.await
163+
.unwrap();
158164
});
159165

160166
response_handlers.push(handle);
@@ -234,11 +240,134 @@ async fn channel_ping_pong(
234240
start.elapsed()
235241
}
236242

243+
// MAILBOX
244+
245+
fn bench_mailbox_message_sizes(c: &mut Criterion) {
246+
let sizes: Vec<usize> = vec![
247+
10_000,
248+
100_000,
249+
1_000_000,
250+
10_000_000,
251+
100_000_000,
252+
1_000_000_000,
253+
];
254+
255+
for size in sizes {
256+
let mut group = c.benchmark_group("mailbox_send_receive".to_string());
257+
group.throughput(Throughput::Bytes(size as u64));
258+
group.sampling_mode(criterion::SamplingMode::Flat);
259+
group.sample_size(10);
260+
group.bench_function(BenchmarkId::from_parameter(size), move |b| {
261+
let mut b = b.to_async(Runtime::new().unwrap());
262+
b.iter_custom(|iters| async move {
263+
let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
264+
let actor_id = ActorId(proc_id, "actor".to_string(), 0);
265+
let mbox = Mailbox::new_detached(actor_id);
266+
let (port, mut receiver) = mbox.open_port::<Message>();
267+
let port = port.bind();
268+
269+
let msg = Message::new(0, size);
270+
let start = Instant::now();
271+
for _ in 0..iters {
272+
mbox.serialize_and_send(&port, msg.clone(), monitored_return_handle())
273+
.unwrap();
274+
receiver.recv().await.unwrap();
275+
}
276+
start.elapsed()
277+
});
278+
});
279+
group.finish();
280+
}
281+
}
282+
283+
// Benchmark message rates for mailbox
284+
fn bench_mailbox_message_rates(c: &mut Criterion) {
285+
let mut group = c.benchmark_group("mailbox_message_rates");
286+
let rates = vec![100, 1000, 5000];
287+
let payload_size = 1024; // 1KB payload
288+
289+
for rate in &rates {
290+
let rate = *rate;
291+
group.bench_function(format!("rate_{}mps", rate), move |b| {
292+
let mut b = b.to_async(Runtime::new().unwrap());
293+
b.iter_custom(|iters| async move {
294+
let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
295+
let actor_id = ActorId(proc_id, "actor".to_string(), 0);
296+
let mbox = Mailbox::new_detached(actor_id);
297+
let (port, mut receiver) = mbox.open_port::<Message>();
298+
let port = port.bind();
299+
300+
// Spawn a task to receive messages
301+
let total_msgs = iters * rate;
302+
let receiver_task = tokio::spawn(async move {
303+
let mut received_count = 0;
304+
while received_count < total_msgs {
305+
match receiver.recv().await {
306+
Ok(_) => received_count += 1,
307+
Err(e) => {
308+
panic!("Error receiving message: {}", e);
309+
}
310+
}
311+
}
312+
});
313+
314+
let message = Message::new(0, payload_size);
315+
let start = Instant::now();
316+
317+
for _ in 0..iters {
318+
let mut response_handlers: Vec<tokio::task::JoinHandle<()>> =
319+
Vec::with_capacity(rate as usize);
320+
321+
for _ in 0..rate {
322+
let (return_sender, return_receiver) = oneshot::channel();
323+
let msg_clone = message.clone();
324+
let port_clone = port.clone();
325+
let mbox_clone = mbox.clone();
326+
327+
let handle = tokio::spawn(async move {
328+
mbox_clone
329+
.serialize_and_send(
330+
&port_clone,
331+
msg_clone,
332+
monitored_return_handle(),
333+
)
334+
.unwrap();
335+
let _ = return_sender.send(());
336+
337+
let _ =
338+
tokio::time::timeout(Duration::from_millis(5000), return_receiver)
339+
.await
340+
.expect("Timed out waiting for return message");
341+
});
342+
343+
response_handlers.push(handle);
344+
345+
let delay_ms = if rate > 0 { 1000 / rate } else { 0 };
346+
let elapsed = start.elapsed().as_millis();
347+
let effective_delay = (delay_ms as u128).saturating_sub(elapsed);
348+
if effective_delay > 0 {
349+
tokio::time::sleep(Duration::from_millis(effective_delay as u64)).await;
350+
}
351+
}
352+
join_all(response_handlers).await;
353+
}
354+
355+
receiver_task.await.unwrap();
356+
start.elapsed()
357+
});
358+
});
359+
}
360+
361+
group.finish();
362+
}
363+
237364
criterion_group!(
238365
benches,
239366
bench_message_sizes,
240367
bench_message_rates,
241-
bench_channel_ping_pong
368+
bench_mailbox_message_sizes,
369+
bench_mailbox_message_rates,
370+
bench_channel_ping_pong,
242371
);
243372

244373
criterion_main!(benches);

0 commit comments

Comments
 (0)