Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/black-jack/src/services/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ impl GameTable {

fn send_player_command(&mut self, command: GameServerRequest) -> GameServerResponse {
if let (Some(tx), Some(rx)) = (&self.request_tx, &self.response_rx) {
tx.send(command).expect("TODO");
rx.recv().expect("TODO")
tx.send(command).expect("send command");
rx.recv().expect("receive")
} else {
panic!("GameServer not initialized");
}
Expand Down
2 changes: 1 addition & 1 deletion examples/custom-storage/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Handler<messages::Ping> for Room {
) -> Result<Self::Returns, Self::Error> {
self.state.request_count += 1;
let state_saver = app_data.get::<PingState>();
self.save_state(state_saver).await.expect("TODO");
self.save_state(state_saver).await.expect("save state");
Ok(messages::Pong {
ping_id: message.ping_id,
request_count: self.state.request_count,
Expand Down
2 changes: 1 addition & 1 deletion examples/metric-aggregator/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Handler<messages::Drop> for MetricAggregator {
app_data: Arc<AppData>,
) -> Result<Self::Returns, Self::Error> {
println!("got shudown");
self.shutdown(app_data).await.expect("TODO shutdown");
self.shutdown(app_data).await.expect("Shutdown Message");
Ok(())
}
}
2 changes: 1 addition & 1 deletion examples/observability/src/bin/observability_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn main() {
.max_connections(num_cpus)
.connect("sqlite:///tmp/state.sqlite3?mode=rwc")
.await
.expect("TODO: Connection failure");
.expect("Connection failure");
let sql_state = SqliteState::new(sql_state_pool);
StateSaver::<()>::prepare(&sql_state).await;
server.app_data(sql_state);
Expand Down
2 changes: 1 addition & 1 deletion examples/ping-pong/src/bin/ping_pong_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() {
.max_connections(num_cpus)
.connect("sqlite:///tmp/state.sqlite3?mode=rwc")
.await
.expect("TODO: Connection failure");
.expect("Connection failure");
let sql_state = SqliteState::new(sql_state_pool);
StateSaver::<()>::prepare(&sql_state).await;
server.app_data(sql_state);
Expand Down
2 changes: 1 addition & 1 deletion examples/presence/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl ServiceObject for PresenceService {
"PresenceService".to_string(),
self_id.clone(),
))
.expect("TODO");
.expect("Shutdown Command");
});
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rio-rs/src/cluster/storage/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn list_members<S: MembershipStorage + 'static>(
.inner
.members()
.await
.expect("TODO")
.expect("list members error")
.iter()
.map(|x| HttpMember {
active: x.active,
Expand Down
88 changes: 55 additions & 33 deletions rio-rs/src/cluster/storage/redis.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::fmt::Display;

use async_trait::async_trait;
use bb8::Builder;
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use chrono::{DateTime, Utc};
use redis::AsyncCommands;
use redis::{AsyncCommands, RedisError};

use crate::errors::MembershipError;

use super::{Member, MembershipResult, MembershipStorage, MembershipUnitResult};

Expand All @@ -24,11 +27,17 @@ impl RedisMembershipStorage {
}

impl RedisMembershipStorage {
pub async fn from_connect_string(connection_string: &str, key_prefix: Option<String>) -> Self {
let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO");
let pool = Pool::builder().build(conn_manager).await.expect("TODO");
pub fn new(pool: Pool<RedisConnectionManager>, key_prefix: Option<String>) -> Self {
let key_prefix = key_prefix.unwrap_or_default();
RedisMembershipStorage { pool, key_prefix }
Self { pool, key_prefix }
}

pub fn pool() -> Builder<RedisConnectionManager> {
Pool::builder()
}

pub fn connection_manager(url: impl ToString) -> Result<RedisConnectionManager, RedisError> {
RedisConnectionManager::new(url.to_string())
}
}

Expand All @@ -47,54 +56,60 @@ fn member_to_string(member: &Member) -> String {
member
}

fn parse_member(member: &str) -> Member {
fn parse_member(member: &str) -> MembershipResult<Member> {
let mut split_member = member.split(";");
let ip = split_member.next().expect("TODO").to_string();
let port = split_member.next().expect("TODO").to_string();
let ip = split_member
.next()
.ok_or(MembershipError::DeserializationError)?
.to_string();
let port = split_member
.next()
.ok_or(MembershipError::DeserializationError)?
.to_string();
let mut parsed_member = Member::new(ip, port);
parsed_member.active = split_member
.next()
.expect("TODO next")
.ok_or(MembershipError::DeserializationError)?
.parse()
.expect("TODO parse");
let last_seen = split_member.next().expect("TODO");
.map_err(|_| MembershipError::DeserializationError)?;
let last_seen = split_member
.next()
.ok_or(MembershipError::DeserializationError)?;
parsed_member.last_seen = DateTime::parse_from_rfc3339(last_seen)
.expect("TODO")
.map_err(|_| MembershipError::DeserializationError)?
.to_utc();
parsed_member
Ok(parsed_member)
}

#[async_trait]
impl MembershipStorage for RedisMembershipStorage {
async fn push(&self, member: Member) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let member_key = member_key(&member.ip, &member.port);
let member_val = member_to_string(&member);

let key = self.members_key();
let _: () = client
.hset(&key, member_key, member_val)
.await
.expect("TODO");
let _: () = client.hset(&key, member_key, member_val).await?;
Ok(())
}

async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let member_key = member_key(ip, port);
let key = self.members_key();
let _: () = client.hdel(&key, member_key).await.expect("TODO");
let _: () = client.hdel(&key, member_key).await?;
Ok(())
}

async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult {
let last_seen = Utc::now();
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let member_key = member_key(ip, port);
let key = self.members_key();
let raw_member: Option<String> = client.hget(&key, &member_key).await.expect("TODO");
let raw_member: Option<String> = client.hget(&key, &member_key).await?;
let mut member = raw_member
.map(|x| parse_member(&x))
.transpose()?
.unwrap_or_else(|| Member::new(ip.to_string(), port.to_string()));
if is_active {
member.last_seen = last_seen;
Expand All @@ -105,34 +120,41 @@ impl MembershipStorage for RedisMembershipStorage {
}

async fn members(&self) -> MembershipResult<Vec<Member>> {
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let key = self.members_key();
let members_raw: Vec<(String, String)> = client.hgetall(&key).await.expect("TODO");
let members: Vec<Member> = members_raw.iter().map(|(_, x)| parse_member(x)).collect();
let members_raw: Vec<(String, String)> = client.hgetall(&key).await?;
let members: Vec<Member> = members_raw
.iter()
.map(|(_, x)| parse_member(x))
.collect::<MembershipResult<Vec<Member>>>()?;
Ok(members)
}

async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let key = self.member_failures_key(ip, port);
let now = chrono::Local::now().to_utc();
let ts = now.timestamp();
let _: () = client.rpush(&key, ts).await.expect("TODO");
let _: () = client.ltrim(&key, 0, 1_000).await.expect("TODO");
let _: () = client.rpush(&key, ts).await?;
let _: () = client.ltrim(&key, 0, 1_000).await?;
Ok(())
}

async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>> {
let mut client = self.pool.get().await.expect("TODO");
let mut client = self.pool.get().await?;
let key = self.member_failures_key(ip, port);
let values: Vec<String> = client.lrange(&key, 0, -1).await.expect("TODO");
let values: Vec<String> = client.lrange(&key, 0, -1).await?;
let parsed_values = values
.iter()
.map(|x| {
let ts: i64 = x.parse().expect("TODO");
DateTime::from_timestamp(ts, 0).expect("TODO").to_utc()
let ts: i64 = x
.parse()
.map_err(|_| MembershipError::DeserializationError)?;
DateTime::from_timestamp(ts, 0)
.map(|dt| dt.to_utc())
.ok_or(MembershipError::DeserializationError)
})
.collect();
.collect::<MembershipResult<Vec<_>>>()?;
Ok(parsed_values)
}
}
2 changes: 1 addition & 1 deletion rio-rs/src/cluster/storage/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod test {
let pool = SqliteMembershipStorage::pool()
.connect("sqlite::memory:")
.await
.expect("TODO: Connection failure");
.expect("Connection failure");
let members_storage = SqliteMembershipStorage::new(pool);
members_storage.prepare().await;
members_storage
Expand Down
20 changes: 20 additions & 0 deletions rio-rs/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum HandlerError {
pub enum ServiceObjectLifeCycleError {
#[error("unknown error")]
Unknown,

#[error("fail to shutdown properly")]
Shutdown,
}

/// Errors triggered while building an [crate::client::Client] using
Expand Down Expand Up @@ -79,6 +82,9 @@ pub enum MembershipError {
#[error("unknown")]
Unknown(String),

#[error("deserialization error")]
DeserializationError,

#[error("This MembershipStorage is Read-only")]
ReadOnly(String),
}
Expand All @@ -90,6 +96,20 @@ impl From<sqlx::Error> for MembershipError {
}
}

#[cfg(feature = "redis")]
impl From<redis::RedisError> for MembershipError {
fn from(err: redis::RedisError) -> Self {
MembershipError::Upstream(err.to_string())
}
}

#[cfg(feature = "redis")]
impl From<bb8::RunError<redis::RedisError>> for MembershipError {
fn from(err: bb8::RunError<redis::RedisError>) -> Self {
MembershipError::Upstream(err.to_string())
}
}

/// Error type for the serve function of the cluster provider algorith trait
/// ([crate::cluster::membership_protocol::ClusterProvider])
#[derive(Error, Debug, PartialEq, Eq)]
Expand Down
14 changes: 11 additions & 3 deletions rio-rs/src/object_placement/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use std::collections::HashSet;

use async_trait::async_trait;
use bb8::Builder;
use bb8_redis::{bb8::Pool, redis::AsyncCommands, RedisConnectionManager};
use redis::RedisError;

use super::{ObjectPlacement, ObjectPlacementItem};
use crate::errors::ObjectPlacementError;
Expand All @@ -16,12 +18,18 @@ pub struct RedisObjectPlacement {
}

impl RedisObjectPlacement {
pub async fn from_connect_string(connection_string: &str, key_prefix: Option<String>) -> Self {
let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO");
let pool = Pool::builder().build(conn_manager).await.expect("TODO");
pub fn new(pool: Pool<RedisConnectionManager>, key_prefix: Option<String>) -> Self {
let key_prefix = key_prefix.unwrap_or_default();
Self { pool, key_prefix }
}

pub fn pool() -> Builder<RedisConnectionManager> {
Pool::builder()
}

pub fn connection_manager(url: impl ToString) -> Result<RedisConnectionManager, RedisError> {
RedisConnectionManager::new(url.to_string())
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion rio-rs/src/object_placement/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ mod test {
.max_connections(5)
.connect("sqlite::memory:")
.await
.expect("TODO: Connection failure")
.expect("Connection failure")
}

async fn object_placement_provider() -> (SqlitePool, impl ObjectPlacement) {
Expand Down
3 changes: 3 additions & 0 deletions rio-rs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ pub enum RequestError<E: std::error::Error> {
#[error("client error")]
ClientError(ClientError),

#[error("message serialization error")]
SerializationError,

#[error("application error")]
ApplicationError(E),
}
Expand Down
5 changes: 4 additions & 1 deletion rio-rs/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ impl Registry {
// We do this to support 'custom' error types for each one of the Handler's
// implementation
let ret = handler_result.map_err(|err| {
let ser_err = bincode::serialize(&err).expect("TODO");
let ser_err = bincode::serialize(&err).unwrap_or_else(|_| {
tracing::error!("Error to serialize handler error");
vec![]
});
HandlerError::ApplicationError(ser_err)
})?;

Expand Down
6 changes: 3 additions & 3 deletions rio-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ where
this.check_address_mismatch(server_address).await?;
// TODO deal with redirect
this.start_service_object(&req.handler_type, &req.handler_id)
.await
.expect("TODO");
.await?;
let receiver = this
.app_data
.get_or_default::<MessageRouter>()
Expand Down Expand Up @@ -326,7 +325,8 @@ impl<S: MembershipStorage + 'static, P: ObjectPlacement + 'static> Service<S, P>
let lifecycle_result = {
let object_guard = self.registry.read().await;
let lifecycle_msg = LifecycleMessage::Load;
let lifecycle_ser_msg = bincode::serialize(&lifecycle_msg).expect("TODO");
let lifecycle_ser_msg = bincode::serialize(&lifecycle_msg)
.map_err(|e| ResponseError::SeralizationError(e.to_string()))?;
let lifecycle_fut = object_guard.send(
handler_type,
handler_id,
Expand Down
8 changes: 6 additions & 2 deletions rio-rs/src/service_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::error;

use crate::app_data::AppData;
use crate::errors::ServiceObjectLifeCycleError;
Expand Down Expand Up @@ -60,7 +61,7 @@ pub trait ServiceObject: Default + WithId + IdentifiableType {
V: Serialize + IdentifiableType + Send + Sync,
{
let client = app_data.get::<InternalClientSender>();
let payload = bincode::serialize(&payload).expect("TODO");
let payload = bincode::serialize(&payload).map_err(|_| RequestError::SerializationError)?;
let request = RequestEnvelope::new(
handler_type_id.to_string(),
handler_id.to_string(),
Expand Down Expand Up @@ -107,7 +108,10 @@ pub trait ServiceObject: Default + WithId + IdentifiableType {
Self::user_defined_type_id().to_string(),
self.id().to_string(),
))
.expect("TODO");
.map_err(|err| {
error!("{}", err);
ServiceObjectLifeCycleError::Shutdown
})?;
Ok(())
}
}
Expand Down
Loading