Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ license = "GPL-3.0"
members = [
"busan-derive",

"examples/greet_machine",
"examples/hello_world",
"examples/ping_pong",
]
Expand Down
6 changes: 5 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use std::io::Result;

fn main() -> Result<()> {
prost_build::compile_protos(
&["src/message/wrappers.proto", "src/actor/address.proto"],
&[
"src/message/wrappers.proto",
"src/actor/address.proto",
"src/patterns/load_balancer.proto",
],
&["src/"],
)?;
Ok(())
Expand Down
12 changes: 12 additions & 0 deletions examples/greet_machine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "examples_greet_machine"
version = "0.0.0"
publish = false
edition = "2021"

[dependencies]
busan = { path = "../../" }
prost = "0.11"

[build-dependencies]
prost-build = "0.11"
35 changes: 35 additions & 0 deletions examples/greet_machine/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use busan::message::common_types::I32Wrapper;
use busan::prelude::*;

fn main() {
let mut system = ActorSystem::init(ActorSystemConfig::default());
}

struct GreetMachine {
num_greeters: i32,
}

impl ActorInit for GreetMachine {
type Init = I32Wrapper;

fn init(init_msg: Self::Init) -> Self
where
Self: Sized + Actor,
{
GreetMachine {
num_greeters: init_msg.value,
}
}
}

impl Actor for GreetMachine {
fn before_start(&mut self, mut ctx: Context) {
for n in 0..self.num_greeters {
ctx.spawn_child("greeter", n);
}
}

fn receive(&mut self, ctx: Context, msg: Box<dyn Message>) {
todo!()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod actor;
pub mod config;
pub mod executor;
pub mod message;
pub mod patterns;
pub mod prelude;
pub mod util;

Expand Down
20 changes: 20 additions & 0 deletions src/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Core message types used by Busan and primitive type wrappers

use std::any::Any;

pub mod common_types;

pub trait Message: prost::Message {
Expand All @@ -22,6 +24,24 @@ pub trait Message: prost::Message {
}
}

impl<M> Message for Box<M>
where
M: Message,
{
fn as_any(&self) -> &dyn Any {
(**self).as_any()
}
fn encode_to_vec2(&self) -> Vec<u8> {
(**self).encode_to_vec2()
}
fn merge2(&mut self, buf: &[u8]) -> Result<(), prost::DecodeError> {
(**self).merge2(buf)
}
fn encoded_len(&self) -> usize {
prost::Message::encoded_len(&**self)
}
}

pub trait ToMessage<M: Message> {
fn to_message(self) -> M;

Expand Down
15 changes: 15 additions & 0 deletions src/patterns/load_balancer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package patterns.load_balancer.proto;

import "actor/address.proto";

message Init {
actor.proto.AddressList nodes = 1;
Strategy strategy = 2;
}

enum Strategy {
ROUND_ROBIN = 0;
RANDOM = 1;
}
92 changes: 92 additions & 0 deletions src/patterns/load_balancer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::actor::{Actor, ActorAddress, ActorInit, Context};
use crate::message::Message;

// TODO: Create some initialization functions for the LB in the parent module
// load_balancer(ctx, ROUND_ROBIN, actors);

// TODO: Things needed to fully implement a load balancer
// + [ ] Test support for doing an end-to-end integration test (test system/executor)
// + [ ] Shutdown component of lifecycle management
// + [ ] Including a death-watch mechanism so that nodes can be removed from LBs

// TODO: Write some docs
struct LoadBalancer {
actors: Vec<ActorAddress>,

strategy_state: StrategyState,
}

impl Actor for LoadBalancer {
fn receive(&mut self, ctx: Context, msg: Box<dyn Message>) {
let next = self.next();
let actor = &self.actors[next];
ctx.send_message(actor, msg);
}
}

impl LoadBalancer {
fn new(actors: Vec<ActorAddress>, strategy_state: StrategyState) -> Self {
Self {
actors,
strategy_state,
}
}

fn next(&mut self) -> usize {
match &mut self.strategy_state {
StrategyState::RoundRobin { index } => {
*index = (*index + 1) % self.actors.len();
*index
}
StrategyState::Random => 0, // rand::thread_rng() % self.actors.len(),
}
}
}

impl ActorInit for LoadBalancer {
type Init = proto::Init;

fn init(init_msg: Self::Init) -> Self
where
Self: Sized + Actor,
{
// This should be handled by utility code used to create load-balancers,
// so just some additional validations to ensure the addresses are present
// and the strategy is valid.
debug_assert!(init_msg.nodes.is_some());
debug_assert!(match init_msg.strategy {
x if x == proto::Strategy::RoundRobin as i32 => true,
x if x == proto::Strategy::Random as i32 => true,
_ => false,
});

Self::new(
init_msg
.nodes
.map(|n| n.addresses)
.unwrap_or_else(Vec::new)
.into_iter()
.map(|a| a.into())
.collect(),
match init_msg.strategy {
x if x == proto::Strategy::RoundRobin as i32 => {
StrategyState::RoundRobin { index: 0 }
}
x if x == proto::Strategy::Random as i32 => StrategyState::Random,
_ => StrategyState::Random,
},
)
}
}

pub(crate) mod proto {
use crate::message::common_types::impl_busan_message;
use crate::message::Message;
include!(concat!(env!("OUT_DIR"), "/patterns.load_balancer.proto.rs"));
impl_busan_message!(Init);
}

enum StrategyState {
RoundRobin { index: usize },
Random,
}
1 change: 1 addition & 0 deletions src/patterns/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod load_balancer;