diff --git a/examples/black-jack/src/services/table.rs b/examples/black-jack/src/services/table.rs index 78ec5e8..ae9735e 100644 --- a/examples/black-jack/src/services/table.rs +++ b/examples/black-jack/src/services/table.rs @@ -90,8 +90,8 @@ impl GameTable { fn send_player_command(&mut self, command: GameServerRequest) -> GameServerResponse { if let (Some(tx), Some(rx)) = (&self.request_tx, &self.response_rx) { - tx.send(command).expect("TODO"); - rx.recv().expect("TODO") + tx.send(command).expect("send command"); + rx.recv().expect("receive") } else { panic!("GameServer not initialized"); } diff --git a/examples/custom-storage/src/services.rs b/examples/custom-storage/src/services.rs index fa033dc..33f88f9 100644 --- a/examples/custom-storage/src/services.rs +++ b/examples/custom-storage/src/services.rs @@ -48,7 +48,7 @@ impl Handler for Room { ) -> Result { self.state.request_count += 1; let state_saver = app_data.get::(); - self.save_state(state_saver).await.expect("TODO"); + self.save_state(state_saver).await.expect("save state"); Ok(messages::Pong { ping_id: message.ping_id, request_count: self.state.request_count, diff --git a/examples/metric-aggregator/src/services.rs b/examples/metric-aggregator/src/services.rs index b16e799..359230c 100644 --- a/examples/metric-aggregator/src/services.rs +++ b/examples/metric-aggregator/src/services.rs @@ -143,7 +143,7 @@ impl Handler for MetricAggregator { app_data: Arc, ) -> Result { println!("got shudown"); - self.shutdown(app_data).await.expect("TODO shutdown"); + self.shutdown(app_data).await.expect("Shutdown Message"); Ok(()) } } diff --git a/examples/observability/src/bin/observability_server.rs b/examples/observability/src/bin/observability_server.rs index 0f7a528..e053e24 100644 --- a/examples/observability/src/bin/observability_server.rs +++ b/examples/observability/src/bin/observability_server.rs @@ -110,7 +110,7 @@ async fn main() { .max_connections(num_cpus) .connect("sqlite:///tmp/state.sqlite3?mode=rwc") .await - .expect("TODO: Connection failure"); + .expect("Connection failure"); let sql_state = SqliteState::new(sql_state_pool); StateSaver::<()>::prepare(&sql_state).await; server.app_data(sql_state); diff --git a/examples/ping-pong/src/bin/ping_pong_server.rs b/examples/ping-pong/src/bin/ping_pong_server.rs index 41f390b..d46fbe2 100644 --- a/examples/ping-pong/src/bin/ping_pong_server.rs +++ b/examples/ping-pong/src/bin/ping_pong_server.rs @@ -75,7 +75,7 @@ async fn main() { .max_connections(num_cpus) .connect("sqlite:///tmp/state.sqlite3?mode=rwc") .await - .expect("TODO: Connection failure"); + .expect("Connection failure"); let sql_state = SqliteState::new(sql_state_pool); StateSaver::<()>::prepare(&sql_state).await; server.app_data(sql_state); diff --git a/examples/presence/src/services.rs b/examples/presence/src/services.rs index 94a4942..ecf63e5 100644 --- a/examples/presence/src/services.rs +++ b/examples/presence/src/services.rs @@ -48,7 +48,7 @@ impl ServiceObject for PresenceService { "PresenceService".to_string(), self_id.clone(), )) - .expect("TODO"); + .expect("Shutdown Command"); }); Ok(()) } diff --git a/rio-rs/src/cluster/storage/http.rs b/rio-rs/src/cluster/storage/http.rs index b64c1ef..2449f1e 100644 --- a/rio-rs/src/cluster/storage/http.rs +++ b/rio-rs/src/cluster/storage/http.rs @@ -55,7 +55,7 @@ async fn list_members( .inner .members() .await - .expect("TODO") + .expect("list members error") .iter() .map(|x| HttpMember { active: x.active, diff --git a/rio-rs/src/cluster/storage/redis.rs b/rio-rs/src/cluster/storage/redis.rs index 28d72ab..30189fa 100644 --- a/rio-rs/src/cluster/storage/redis.rs +++ b/rio-rs/src/cluster/storage/redis.rs @@ -1,9 +1,12 @@ use std::fmt::Display; use async_trait::async_trait; +use bb8::Builder; use bb8_redis::{bb8::Pool, RedisConnectionManager}; use chrono::{DateTime, Utc}; -use redis::AsyncCommands; +use redis::{AsyncCommands, RedisError}; + +use crate::errors::MembershipError; use super::{Member, MembershipResult, MembershipStorage, MembershipUnitResult}; @@ -24,11 +27,17 @@ impl RedisMembershipStorage { } impl RedisMembershipStorage { - pub async fn from_connect_string(connection_string: &str, key_prefix: Option) -> Self { - let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO"); - let pool = Pool::builder().build(conn_manager).await.expect("TODO"); + pub fn new(pool: Pool, key_prefix: Option) -> Self { let key_prefix = key_prefix.unwrap_or_default(); - RedisMembershipStorage { pool, key_prefix } + Self { pool, key_prefix } + } + + pub fn pool() -> Builder { + Pool::builder() + } + + pub fn connection_manager(url: impl ToString) -> Result { + RedisConnectionManager::new(url.to_string()) } } @@ -47,54 +56,60 @@ fn member_to_string(member: &Member) -> String { member } -fn parse_member(member: &str) -> Member { +fn parse_member(member: &str) -> MembershipResult { let mut split_member = member.split(";"); - let ip = split_member.next().expect("TODO").to_string(); - let port = split_member.next().expect("TODO").to_string(); + let ip = split_member + .next() + .ok_or(MembershipError::DeserializationError)? + .to_string(); + let port = split_member + .next() + .ok_or(MembershipError::DeserializationError)? + .to_string(); let mut parsed_member = Member::new(ip, port); parsed_member.active = split_member .next() - .expect("TODO next") + .ok_or(MembershipError::DeserializationError)? .parse() - .expect("TODO parse"); - let last_seen = split_member.next().expect("TODO"); + .map_err(|_| MembershipError::DeserializationError)?; + let last_seen = split_member + .next() + .ok_or(MembershipError::DeserializationError)?; parsed_member.last_seen = DateTime::parse_from_rfc3339(last_seen) - .expect("TODO") + .map_err(|_| MembershipError::DeserializationError)? .to_utc(); - parsed_member + Ok(parsed_member) } #[async_trait] impl MembershipStorage for RedisMembershipStorage { async fn push(&self, member: Member) -> MembershipUnitResult { - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let member_key = member_key(&member.ip, &member.port); let member_val = member_to_string(&member); let key = self.members_key(); - let _: () = client - .hset(&key, member_key, member_val) - .await - .expect("TODO"); + let _: () = client.hset(&key, member_key, member_val).await?; Ok(()) } async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult { - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let member_key = member_key(ip, port); let key = self.members_key(); - let _: () = client.hdel(&key, member_key).await.expect("TODO"); + let _: () = client.hdel(&key, member_key).await?; Ok(()) } async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult { let last_seen = Utc::now(); - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let member_key = member_key(ip, port); let key = self.members_key(); - let raw_member: Option = client.hget(&key, &member_key).await.expect("TODO"); + let raw_member: Option = client.hget(&key, &member_key).await?; let mut member = raw_member .map(|x| parse_member(&x)) + .transpose()? .unwrap_or_else(|| Member::new(ip.to_string(), port.to_string())); if is_active { member.last_seen = last_seen; @@ -105,34 +120,41 @@ impl MembershipStorage for RedisMembershipStorage { } async fn members(&self) -> MembershipResult> { - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let key = self.members_key(); - let members_raw: Vec<(String, String)> = client.hgetall(&key).await.expect("TODO"); - let members: Vec = members_raw.iter().map(|(_, x)| parse_member(x)).collect(); + let members_raw: Vec<(String, String)> = client.hgetall(&key).await?; + let members: Vec = members_raw + .iter() + .map(|(_, x)| parse_member(x)) + .collect::>>()?; Ok(members) } async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult { - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let key = self.member_failures_key(ip, port); let now = chrono::Local::now().to_utc(); let ts = now.timestamp(); - let _: () = client.rpush(&key, ts).await.expect("TODO"); - let _: () = client.ltrim(&key, 0, 1_000).await.expect("TODO"); + let _: () = client.rpush(&key, ts).await?; + let _: () = client.ltrim(&key, 0, 1_000).await?; Ok(()) } async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult>> { - let mut client = self.pool.get().await.expect("TODO"); + let mut client = self.pool.get().await?; let key = self.member_failures_key(ip, port); - let values: Vec = client.lrange(&key, 0, -1).await.expect("TODO"); + let values: Vec = client.lrange(&key, 0, -1).await?; let parsed_values = values .iter() .map(|x| { - let ts: i64 = x.parse().expect("TODO"); - DateTime::from_timestamp(ts, 0).expect("TODO").to_utc() + let ts: i64 = x + .parse() + .map_err(|_| MembershipError::DeserializationError)?; + DateTime::from_timestamp(ts, 0) + .map(|dt| dt.to_utc()) + .ok_or(MembershipError::DeserializationError) }) - .collect(); + .collect::>>()?; Ok(parsed_values) } } diff --git a/rio-rs/src/cluster/storage/sqlite.rs b/rio-rs/src/cluster/storage/sqlite.rs index edf6f83..6c690bf 100644 --- a/rio-rs/src/cluster/storage/sqlite.rs +++ b/rio-rs/src/cluster/storage/sqlite.rs @@ -187,7 +187,7 @@ mod test { let pool = SqliteMembershipStorage::pool() .connect("sqlite::memory:") .await - .expect("TODO: Connection failure"); + .expect("Connection failure"); let members_storage = SqliteMembershipStorage::new(pool); members_storage.prepare().await; members_storage diff --git a/rio-rs/src/errors.rs b/rio-rs/src/errors.rs index 088aa04..db69af8 100644 --- a/rio-rs/src/errors.rs +++ b/rio-rs/src/errors.rs @@ -34,6 +34,9 @@ pub enum HandlerError { pub enum ServiceObjectLifeCycleError { #[error("unknown error")] Unknown, + + #[error("fail to shutdown properly")] + Shutdown, } /// Errors triggered while building an [crate::client::Client] using @@ -79,6 +82,9 @@ pub enum MembershipError { #[error("unknown")] Unknown(String), + #[error("deserialization error")] + DeserializationError, + #[error("This MembershipStorage is Read-only")] ReadOnly(String), } @@ -90,6 +96,20 @@ impl From for MembershipError { } } +#[cfg(feature = "redis")] +impl From for MembershipError { + fn from(err: redis::RedisError) -> Self { + MembershipError::Upstream(err.to_string()) + } +} + +#[cfg(feature = "redis")] +impl From> for MembershipError { + fn from(err: bb8::RunError) -> Self { + MembershipError::Upstream(err.to_string()) + } +} + /// Error type for the serve function of the cluster provider algorith trait /// ([crate::cluster::membership_protocol::ClusterProvider]) #[derive(Error, Debug, PartialEq, Eq)] diff --git a/rio-rs/src/object_placement/redis.rs b/rio-rs/src/object_placement/redis.rs index a887e25..6aae601 100644 --- a/rio-rs/src/object_placement/redis.rs +++ b/rio-rs/src/object_placement/redis.rs @@ -3,7 +3,9 @@ use std::collections::HashSet; use async_trait::async_trait; +use bb8::Builder; use bb8_redis::{bb8::Pool, redis::AsyncCommands, RedisConnectionManager}; +use redis::RedisError; use super::{ObjectPlacement, ObjectPlacementItem}; use crate::errors::ObjectPlacementError; @@ -16,12 +18,18 @@ pub struct RedisObjectPlacement { } impl RedisObjectPlacement { - pub async fn from_connect_string(connection_string: &str, key_prefix: Option) -> Self { - let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO"); - let pool = Pool::builder().build(conn_manager).await.expect("TODO"); + pub fn new(pool: Pool, key_prefix: Option) -> Self { let key_prefix = key_prefix.unwrap_or_default(); Self { pool, key_prefix } } + + pub fn pool() -> Builder { + Pool::builder() + } + + pub fn connection_manager(url: impl ToString) -> Result { + RedisConnectionManager::new(url.to_string()) + } } #[async_trait] diff --git a/rio-rs/src/object_placement/sqlite.rs b/rio-rs/src/object_placement/sqlite.rs index 56d3d11..9c37ff7 100644 --- a/rio-rs/src/object_placement/sqlite.rs +++ b/rio-rs/src/object_placement/sqlite.rs @@ -136,7 +136,7 @@ mod test { .max_connections(5) .connect("sqlite::memory:") .await - .expect("TODO: Connection failure") + .expect("Connection failure") } async fn object_placement_provider() -> (SqlitePool, impl ObjectPlacement) { diff --git a/rio-rs/src/protocol.rs b/rio-rs/src/protocol.rs index 441aac3..5967ba0 100644 --- a/rio-rs/src/protocol.rs +++ b/rio-rs/src/protocol.rs @@ -178,6 +178,9 @@ pub enum RequestError { #[error("client error")] ClientError(ClientError), + #[error("message serialization error")] + SerializationError, + #[error("application error")] ApplicationError(E), } diff --git a/rio-rs/src/registry/mod.rs b/rio-rs/src/registry/mod.rs index 9695531..3a07010 100644 --- a/rio-rs/src/registry/mod.rs +++ b/rio-rs/src/registry/mod.rs @@ -163,7 +163,10 @@ impl Registry { // We do this to support 'custom' error types for each one of the Handler's // implementation let ret = handler_result.map_err(|err| { - let ser_err = bincode::serialize(&err).expect("TODO"); + let ser_err = bincode::serialize(&err).unwrap_or_else(|_| { + tracing::error!("Error to serialize handler error"); + vec![] + }); HandlerError::ApplicationError(ser_err) })?; diff --git a/rio-rs/src/service.rs b/rio-rs/src/service.rs index 52d0332..67a9e5d 100644 --- a/rio-rs/src/service.rs +++ b/rio-rs/src/service.rs @@ -174,8 +174,7 @@ where this.check_address_mismatch(server_address).await?; // TODO deal with redirect this.start_service_object(&req.handler_type, &req.handler_id) - .await - .expect("TODO"); + .await?; let receiver = this .app_data .get_or_default::() @@ -326,7 +325,8 @@ impl Service let lifecycle_result = { let object_guard = self.registry.read().await; let lifecycle_msg = LifecycleMessage::Load; - let lifecycle_ser_msg = bincode::serialize(&lifecycle_msg).expect("TODO"); + let lifecycle_ser_msg = bincode::serialize(&lifecycle_msg) + .map_err(|e| ResponseError::SeralizationError(e.to_string()))?; let lifecycle_fut = object_guard.send( handler_type, handler_id, diff --git a/rio-rs/src/service_object.rs b/rio-rs/src/service_object.rs index 33a4ab4..b7cf4fa 100644 --- a/rio-rs/src/service_object.rs +++ b/rio-rs/src/service_object.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tracing::error; use crate::app_data::AppData; use crate::errors::ServiceObjectLifeCycleError; @@ -60,7 +61,7 @@ pub trait ServiceObject: Default + WithId + IdentifiableType { V: Serialize + IdentifiableType + Send + Sync, { let client = app_data.get::(); - let payload = bincode::serialize(&payload).expect("TODO"); + let payload = bincode::serialize(&payload).map_err(|_| RequestError::SerializationError)?; let request = RequestEnvelope::new( handler_type_id.to_string(), handler_id.to_string(), @@ -107,7 +108,10 @@ pub trait ServiceObject: Default + WithId + IdentifiableType { Self::user_defined_type_id().to_string(), self.id().to_string(), )) - .expect("TODO"); + .map_err(|err| { + error!("{}", err); + ServiceObjectLifeCycleError::Shutdown + })?; Ok(()) } } diff --git a/rio-rs/src/state/redis.rs b/rio-rs/src/state/redis.rs index 8cb15ea..485119f 100644 --- a/rio-rs/src/state/redis.rs +++ b/rio-rs/src/state/redis.rs @@ -1,5 +1,7 @@ use async_trait::async_trait; +use bb8::Builder; use bb8_redis::{bb8::Pool, redis::AsyncCommands, RedisConnectionManager}; +use redis::RedisError; use serde::de::DeserializeOwned; use serde::Serialize; @@ -14,12 +16,18 @@ pub struct RedisState { } impl RedisState { - pub async fn from_connect_string(connection_string: &str, key_prefix: Option) -> Self { - let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO"); - let pool = Pool::builder().build(conn_manager).await.expect("TODO"); + pub fn new(pool: Pool, key_prefix: Option) -> Self { let key_prefix = key_prefix.unwrap_or_default(); Self { pool, key_prefix } } + + pub fn pool() -> Builder { + Pool::builder() + } + + pub fn connection_manager(url: impl ToString) -> Result { + RedisConnectionManager::new(url.to_string()) + } } #[async_trait] @@ -38,7 +46,10 @@ impl StateLoader for RedisState { self.key_prefix, object_kind, object_id, state_type ); let mut client = self.pool.get().await.map_err(|_| LoadStateError::Unknown)?; - let se_data: Option = client.get(key).await.expect("TODO"); + let se_data: Option = client.get(key).await.map_err(|e| { + tracing::error!("Error fetching state from Redis: {}", e); + LoadStateError::ObjectNotFound + })?; if let Some(x) = se_data { let data = serde_json::from_str(&x); data.map_err(|_| LoadStateError::DeserializationError) diff --git a/rio-rs/tests/cluster_storage_backend.rs b/rio-rs/tests/cluster_storage_backend.rs index 7ee29ad..53353a6 100644 --- a/rio-rs/tests/cluster_storage_backend.rs +++ b/rio-rs/tests/cluster_storage_backend.rs @@ -45,18 +45,26 @@ mod redis { #[tokio::test] async fn members_sanity_check() { let prefix = chrono::Local::now().timestamp().to_string(); - let storage = - RedisMembershipStorage::from_connect_string("redis://localhost:16379", Some(prefix)) - .await; + let conn_manager = + RedisMembershipStorage::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisMembershipStorage::pool() + .build(conn_manager) + .await + .unwrap(); + let storage = RedisMembershipStorage::new(pool, Some(prefix)); super::members_sanity_check(storage).await; } #[tokio::test] async fn failures_sanity_check() { let prefix = chrono::Local::now().timestamp().to_string(); - let storage = - RedisMembershipStorage::from_connect_string("redis://localhost:16379", Some(prefix)) - .await; + let conn_manager = + RedisMembershipStorage::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisMembershipStorage::pool() + .build(conn_manager) + .await + .unwrap(); + let storage = RedisMembershipStorage::new(pool, Some(prefix)); super::failures_sanity_check(storage).await; } } diff --git a/rio-rs/tests/object_placement_backend.rs b/rio-rs/tests/object_placement_backend.rs index cbe6415..86de024 100644 --- a/rio-rs/tests/object_placement_backend.rs +++ b/rio-rs/tests/object_placement_backend.rs @@ -37,19 +37,27 @@ mod redis { #[tokio::test] async fn no_placement() { - let prefix = chrono::Local::now().timestamp_micros().to_string(); - let provider = - RedisObjectPlacement::from_connect_string("redis://localhost:16379", Some(prefix)) - .await; + let prefix = chrono::Local::now().timestamp().to_string(); + let conn_manager = + RedisObjectPlacement::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisObjectPlacement::pool() + .build(conn_manager) + .await + .unwrap(); + let provider = RedisObjectPlacement::new(pool, Some(prefix)); super::no_placement(provider).await; } #[tokio::test] async fn save_and_load() { - let prefix = chrono::Local::now().timestamp_micros().to_string(); - let provider = - RedisObjectPlacement::from_connect_string("redis://localhost:16379", Some(prefix)) - .await; + let prefix = chrono::Local::now().timestamp().to_string(); + let conn_manager = + RedisObjectPlacement::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisObjectPlacement::pool() + .build(conn_manager) + .await + .unwrap(); + let provider = RedisObjectPlacement::new(pool, Some(prefix)); super::save_and_load(provider).await; } } diff --git a/rio-rs/tests/state.rs b/rio-rs/tests/state.rs index 7129c5c..dc12025 100644 --- a/rio-rs/tests/state.rs +++ b/rio-rs/tests/state.rs @@ -44,17 +44,19 @@ mod redis { #[tokio::test] async fn state_save_sanity_check() { - let prefix = chrono::Local::now().timestamp_micros().to_string(); - let storage = - RedisState::from_connect_string("redis://localhost:16379", Some(prefix)).await; + let prefix = chrono::Local::now().timestamp().to_string(); + let conn_manager = RedisState::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisState::pool().build(conn_manager).await.unwrap(); + let storage = RedisState::new(pool, Some(prefix)); super::state_save_sanity_check(storage).await; } #[tokio::test] async fn state_load_not_found() { - let prefix = chrono::Local::now().timestamp_micros().to_string(); - let storage = - RedisState::from_connect_string("redis://localhost:16379", Some(prefix)).await; + let prefix = chrono::Local::now().timestamp().to_string(); + let conn_manager = RedisState::connection_manager("redis://localhost:16379").unwrap(); + let pool = RedisState::pool().build(conn_manager).await.unwrap(); + let storage = RedisState::new(pool, Some(prefix)); super::state_load_not_found(storage).await; } }