@@ -19,13 +19,20 @@ use criterion::criterion_group;
19
19
use criterion:: criterion_main;
20
20
use futures:: future:: join_all;
21
21
use hyperactor:: Named ;
22
+ use hyperactor:: accum;
22
23
use hyperactor:: channel;
23
24
use hyperactor:: channel:: ChannelAddr ;
24
25
use hyperactor:: channel:: ChannelTransport ;
25
26
use hyperactor:: channel:: Rx ;
26
27
use hyperactor:: channel:: Tx ;
27
28
use hyperactor:: channel:: dial;
28
29
use hyperactor:: channel:: serve;
30
+ use hyperactor:: mailbox:: Mailbox ;
31
+ use hyperactor:: mailbox:: PortSender ;
32
+ use hyperactor:: mailbox:: monitored_return_handle;
33
+ use hyperactor:: reference:: ActorId ;
34
+ use hyperactor:: reference:: ProcId ;
35
+ use hyperactor:: reference:: WorldId ;
29
36
use serde:: Deserialize ;
30
37
use serde:: Serialize ;
31
38
use tokio:: runtime;
@@ -56,6 +63,7 @@ impl Message {
56
63
}
57
64
}
58
65
66
+ // CHANNEL
59
67
// Benchmark message sizes
60
68
fn bench_message_sizes ( c : & mut Criterion ) {
61
69
let transports = vec ! [
@@ -183,7 +191,6 @@ fn bench_message_rates(c: &mut Criterion) {
183
191
fn bench_channel_ping_pong ( c : & mut Criterion ) {
184
192
let transport = ChannelTransport :: Unix ;
185
193
186
- for size in [ 1usize , 1_000_000usize ] {
187
194
let mut group = c. benchmark_group ( "channel_ping_pong" . to_string ( ) ) ;
188
195
let transport = transport. clone ( ) ;
189
196
group. throughput ( Throughput :: Bytes ( ( size * 2 ) as u64 ) ) ; // send and receive
@@ -240,5 +247,131 @@ criterion_group!(
240
247
bench_message_rates,
241
248
bench_channel_ping_pong
242
249
) ;
250
+ // MAILBOX
251
+
252
+ 10_000 ,
253
+ 100_000 ,
254
+ 1_000_000 ,
255
+ 10_000_000 ,
256
+ 100_000_000 ,
257
+ 1_000_000_000 ,
258
+ ] {
259
+ let mut group = c. benchmark_group ( "mailbox_send_receive" . to_string ( ) ) ;
260
+ group. throughput ( Throughput :: Bytes ( size as u64 ) ) ;
261
+ group. sampling_mode ( criterion:: SamplingMode :: Flat ) ;
262
+ group. sample_size ( 10 ) ;
263
+ group. bench_function ( BenchmarkId :: from_parameter ( size) , move |b| {
264
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
265
+ b. iter_custom ( |iters| async move {
266
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
267
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
268
+ let mbox = Mailbox :: new_detached ( actor_id) ;
269
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
270
+ let port = port. bind ( ) ;
271
+
272
+ let msg = Message :: new ( 0 , size) ;
273
+ let start = Instant :: now ( ) ;
274
+ for _ in 0 ..iters {
275
+ mbox. serialize_and_send ( & port, msg. clone ( ) , monitored_return_handle ( ) )
276
+ . unwrap ( ) ;
277
+ receiver. recv ( ) . await . unwrap ( ) ;
278
+ }
279
+ start. elapsed ( )
280
+ } ) ;
281
+ } ) ;
282
+ group. finish ( ) ;
283
+ }
284
+ }
285
+
286
+ // Benchmark message rates for mailbox
287
+ fn bench_mailbox_message_rates ( c : & mut Criterion ) {
288
+ let mut group = c. benchmark_group ( "mailbox_message_rates" ) ;
289
+ let rates = vec ! [ 100 , 1000 , 5000 ] ;
290
+ let payload_size = 1024 ; // 1KB payload
291
+
292
+ for rate in & rates {
293
+ let rate = * rate;
294
+ group. bench_function ( format ! ( "rate_{}mps" , rate) , move |b| {
295
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
296
+ b. iter_custom ( |iters| async move {
297
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
298
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
299
+ let mbox = Mailbox :: new_detached ( actor_id) ;
300
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
301
+ let port = port. bind ( ) ;
302
+
303
+ // Spawn a task to receive messages
304
+ let total_msgs = iters * rate;
305
+ let receiver_task = tokio:: spawn ( async move {
306
+ let mut received_count = 0 ;
307
+ while received_count < total_msgs {
308
+ match receiver. recv ( ) . await {
309
+ Ok ( _) => received_count += 1 ,
310
+ Err ( e) => {
311
+ panic ! ( "Error receiving message: {}" , e) ;
312
+ }
313
+ }
314
+ }
315
+ } ) ;
316
+
317
+ let message = Message :: new ( 0 , payload_size) ;
318
+ let start = Instant :: now ( ) ;
319
+
320
+ for _ in 0 ..iters {
321
+ let mut response_handlers: Vec < tokio:: task:: JoinHandle < ( ) > > =
322
+ Vec :: with_capacity ( rate as usize ) ;
323
+
324
+ for _ in 0 ..rate {
325
+ let ( return_sender, return_receiver) = oneshot:: channel ( ) ;
326
+ let msg_clone = message. clone ( ) ;
327
+ let port_clone = port. clone ( ) ;
328
+ let mbox_clone = mbox. clone ( ) ;
329
+
330
+ let handle = tokio:: spawn ( async move {
331
+ mbox_clone
332
+ . serialize_and_send (
333
+ & port_clone,
334
+ msg_clone,
335
+ monitored_return_handle ( ) ,
336
+ )
337
+ . unwrap ( ) ;
338
+ let _ = return_sender. send ( ( ) ) ;
339
+
340
+ select ! {
341
+ _ = return_receiver => { } ,
342
+ _ = tokio:: time:: sleep( Duration :: from_millis( 5000 ) ) => {
343
+ panic!( "Did not get ack within timeout" ) ;
344
+ }
345
+ }
346
+ } ) ;
347
+
348
+ response_handlers. push ( handle) ;
349
+
350
+ let delay_ms = if rate > 0 { 1000 / rate } else { 0 } ;
351
+ let elapsed = start. elapsed ( ) . as_millis ( ) ;
352
+ let effective_delay = ( delay_ms as u128 ) . saturating_sub ( elapsed) ;
353
+ if effective_delay > 0 {
354
+ tokio:: time:: sleep ( Duration :: from_millis ( effective_delay as u64 ) ) . await ;
355
+ }
356
+ }
357
+ join_all ( response_handlers) . await ;
358
+ }
359
+
360
+ receiver_task. await . unwrap ( ) ;
361
+ start. elapsed ( )
362
+ } ) ;
363
+ } ) ;
364
+ }
365
+
366
+ group. finish ( ) ;
367
+ }
368
+
369
+ criterion_group ! (
370
+ benches,
371
+ // bench_message_sizes,
372
+ // bench_message_rates,
373
+ bench_mailbox_message_sizes,
374
+ // bench_mailbox_message_rates,
375
+ ) ;
243
376
244
377
criterion_main ! ( benches) ;
0 commit comments