From 664afda3e5a7ec07f83d43c436738e6582e9cf63 Mon Sep 17 00:00:00 2001 From: John Murray Date: Wed, 10 May 2023 16:22:16 -0400 Subject: [PATCH 1/4] wip --- build.rs | 5 ++++- src/actor/address.proto | 14 ++++++++++++++ src/lib.rs | 1 + src/patterns/load_balancer.rs | 26 ++++++++++++++++++++++++++ src/patterns/mod.rs | 1 + 5 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 src/actor/address.proto create mode 100644 src/patterns/load_balancer.rs create mode 100644 src/patterns/mod.rs diff --git a/build.rs b/build.rs index 3950335..12be6c0 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,9 @@ use std::io::Result; fn main() -> Result<()> { - prost_build::compile_protos(&["src/message/wrappers.proto"], &["src/"])?; + prost_build::compile_protos( + &["src/message/wrappers.proto", "src/actor/address.proto"], + &["src/"], + )?; Ok(()) } diff --git a/src/actor/address.proto b/src/actor/address.proto new file mode 100644 index 0000000..4451c52 --- /dev/null +++ b/src/actor/address.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +message ActorAddress { + enum Scheme { + LOCAL = 0; + + // Remote is currently a placeholder for future use. + // See busan::actor::address::UriScheme + REMOTE = 1; + } + + Scheme scheme = 1; + string path = 2; +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 07c8478..5172d77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod actor; pub mod config; pub mod executor; pub mod message; +pub mod patterns; pub mod prelude; pub mod util; diff --git a/src/patterns/load_balancer.rs b/src/patterns/load_balancer.rs new file mode 100644 index 0000000..f4bd0b7 --- /dev/null +++ b/src/patterns/load_balancer.rs @@ -0,0 +1,26 @@ +use crate::actor::{Actor, ActorAddress, ActorInit, Context}; +use crate::message::{Message, ToMessage}; +use std::cell::RefCell; +use std::iter::Cycle; +use std::slice::Iter; +use std::vec::IntoIter; + +struct LoadBalancer { + actors: Vec, +} + +impl Actor for LoadBalancer { + fn receive(&mut self, ctx: Context, msg: Box) { + todo!() + } +} + +impl ActorInit for LoadBalancer { + type Init = (); + fn init(init_msg: &Self::Init) -> Self + where + Self: Sized + Actor, + { + todo!() + } +} diff --git a/src/patterns/mod.rs b/src/patterns/mod.rs new file mode 100644 index 0000000..a3ce407 --- /dev/null +++ b/src/patterns/mod.rs @@ -0,0 +1 @@ +pub mod load_balancer; From 041456842a094d2274c929a265b1df8be1dc3009 Mon Sep 17 00:00:00 2001 From: John Murray Date: Sun, 14 May 2023 11:31:50 -0400 Subject: [PATCH 2/4] Add proto/message support for ActorAddress --- src/actor/address.proto | 21 +++++++++----- src/actor/address.rs | 8 ++++-- src/actor/mod.rs | 2 ++ src/actor/proto.rs | 57 +++++++++++++++++++++++++++++++++++++ src/message/common_types.rs | 1 + 5 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 src/actor/proto.rs diff --git a/src/actor/address.proto b/src/actor/address.proto index 4451c52..12dcf5e 100644 --- a/src/actor/address.proto +++ b/src/actor/address.proto @@ -1,14 +1,21 @@ syntax = "proto3"; -message ActorAddress { - enum Scheme { - LOCAL = 0; +package actor.address; + +enum Scheme { + LOCAL = 0; - // Remote is currently a placeholder for future use. - // See busan::actor::address::UriScheme - REMOTE = 1; - } + // Remote is currently a placeholder for future use. + // See busan::actor::address::UriScheme + REMOTE = 1; +} + +message ActorAddress { Scheme scheme = 1; string path = 2; +} + +message AddressList { + repeated ActorAddress addresses = 1; } \ No newline at end of file diff --git a/src/actor/address.rs b/src/actor/address.rs index f1e8be0..0592c33 100644 --- a/src/actor/address.rs +++ b/src/actor/address.rs @@ -112,8 +112,8 @@ pub(crate) enum UriScheme { /// ``` #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Uri { - scheme: UriScheme, - path_segments: Vec, + pub(crate) scheme: UriScheme, + pub(crate) path_segments: Vec, } impl Uri { @@ -160,6 +160,10 @@ impl Uri { fn is_parent(&self, maybe_parent: &Self) -> bool { maybe_parent.is_child(self) } + + pub(crate) fn path(&self) -> String { + self.path_segments.join("/") + } } impl Display for Uri { diff --git a/src/actor/mod.rs b/src/actor/mod.rs index a75c4f5..2679d2f 100644 --- a/src/actor/mod.rs +++ b/src/actor/mod.rs @@ -90,6 +90,8 @@ pub mod address; #[doc(hidden)] pub mod letter; +pub mod proto; + #[doc(inline)] pub use actor::*; #[doc(inline)] diff --git a/src/actor/proto.rs b/src/actor/proto.rs new file mode 100644 index 0000000..c01b2c3 --- /dev/null +++ b/src/actor/proto.rs @@ -0,0 +1,57 @@ +//! Serializable message types for the actor module +//! +//! This module contains the protobuf definitions that map to internal types +//! such as [`ActorAddress`](crate::actor::ActorAddress) as well as the associated +//! [`ToMessage`](crate::message::ToMessage) implementations for these types. + +use crate::actor; +use crate::actor::UriScheme; +use crate::message::common_types::impl_busan_message; +use crate::message::{Message, ToMessage}; +use std::cell::RefCell; + +// Import the generated protobuf definitions (see build.rs) +include!(concat!(env!("OUT_DIR"), "/actor.address.rs")); + +impl_busan_message!(ActorAddress); +impl_busan_message!(AddressList); + +impl ToMessage for &actor::ActorAddress { + fn to_message(self) -> ActorAddress { + ActorAddress { + scheme: match self.uri.scheme { + UriScheme::Local => Scheme::Local as i32, + UriScheme::Remote => Scheme::Remote as i32, + }, + path: self.uri.path(), + } + } +} + +impl From for actor::ActorAddress { + fn from(address: ActorAddress) -> Self { + let scheme = match address.scheme { + s if s == Scheme::Local as i32 => UriScheme::Local, + s if s == Scheme::Remote as i32 => UriScheme::Remote, + // TODO: Should this be implemented as a TryFrom instead? + _ => panic!("Invalid scheme"), + }; + actor::ActorAddress { + // TODO: This needs an internal constructor. Presumably there is at least + // one other place we're doing this same thing + uri: actor::Uri { + scheme, + path_segments: address.path.split('/').map(|s| s.to_string()).collect(), + }, + mailbox: RefCell::new(None), + } + } +} + +impl ToMessage for &[actor::ActorAddress] { + fn to_message(self) -> AddressList { + AddressList { + addresses: self.iter().map(|a| a.to_message()).collect(), + } + } +} diff --git a/src/message/common_types.rs b/src/message/common_types.rs index 3412134..d8e74e0 100644 --- a/src/message/common_types.rs +++ b/src/message/common_types.rs @@ -115,6 +115,7 @@ macro_rules! impl_busan_message { } }; } +pub(crate) use impl_busan_message; impl_busan_message!(U32Wrapper); impl_busan_message!(U64Wrapper); From 0801821fce3c554aa07005b65b51388efcb47010 Mon Sep 17 00:00:00 2001 From: John Murray Date: Tue, 16 May 2023 19:40:37 -0400 Subject: [PATCH 3/4] lb proto + ~working init --- build.rs | 6 ++- src/actor/address.proto | 2 +- src/patterns/load_balancer.proto | 15 +++++++ src/patterns/load_balancer.rs | 76 ++++++++++++++++++++++++++++++-- 4 files changed, 93 insertions(+), 6 deletions(-) create mode 100644 src/patterns/load_balancer.proto diff --git a/build.rs b/build.rs index 12be6c0..5df0466 100644 --- a/build.rs +++ b/build.rs @@ -2,7 +2,11 @@ use std::io::Result; fn main() -> Result<()> { prost_build::compile_protos( - &["src/message/wrappers.proto", "src/actor/address.proto"], + &[ + "src/message/wrappers.proto", + "src/actor/address.proto", + "src/patterns/load_balancer.proto", + ], &["src/"], )?; Ok(()) diff --git a/src/actor/address.proto b/src/actor/address.proto index 12dcf5e..2fa10ed 100644 --- a/src/actor/address.proto +++ b/src/actor/address.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package actor.address; +package actor.proto; enum Scheme { LOCAL = 0; diff --git a/src/patterns/load_balancer.proto b/src/patterns/load_balancer.proto new file mode 100644 index 0000000..ab979f8 --- /dev/null +++ b/src/patterns/load_balancer.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package patterns.load_balancer; + +import "actor/address.proto"; + +message Init { + actor.proto.AddressList nodes = 1; + Strategy strategy = 2; +} + +enum Strategy { + ROUND_ROBIN = 0; + RANDOM = 1; +} diff --git a/src/patterns/load_balancer.rs b/src/patterns/load_balancer.rs index f4bd0b7..5115213 100644 --- a/src/patterns/load_balancer.rs +++ b/src/patterns/load_balancer.rs @@ -1,26 +1,94 @@ +use crate::actor::proto as actor_proto; use crate::actor::{Actor, ActorAddress, ActorInit, Context}; +use crate::message::common_types::impl_busan_message; use crate::message::{Message, ToMessage}; use std::cell::RefCell; use std::iter::Cycle; use std::slice::Iter; use std::vec::IntoIter; +// TODO: Create some initialization functions for the LB in the parent module +// load_balancer(ctx, ROUND_ROBIN, actors); + +// TODO: Things needed to fully implement a load balancer +// + [ ] Test support for doing an end-to-end integration test (test system/executor) +// + [ ] Shutdown component of lifecycle management + +/// TODO: Write some docs struct LoadBalancer { actors: Vec, + strategy: Strategy, + + strategy_state: StrategyState, } impl Actor for LoadBalancer { fn receive(&mut self, ctx: Context, msg: Box) { - todo!() + let actor = &self.actors[self.next()]; + ctx.send_message(actor, msg); + } +} + +impl LoadBalancer { + fn new(actors: Vec, strategy: Strategy) -> Self { + Self { + actors, + strategy, + strategy_state: match strategy { + Strategy::RoundRobin => StrategyState::RoundRobin { index: 0 }, + Strategy::Random => StrategyState::Random, + }, + } + } + + fn next(&mut self) -> usize { + match &mut self.strategy_state { + StrategyState::RoundRobin { index } => { + *index = (*index + 1) % self.actors.len(); + *index + } + StrategyState::Random => rand::thread_rng() % self.actors.len(), + } } } impl ActorInit for LoadBalancer { - type Init = (); - fn init(init_msg: &Self::Init) -> Self + type Init = Init; + + fn init(init_msg: Self::Init) -> Self where Self: Sized + Actor, { - todo!() + /// This should be handled by utility code used to create load-balancers, + /// so just some additional validations to ensure the addresses are present + /// and the strategy is valid. + debug_assert!(init_msg.nodes.is_some()); + debug_assert!(match init_msg.strategy { + x if x == Strategy::RoundRobin as i32 => true, + _ => false, + }); + + Self::new( + init_msg + .nodes + .map(|n| n.addresses) + .unwrap_or_else(Vec::new) + .into_iter() + .map(|a| a.into()) + .collect(), + match init_msg.strategy { + x if x == Strategy::RoundRobin as i32 => Strategy::RoundRobin, + x if x == Strategy::Random as i32 => Strategy::Random, + _ => Strategy::RoundRobin, + }, + ) } } + +include!(concat!(env!("OUT_DIR"), "/patterns.load_balancer.rs")); +impl_busan_message!(Init); + +enum StrategyState { + RoundRobin { index: usize }, + Random, +} From 09cf6bce7940d0a8594d8dc2031824f369b869c9 Mon Sep 17 00:00:00 2001 From: John Murray Date: Fri, 19 May 2023 13:27:25 -0400 Subject: [PATCH 4/4] wip --- Cargo.toml | 1 + busan-derive/src/lib.rs | 4 +++ examples/greet_machine/Cargo.toml | 12 +++++++ examples/greet_machine/src/main.rs | 35 ++++++++++++++++++++ examples/ping_pong/src/main.rs | 6 ++-- src/actor/actor.rs | 24 ++++++++------ src/actor/proto.rs | 2 +- src/message/common_types.rs | 3 ++ src/message/mod.rs | 25 ++++++++++++++ src/patterns/load_balancer.proto | 2 +- src/patterns/load_balancer.rs | 52 ++++++++++++++---------------- 11 files changed, 124 insertions(+), 42 deletions(-) create mode 100644 examples/greet_machine/Cargo.toml create mode 100644 examples/greet_machine/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index c997655..f754ddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ license = "GPL-3.0" members = [ "busan-derive", + "examples/greet_machine", "examples/hello_world", "examples/ping_pong", ] diff --git a/busan-derive/src/lib.rs b/busan-derive/src/lib.rs index 5dc6632..68ca4ad 100644 --- a/busan-derive/src/lib.rs +++ b/busan-derive/src/lib.rs @@ -18,6 +18,10 @@ pub fn message(input: TokenStream) -> TokenStream { fn encode_to_vec2(&self) -> Vec { prost::Message::encode_to_vec(self) } + + fn merge2(&mut self, bytes: &[u8]) -> Result<(), prost::DecodeError> { + prost::Message::merge(self, bytes) + } } }; diff --git a/examples/greet_machine/Cargo.toml b/examples/greet_machine/Cargo.toml new file mode 100644 index 0000000..14ca7e5 --- /dev/null +++ b/examples/greet_machine/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "examples_greet_machine" +version = "0.0.0" +publish = false +edition = "2021" + +[dependencies] +busan = { path = "../../" } +prost = "0.11" + +[build-dependencies] +prost-build = "0.11" \ No newline at end of file diff --git a/examples/greet_machine/src/main.rs b/examples/greet_machine/src/main.rs new file mode 100644 index 0000000..5b44a79 --- /dev/null +++ b/examples/greet_machine/src/main.rs @@ -0,0 +1,35 @@ +use busan::message::common_types::I32Wrapper; +use busan::prelude::*; + +fn main() { + let mut system = ActorSystem::init(ActorSystemConfig::default()); +} + +struct GreetMachine { + num_greeters: i32, +} + +impl ActorInit for GreetMachine { + type Init = I32Wrapper; + + fn init(init_msg: Self::Init) -> Self + where + Self: Sized + Actor, + { + GreetMachine { + num_greeters: init_msg.value, + } + } +} + +impl Actor for GreetMachine { + fn before_start(&mut self, mut ctx: Context) { + for n in 0..self.num_greeters { + ctx.spawn_child("greeter", n); + } + } + + fn receive(&mut self, ctx: Context, msg: Box) { + todo!() + } +} diff --git a/examples/ping_pong/src/main.rs b/examples/ping_pong/src/main.rs index 028c31a..5363cad 100644 --- a/examples/ping_pong/src/main.rs +++ b/examples/ping_pong/src/main.rs @@ -35,14 +35,14 @@ impl ActorInit for Pong { impl Actor for Ping { fn before_start(&mut self, mut ctx: Context) { let pong_addr = Some(ctx.spawn_child::("pong", 0)); - ctx.send_message(pong_addr.as_ref().unwrap(), "ping"); + ctx.send(pong_addr.as_ref().unwrap(), "ping"); } fn receive(&mut self, ctx: Context, msg: Box) { // Print the message and respond with a "ping" if let Some(str_msg) = msg.as_any().downcast_ref::() { println!("received message: {}", str_msg.value); - ctx.send_message(ctx.sender(), "ping"); + ctx.send(ctx.sender(), "ping"); } } } @@ -51,7 +51,7 @@ impl Actor for Pong { // Print the message and respond with a "pong" if let Some(str_msg) = msg.as_any().downcast_ref::() { println!("received message: {}", str_msg.value); - ctx.send_message(ctx.sender(), "pong"); + ctx.send(ctx.sender(), "pong"); } } } diff --git a/src/actor/actor.rs b/src/actor/actor.rs index d5b3c50..4eb94f2 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -75,10 +75,11 @@ impl ActorCell { #[doc(hidden)] macro_rules! debug_serialize_msg { - ($msg:expr, $T:tt) => { + ($msg:expr) => { if cfg!(debug_assertions) { let bytes = $msg.encode_to_vec2(); - $T::decode(bytes.as_slice()).unwrap() + $msg.merge2(bytes.as_slice()).unwrap(); + $msg } else { $msg } @@ -122,12 +123,8 @@ impl Context<'_> { } // TODO: Document - // TODO: Talk about the debug_serialize_msg! in the docs - pub fn send_message>( - &self, - addr: &ActorAddress, - message: T, - ) { + // TODO: Coordinate documentation with `send` method + pub fn send_message(&self, addr: &ActorAddress, mut message: Box) { // Validate that the address is resolved (this is a blocking call to the runtime // manager if unresolved). if !addr.is_resolved() { @@ -146,10 +143,17 @@ impl Context<'_> { // forwarded to the dead letter queue. debug_assert!(addr.is_resolved(), "Address {} is not resolved", addr); - let message = debug_serialize_msg!(message.to_message(), M); + let message = debug_serialize_msg!(message); // Send the message to the resolved address - addr.send(Some(self.address.clone()), Box::new(message)); + addr.send(Some(self.address.clone()), message); + } + + // TODO: Document + // TODO: Talk about the debug_serialize_msg! in the docs + pub fn send>(&self, addr: &ActorAddress, message: T) { + let message = message.to_message(); + self.send_message(addr, Box::new(message)); } /// Get the sender of the current message. diff --git a/src/actor/proto.rs b/src/actor/proto.rs index c01b2c3..2cf76a7 100644 --- a/src/actor/proto.rs +++ b/src/actor/proto.rs @@ -11,7 +11,7 @@ use crate::message::{Message, ToMessage}; use std::cell::RefCell; // Import the generated protobuf definitions (see build.rs) -include!(concat!(env!("OUT_DIR"), "/actor.address.rs")); +include!(concat!(env!("OUT_DIR"), "/actor.proto.rs")); impl_busan_message!(ActorAddress); impl_busan_message!(AddressList); diff --git a/src/message/common_types.rs b/src/message/common_types.rs index d8e74e0..40c5eda 100644 --- a/src/message/common_types.rs +++ b/src/message/common_types.rs @@ -112,6 +112,9 @@ macro_rules! impl_busan_message { fn encode_to_vec2(&self) -> Vec { prost::Message::encode_to_vec(self) } + fn merge2(&mut self, bytes: &[u8]) -> Result<(), prost::DecodeError> { + prost::Message::merge(self, bytes) + } } }; } diff --git a/src/message/mod.rs b/src/message/mod.rs index 3615e82..4f8fdfb 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,5 +1,8 @@ //! Core message types used by Busan and primitive type wrappers +use prost::DecodeError; +use std::any::Any; + pub mod common_types; pub trait Message: prost::Message { @@ -12,12 +15,34 @@ pub trait Message: prost::Message { #[doc(hidden)] fn encode_to_vec2(&self) -> Vec; + /// A version of merge that does not have a [`Sized`] requirement + #[doc(hidden)] + fn merge2(&mut self, buf: &[u8]) -> Result<(), DecodeError>; + #[doc(hidden)] fn encoded_len(&self) -> usize { prost::Message::encoded_len(self) } } +impl Message for Box +where + M: Message, +{ + fn as_any(&self) -> &dyn Any { + (**self).as_any() + } + fn encode_to_vec2(&self) -> Vec { + (**self).encode_to_vec2() + } + fn merge2(&mut self, buf: &[u8]) -> Result<(), DecodeError> { + (**self).merge2(buf) + } + fn encoded_len(&self) -> usize { + prost::Message::encoded_len(&**self) + } +} + pub trait ToMessage { fn to_message(self) -> M; diff --git a/src/patterns/load_balancer.proto b/src/patterns/load_balancer.proto index ab979f8..3d9953b 100644 --- a/src/patterns/load_balancer.proto +++ b/src/patterns/load_balancer.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package patterns.load_balancer; +package patterns.load_balancer.proto; import "actor/address.proto"; diff --git a/src/patterns/load_balancer.rs b/src/patterns/load_balancer.rs index 5115213..9857c3a 100644 --- a/src/patterns/load_balancer.rs +++ b/src/patterns/load_balancer.rs @@ -1,11 +1,5 @@ -use crate::actor::proto as actor_proto; use crate::actor::{Actor, ActorAddress, ActorInit, Context}; -use crate::message::common_types::impl_busan_message; -use crate::message::{Message, ToMessage}; -use std::cell::RefCell; -use std::iter::Cycle; -use std::slice::Iter; -use std::vec::IntoIter; +use crate::message::Message; // TODO: Create some initialization functions for the LB in the parent module // load_balancer(ctx, ROUND_ROBIN, actors); @@ -13,31 +7,28 @@ use std::vec::IntoIter; // TODO: Things needed to fully implement a load balancer // + [ ] Test support for doing an end-to-end integration test (test system/executor) // + [ ] Shutdown component of lifecycle management +// + [ ] Including a death-watch mechanism so that nodes can be removed from LBs -/// TODO: Write some docs +// TODO: Write some docs struct LoadBalancer { actors: Vec, - strategy: Strategy, strategy_state: StrategyState, } impl Actor for LoadBalancer { fn receive(&mut self, ctx: Context, msg: Box) { - let actor = &self.actors[self.next()]; + let next = self.next(); + let actor = &self.actors[next]; ctx.send_message(actor, msg); } } impl LoadBalancer { - fn new(actors: Vec, strategy: Strategy) -> Self { + fn new(actors: Vec, strategy_state: StrategyState) -> Self { Self { actors, - strategy, - strategy_state: match strategy { - Strategy::RoundRobin => StrategyState::RoundRobin { index: 0 }, - Strategy::Random => StrategyState::Random, - }, + strategy_state, } } @@ -47,24 +38,25 @@ impl LoadBalancer { *index = (*index + 1) % self.actors.len(); *index } - StrategyState::Random => rand::thread_rng() % self.actors.len(), + StrategyState::Random => 0, // rand::thread_rng() % self.actors.len(), } } } impl ActorInit for LoadBalancer { - type Init = Init; + type Init = proto::Init; fn init(init_msg: Self::Init) -> Self where Self: Sized + Actor, { - /// This should be handled by utility code used to create load-balancers, - /// so just some additional validations to ensure the addresses are present - /// and the strategy is valid. + // This should be handled by utility code used to create load-balancers, + // so just some additional validations to ensure the addresses are present + // and the strategy is valid. debug_assert!(init_msg.nodes.is_some()); debug_assert!(match init_msg.strategy { - x if x == Strategy::RoundRobin as i32 => true, + x if x == proto::Strategy::RoundRobin as i32 => true, + x if x == proto::Strategy::Random as i32 => true, _ => false, }); @@ -77,16 +69,22 @@ impl ActorInit for LoadBalancer { .map(|a| a.into()) .collect(), match init_msg.strategy { - x if x == Strategy::RoundRobin as i32 => Strategy::RoundRobin, - x if x == Strategy::Random as i32 => Strategy::Random, - _ => Strategy::RoundRobin, + x if x == proto::Strategy::RoundRobin as i32 => { + StrategyState::RoundRobin { index: 0 } + } + x if x == proto::Strategy::Random as i32 => StrategyState::Random, + _ => StrategyState::Random, }, ) } } -include!(concat!(env!("OUT_DIR"), "/patterns.load_balancer.rs")); -impl_busan_message!(Init); +pub(crate) mod proto { + use crate::message::common_types::impl_busan_message; + use crate::message::Message; + include!(concat!(env!("OUT_DIR"), "/patterns.load_balancer.proto.rs")); + impl_busan_message!(Init); +} enum StrategyState { RoundRobin { index: usize },