diff --git a/Cargo.toml b/Cargo.toml index c997655..f754ddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ license = "GPL-3.0" members = [ "busan-derive", + "examples/greet_machine", "examples/hello_world", "examples/ping_pong", ] diff --git a/build.rs b/build.rs index 12be6c0..5df0466 100644 --- a/build.rs +++ b/build.rs @@ -2,7 +2,11 @@ use std::io::Result; fn main() -> Result<()> { prost_build::compile_protos( - &["src/message/wrappers.proto", "src/actor/address.proto"], + &[ + "src/message/wrappers.proto", + "src/actor/address.proto", + "src/patterns/load_balancer.proto", + ], &["src/"], )?; Ok(()) diff --git a/examples/greet_machine/Cargo.toml b/examples/greet_machine/Cargo.toml new file mode 100644 index 0000000..14ca7e5 --- /dev/null +++ b/examples/greet_machine/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "examples_greet_machine" +version = "0.0.0" +publish = false +edition = "2021" + +[dependencies] +busan = { path = "../../" } +prost = "0.11" + +[build-dependencies] +prost-build = "0.11" \ No newline at end of file diff --git a/examples/greet_machine/src/main.rs b/examples/greet_machine/src/main.rs new file mode 100644 index 0000000..5b44a79 --- /dev/null +++ b/examples/greet_machine/src/main.rs @@ -0,0 +1,35 @@ +use busan::message::common_types::I32Wrapper; +use busan::prelude::*; + +fn main() { + let mut system = ActorSystem::init(ActorSystemConfig::default()); +} + +struct GreetMachine { + num_greeters: i32, +} + +impl ActorInit for GreetMachine { + type Init = I32Wrapper; + + fn init(init_msg: Self::Init) -> Self + where + Self: Sized + Actor, + { + GreetMachine { + num_greeters: init_msg.value, + } + } +} + +impl Actor for GreetMachine { + fn before_start(&mut self, mut ctx: Context) { + for n in 0..self.num_greeters { + ctx.spawn_child("greeter", n); + } + } + + fn receive(&mut self, ctx: Context, msg: Box) { + todo!() + } +} diff --git a/src/lib.rs b/src/lib.rs index 07c8478..5172d77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod actor; pub mod config; pub mod executor; pub mod message; +pub mod patterns; pub mod prelude; pub mod util; diff --git a/src/message/mod.rs b/src/message/mod.rs index e8452cc..4a8407d 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,5 +1,7 @@ //! Core message types used by Busan and primitive type wrappers +use std::any::Any; + pub mod common_types; pub trait Message: prost::Message { @@ -22,6 +24,24 @@ pub trait Message: prost::Message { } } +impl Message for Box +where + M: Message, +{ + fn as_any(&self) -> &dyn Any { + (**self).as_any() + } + fn encode_to_vec2(&self) -> Vec { + (**self).encode_to_vec2() + } + fn merge2(&mut self, buf: &[u8]) -> Result<(), prost::DecodeError> { + (**self).merge2(buf) + } + fn encoded_len(&self) -> usize { + prost::Message::encoded_len(&**self) + } +} + pub trait ToMessage { fn to_message(self) -> M; diff --git a/src/patterns/load_balancer.proto b/src/patterns/load_balancer.proto new file mode 100644 index 0000000..3d9953b --- /dev/null +++ b/src/patterns/load_balancer.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package patterns.load_balancer.proto; + +import "actor/address.proto"; + +message Init { + actor.proto.AddressList nodes = 1; + Strategy strategy = 2; +} + +enum Strategy { + ROUND_ROBIN = 0; + RANDOM = 1; +} diff --git a/src/patterns/load_balancer.rs b/src/patterns/load_balancer.rs new file mode 100644 index 0000000..9857c3a --- /dev/null +++ b/src/patterns/load_balancer.rs @@ -0,0 +1,92 @@ +use crate::actor::{Actor, ActorAddress, ActorInit, Context}; +use crate::message::Message; + +// TODO: Create some initialization functions for the LB in the parent module +// load_balancer(ctx, ROUND_ROBIN, actors); + +// TODO: Things needed to fully implement a load balancer +// + [ ] Test support for doing an end-to-end integration test (test system/executor) +// + [ ] Shutdown component of lifecycle management +// + [ ] Including a death-watch mechanism so that nodes can be removed from LBs + +// TODO: Write some docs +struct LoadBalancer { + actors: Vec, + + strategy_state: StrategyState, +} + +impl Actor for LoadBalancer { + fn receive(&mut self, ctx: Context, msg: Box) { + let next = self.next(); + let actor = &self.actors[next]; + ctx.send_message(actor, msg); + } +} + +impl LoadBalancer { + fn new(actors: Vec, strategy_state: StrategyState) -> Self { + Self { + actors, + strategy_state, + } + } + + fn next(&mut self) -> usize { + match &mut self.strategy_state { + StrategyState::RoundRobin { index } => { + *index = (*index + 1) % self.actors.len(); + *index + } + StrategyState::Random => 0, // rand::thread_rng() % self.actors.len(), + } + } +} + +impl ActorInit for LoadBalancer { + type Init = proto::Init; + + fn init(init_msg: Self::Init) -> Self + where + Self: Sized + Actor, + { + // This should be handled by utility code used to create load-balancers, + // so just some additional validations to ensure the addresses are present + // and the strategy is valid. + debug_assert!(init_msg.nodes.is_some()); + debug_assert!(match init_msg.strategy { + x if x == proto::Strategy::RoundRobin as i32 => true, + x if x == proto::Strategy::Random as i32 => true, + _ => false, + }); + + Self::new( + init_msg + .nodes + .map(|n| n.addresses) + .unwrap_or_else(Vec::new) + .into_iter() + .map(|a| a.into()) + .collect(), + match init_msg.strategy { + x if x == proto::Strategy::RoundRobin as i32 => { + StrategyState::RoundRobin { index: 0 } + } + x if x == proto::Strategy::Random as i32 => StrategyState::Random, + _ => StrategyState::Random, + }, + ) + } +} + +pub(crate) mod proto { + use crate::message::common_types::impl_busan_message; + use crate::message::Message; + include!(concat!(env!("OUT_DIR"), "/patterns.load_balancer.proto.rs")); + impl_busan_message!(Init); +} + +enum StrategyState { + RoundRobin { index: usize }, + Random, +} diff --git a/src/patterns/mod.rs b/src/patterns/mod.rs new file mode 100644 index 0000000..a3ce407 --- /dev/null +++ b/src/patterns/mod.rs @@ -0,0 +1 @@ +pub mod load_balancer;