@@ -26,6 +26,12 @@ use hyperactor::channel::Rx;
26
26
use hyperactor:: channel:: Tx ;
27
27
use hyperactor:: channel:: dial;
28
28
use hyperactor:: channel:: serve;
29
+ use hyperactor:: mailbox:: Mailbox ;
30
+ use hyperactor:: mailbox:: PortSender ;
31
+ use hyperactor:: mailbox:: monitored_return_handle;
32
+ use hyperactor:: reference:: ActorId ;
33
+ use hyperactor:: reference:: ProcId ;
34
+ use hyperactor:: reference:: WorldId ;
29
35
use serde:: Deserialize ;
30
36
use serde:: Serialize ;
31
37
use tokio:: runtime;
@@ -56,6 +62,7 @@ impl Message {
56
62
}
57
63
}
58
64
65
+ // CHANNEL
59
66
// Benchmark message sizes
60
67
fn bench_message_sizes ( c : & mut Criterion ) {
61
68
let transports = vec ! [
@@ -234,11 +241,136 @@ async fn channel_ping_pong(
234
241
start. elapsed ( )
235
242
}
236
243
244
+ // MAILBOX
245
+
246
+ fn bench_mailbox_message_sizes ( c : & mut Criterion ) {
247
+ let sizes: Vec < usize > = vec ! [
248
+ 10_000 ,
249
+ 100_000 ,
250
+ 1_000_000 ,
251
+ 10_000_000 ,
252
+ 100_000_000 ,
253
+ 1_000_000_000 ,
254
+ ] ;
255
+
256
+ for size in sizes {
257
+ let mut group = c. benchmark_group ( "mailbox_send_receive" . to_string ( ) ) ;
258
+ group. throughput ( Throughput :: Bytes ( size as u64 ) ) ;
259
+ group. sampling_mode ( criterion:: SamplingMode :: Flat ) ;
260
+ group. sample_size ( 10 ) ;
261
+ group. bench_function ( BenchmarkId :: from_parameter ( size) , move |b| {
262
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
263
+ b. iter_custom ( |iters| async move {
264
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
265
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
266
+ let mbox = Mailbox :: new_detached ( actor_id) ;
267
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
268
+ let port = port. bind ( ) ;
269
+
270
+ let msg = Message :: new ( 0 , size) ;
271
+ let start = Instant :: now ( ) ;
272
+ for _ in 0 ..iters {
273
+ mbox. serialize_and_send ( & port, msg. clone ( ) , monitored_return_handle ( ) )
274
+ . unwrap ( ) ;
275
+ receiver. recv ( ) . await . unwrap ( ) ;
276
+ }
277
+ start. elapsed ( )
278
+ } ) ;
279
+ } ) ;
280
+ group. finish ( ) ;
281
+ }
282
+ }
283
+
284
+ // Benchmark message rates for mailbox
285
+ fn bench_mailbox_message_rates ( c : & mut Criterion ) {
286
+ let mut group = c. benchmark_group ( "mailbox_message_rates" ) ;
287
+ let rates = vec ! [ 100 , 1000 , 5000 ] ;
288
+ let payload_size = 1024 ; // 1KB payload
289
+
290
+ for rate in & rates {
291
+ let rate = * rate;
292
+ group. bench_function ( format ! ( "rate_{}mps" , rate) , move |b| {
293
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
294
+ b. iter_custom ( |iters| async move {
295
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
296
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
297
+ let mbox = Mailbox :: new_detached ( actor_id) ;
298
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
299
+ let port = port. bind ( ) ;
300
+
301
+ // Spawn a task to receive messages
302
+ let total_msgs = iters * rate;
303
+ let receiver_task = tokio:: spawn ( async move {
304
+ let mut received_count = 0 ;
305
+ while received_count < total_msgs {
306
+ match receiver. recv ( ) . await {
307
+ Ok ( _) => received_count += 1 ,
308
+ Err ( e) => {
309
+ panic ! ( "Error receiving message: {}" , e) ;
310
+ }
311
+ }
312
+ }
313
+ } ) ;
314
+
315
+ let message = Message :: new ( 0 , payload_size) ;
316
+ let start = Instant :: now ( ) ;
317
+
318
+ for _ in 0 ..iters {
319
+ let mut response_handlers: Vec < tokio:: task:: JoinHandle < ( ) > > =
320
+ Vec :: with_capacity ( rate as usize ) ;
321
+
322
+ for _ in 0 ..rate {
323
+ let ( return_sender, return_receiver) = oneshot:: channel ( ) ;
324
+ let msg_clone = message. clone ( ) ;
325
+ let port_clone = port. clone ( ) ;
326
+ let mbox_clone = mbox. clone ( ) ;
327
+
328
+ let handle = tokio:: spawn ( async move {
329
+ mbox_clone
330
+ . serialize_and_send (
331
+ & port_clone,
332
+ msg_clone,
333
+ monitored_return_handle ( ) ,
334
+ )
335
+ . unwrap ( ) ;
336
+ let _ = return_sender. send ( ( ) ) ;
337
+
338
+ select ! {
339
+ _ = return_receiver => { } ,
340
+ _ = tokio:: time:: sleep( Duration :: from_millis( 5000 ) ) => {
341
+ panic!( "Did not get ack within timeout" ) ;
342
+ }
343
+ }
344
+ } ) ;
345
+
346
+ response_handlers. push ( handle) ;
347
+
348
+ let delay_ms = if rate > 0 { 1000 / rate } else { 0 } ;
349
+ let elapsed = start. elapsed ( ) . as_millis ( ) ;
350
+ let effective_delay = ( delay_ms as u128 ) . saturating_sub ( elapsed) ;
351
+ if effective_delay > 0 {
352
+ tokio:: time:: sleep ( Duration :: from_millis ( effective_delay as u64 ) ) . await ;
353
+ }
354
+ }
355
+ join_all ( response_handlers) . await ;
356
+ }
357
+
358
+ receiver_task. await . unwrap ( ) ;
359
+ start. elapsed ( )
360
+ } ) ;
361
+ } ) ;
362
+ }
363
+
364
+ group. finish ( ) ;
365
+ }
366
+
237
367
criterion_group ! (
238
368
benches,
239
- bench_message_sizes,
240
- bench_message_rates,
241
- bench_channel_ping_pong
369
+ // bench_message_sizes,
370
+ // bench_message_rates,
371
+ bench_mailbox_message_sizes,
372
+ bench_mailbox_message_rates,
373
+ // bench_channel_ping_pong,
242
374
) ;
243
375
244
376
criterion_main ! ( benches) ;
0 commit comments