@@ -26,6 +26,12 @@ use hyperactor::channel::Rx;
26
26
use hyperactor:: channel:: Tx ;
27
27
use hyperactor:: channel:: dial;
28
28
use hyperactor:: channel:: serve;
29
+ use hyperactor:: mailbox:: Mailbox ;
30
+ use hyperactor:: mailbox:: PortSender ;
31
+ use hyperactor:: mailbox:: monitored_return_handle;
32
+ use hyperactor:: reference:: ActorId ;
33
+ use hyperactor:: reference:: ProcId ;
34
+ use hyperactor:: reference:: WorldId ;
29
35
use serde:: Deserialize ;
30
36
use serde:: Serialize ;
31
37
use tokio:: runtime;
@@ -56,6 +62,7 @@ impl Message {
56
62
}
57
63
}
58
64
65
+ // CHANNEL
59
66
// Benchmark message sizes
60
67
fn bench_message_sizes ( c : & mut Criterion ) {
61
68
let transports = vec ! [
@@ -65,7 +72,7 @@ fn bench_message_sizes(c: &mut Criterion) {
65
72
] ;
66
73
67
74
for ( transport_name, transport) in & transports {
68
- for size in [ 10_000 , 1_000_000 , 10_000_000 , 100_000_000 , 1_000_000_000 ] {
75
+ for size in [ 10_000 , 1_000_000_000 ] {
69
76
let mut group = c. benchmark_group ( format ! ( "send_receive/{}" , transport_name) ) ;
70
77
let transport = transport. clone ( ) ;
71
78
group. throughput ( Throughput :: Bytes ( size as u64 ) ) ;
@@ -107,7 +114,7 @@ fn bench_message_rates(c: &mut Criterion) {
107
114
//TODO Add TLS once it is able to run in Sandcastle
108
115
] ;
109
116
110
- let rates = vec ! [ 100 , 1000 , 5000 ] ;
117
+ let rates = vec ! [ 100 , 5000 ] ;
111
118
112
119
let payload_size = 1024 ; // 1KB payload
113
120
@@ -148,13 +155,12 @@ fn bench_message_rates(c: &mut Criterion) {
148
155
}
149
156
150
157
let handle = tokio:: spawn ( async move {
151
- select ! {
152
- _ = return_receiver => { } ,
153
- _ = tokio:: time:: sleep( Duration :: from_millis( 5000 ) ) => {
154
- panic!( "Did not get ack within timeout" ) ;
155
-
156
- }
157
- }
158
+ _ = tokio:: time:: timeout (
159
+ Duration :: from_millis ( 5000 ) ,
160
+ return_receiver,
161
+ )
162
+ . await
163
+ . unwrap ( ) ;
158
164
} ) ;
159
165
160
166
response_handlers. push ( handle) ;
@@ -234,11 +240,127 @@ async fn channel_ping_pong(
234
240
start. elapsed ( )
235
241
}
236
242
243
+ // MAILBOX
244
+
245
+ fn bench_mailbox_message_sizes ( c : & mut Criterion ) {
246
+ let sizes: Vec < usize > = vec ! [ 10_000 , 1_000_000_000 ] ;
247
+
248
+ for size in sizes {
249
+ let mut group = c. benchmark_group ( "mailbox_send_receive" . to_string ( ) ) ;
250
+ group. throughput ( Throughput :: Bytes ( size as u64 ) ) ;
251
+ group. sampling_mode ( criterion:: SamplingMode :: Flat ) ;
252
+ group. sample_size ( 10 ) ;
253
+ group. bench_function ( BenchmarkId :: from_parameter ( size) , move |b| {
254
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
255
+ b. iter_custom ( |iters| async move {
256
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
257
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
258
+ let mbox = Mailbox :: new_detached ( actor_id) ;
259
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
260
+ let port = port. bind ( ) ;
261
+
262
+ let msg = Message :: new ( 0 , size) ;
263
+ let start = Instant :: now ( ) ;
264
+ for _ in 0 ..iters {
265
+ mbox. serialize_and_send ( & port, msg. clone ( ) , monitored_return_handle ( ) )
266
+ . unwrap ( ) ;
267
+ receiver. recv ( ) . await . unwrap ( ) ;
268
+ }
269
+ start. elapsed ( )
270
+ } ) ;
271
+ } ) ;
272
+ group. finish ( ) ;
273
+ }
274
+ }
275
+
276
+ // Benchmark message rates for mailbox
277
+ fn bench_mailbox_message_rates ( c : & mut Criterion ) {
278
+ let mut group = c. benchmark_group ( "mailbox_message_rates" ) ;
279
+ let rates = vec ! [ 100 , 5000 ] ;
280
+ let payload_size = 1024 ; // 1KB payload
281
+
282
+ for rate in & rates {
283
+ let rate = * rate;
284
+ group. bench_function ( format ! ( "rate_{}mps" , rate) , move |b| {
285
+ let mut b = b. to_async ( Runtime :: new ( ) . unwrap ( ) ) ;
286
+ b. iter_custom ( |iters| async move {
287
+ let proc_id = ProcId :: Ranked ( WorldId ( "world" . to_string ( ) ) , 0 ) ;
288
+ let actor_id = ActorId ( proc_id, "actor" . to_string ( ) , 0 ) ;
289
+ let mbox = Mailbox :: new_detached ( actor_id) ;
290
+ let ( port, mut receiver) = mbox. open_port :: < Message > ( ) ;
291
+ let port = port. bind ( ) ;
292
+
293
+ // Spawn a task to receive messages
294
+ let total_msgs = iters * rate;
295
+ let receiver_task = tokio:: spawn ( async move {
296
+ let mut received_count = 0 ;
297
+ while received_count < total_msgs {
298
+ match receiver. recv ( ) . await {
299
+ Ok ( _) => received_count += 1 ,
300
+ Err ( e) => {
301
+ panic ! ( "Error receiving message: {}" , e) ;
302
+ }
303
+ }
304
+ }
305
+ } ) ;
306
+
307
+ let message = Message :: new ( 0 , payload_size) ;
308
+ let start = Instant :: now ( ) ;
309
+
310
+ for _ in 0 ..iters {
311
+ let mut response_handlers: Vec < tokio:: task:: JoinHandle < ( ) > > =
312
+ Vec :: with_capacity ( rate as usize ) ;
313
+
314
+ for _ in 0 ..rate {
315
+ let ( return_sender, return_receiver) = oneshot:: channel ( ) ;
316
+ let msg_clone = message. clone ( ) ;
317
+ let port_clone = port. clone ( ) ;
318
+ let mbox_clone = mbox. clone ( ) ;
319
+
320
+ let handle = tokio:: spawn ( async move {
321
+ mbox_clone
322
+ . serialize_and_send (
323
+ & port_clone,
324
+ msg_clone,
325
+ monitored_return_handle ( ) ,
326
+ )
327
+ . unwrap ( ) ;
328
+ let _ = return_sender. send ( ( ) ) ;
329
+
330
+ let _ =
331
+ tokio:: time:: timeout ( Duration :: from_millis ( 5000 ) , return_receiver)
332
+ . await
333
+ . expect ( "Timed out waiting for return message" ) ;
334
+ } ) ;
335
+
336
+ response_handlers. push ( handle) ;
337
+
338
+ let delay_ms = if rate > 0 { 1000 / rate } else { 0 } ;
339
+ let elapsed = start. elapsed ( ) . as_millis ( ) ;
340
+ let effective_delay = ( delay_ms as u128 ) . saturating_sub ( elapsed) ;
341
+ if effective_delay > 0 {
342
+ tokio:: time:: sleep ( Duration :: from_millis ( effective_delay as u64 ) ) . await ;
343
+ }
344
+ }
345
+ join_all ( response_handlers) . await ;
346
+ }
347
+
348
+ receiver_task. await . unwrap ( ) ;
349
+ start. elapsed ( )
350
+ } ) ;
351
+ } ) ;
352
+ }
353
+
354
+ group. finish ( ) ;
355
+ }
356
+
237
357
criterion_group ! (
238
358
benches,
239
359
bench_message_sizes,
240
360
bench_message_rates,
241
- bench_channel_ping_pong
361
+ bench_mailbox_message_sizes,
362
+ bench_mailbox_message_rates,
363
+ bench_channel_ping_pong,
242
364
) ;
243
365
244
366
criterion_main ! ( benches) ;
0 commit comments