diff --git a/examples/demo.rs b/examples/demo.rs index b86b5dc..a25fdc9 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, env, thread, time::Duration}; +use std::{collections::HashMap, env, io::Write, thread, time::Duration}; use appconfiguration::{ - AppConfigurationClient, AppConfigurationClientIBMCloud, ConfigurationId, Entity, Feature, - OfflineMode, Property, Value, + AppConfigurationClient, AppConfigurationClientIBMCloud, ConfigurationId, ConfigurationProvider, + Entity, Feature, OfflineMode, Property, Value, }; use dotenvy::dotenv; use std::error::Error; @@ -54,6 +54,10 @@ fn main() -> std::result::Result<(), Box> { let configuration = ConfigurationId::new(guid, environment_id, collection_id); let client = AppConfigurationClientIBMCloud::new(&apikey, ®ion, configuration, OfflineMode::Fail)?; + print!("Waiting for initial data..."); + std::io::stdout().flush().unwrap(); + client.wait_until_online(); + println!(" DONE"); let entity = CustomerEntity { id: "user123".to_string(), diff --git a/src/client/app_configuration_client.rs b/src/client/app_configuration_client.rs index 4afc0de..5218388 100644 --- a/src/client/app_configuration_client.rs +++ b/src/client/app_configuration_client.rs @@ -67,6 +67,9 @@ pub trait ConfigurationProvider { /// For remote configurations, it returns whether it's connected to the /// remote or not fn is_online(&self) -> Result; + + /// For remote configurations: Blocks until it's connected to the remote. + fn wait_until_online(&self); } /// AppConfiguration client for browsing, and evaluating features and properties. diff --git a/src/client/app_configuration_http.rs b/src/client/app_configuration_http.rs index 887a9ba..8e402d8 100644 --- a/src/client/app_configuration_http.rs +++ b/src/client/app_configuration_http.rs @@ -92,6 +92,10 @@ impl ConfigurationProvider for AppConfigurationClientHttp< fn is_online(&self) -> Result { self.live_configuration.is_online() } + + fn wait_until_online(&self) { + self.live_configuration.wait_until_online(); + } } #[cfg(test)] @@ -131,6 +135,10 @@ mod tests { fn is_online(&self) -> Result { todo!() } + + fn wait_until_online(&self) { + todo!() + } } impl LiveConfiguration for LiveConfigurationMock { fn get_thread_status( diff --git a/src/client/app_configuration_ibm_cloud.rs b/src/client/app_configuration_ibm_cloud.rs index 5c3126d..2c44052 100644 --- a/src/client/app_configuration_ibm_cloud.rs +++ b/src/client/app_configuration_ibm_cloud.rs @@ -86,6 +86,10 @@ impl ConfigurationProvider for AppConfigurationClientIBMCloud { fn is_online(&self) -> Result { self.client.is_online() } + + fn wait_until_online(&self) { + self.client.wait_until_online(); + } } #[cfg(test)] diff --git a/src/client/app_configuration_offline.rs b/src/client/app_configuration_offline.rs index 82b6ba0..5638286 100644 --- a/src/client/app_configuration_offline.rs +++ b/src/client/app_configuration_offline.rs @@ -15,6 +15,7 @@ use crate::errors::Result; use crate::models::{Configuration, FeatureSnapshot, PropertySnapshot}; use crate::ConfigurationProvider; +use log::error; /// AppConfiguration client using a local file with a configuration snapshot #[derive(Debug)] @@ -55,4 +56,9 @@ impl ConfigurationProvider for AppConfigurationOffline { fn is_online(&self) -> Result { Ok(false) } + + fn wait_until_online(&self) { + error!("Waiting for AppConfigurationOffline to get online. This will never happen."); + std::thread::park(); // block forever + } } diff --git a/src/models/configuration.rs b/src/models/configuration.rs index 3f33569..4b0bc66 100644 --- a/src/models/configuration.rs +++ b/src/models/configuration.rs @@ -22,6 +22,7 @@ use crate::ConfigurationDataError; use super::feature_snapshot::FeatureSnapshot; use super::property_snapshot::PropertySnapshot; use crate::ConfigurationProvider; +use log::error; /// Represents all the configuration data needed for the client to perform /// feature/propery evaluation. @@ -192,6 +193,11 @@ impl ConfigurationProvider for Configuration { fn is_online(&self) -> Result { Ok(false) } + + fn wait_until_online(&self) { + error!("Waiting for Configuration to get online. This will never happen."); + std::thread::park(); // block forever + } } #[cfg(test)] diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index a1f79fc..83c3f5d 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use super::current_mode::CurrentModeOfflineReason; use super::update_thread_worker::UpdateThreadWorker; use super::{CurrentMode, Error, OfflineMode, Result}; use crate::models::Configuration; use crate::network::http_client::ServerClient; +use crate::network::live_configuration::current_mode; use crate::utils::{ThreadHandle, ThreadStatus}; -use crate::{ConfigurationId, ConfigurationProvider}; +use crate::{AppConfigurationOffline, ConfigurationId, ConfigurationProvider}; /// A [`ConfigurationProvider`] that keeps the configuration updated with some /// third-party source using an asyncronous mechanism. @@ -40,7 +41,7 @@ pub(crate) struct LiveConfigurationImpl { configuration: Arc>>, /// Current operation mode. - current_mode: Arc>, + current_mode: Arc<(Mutex, Condvar)>, /// Handler to the internal thread that takes care of updating the [`LiveConfigurationImpl::configuration`]. update_thread: ThreadHandle>, @@ -58,9 +59,10 @@ impl LiveConfigurationImpl { configuration_id: ConfigurationId, ) -> Self { let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( server_client, @@ -84,7 +86,8 @@ impl LiveConfigurationImpl { /// configured for this object. fn get_configuration(&self) -> Result { // TODO: Can we return a reference instead? - match &*self.current_mode.lock()? { + let (current_mode_mutex, _) = &*self.current_mode; + match &*current_mode_mutex.lock()? { CurrentMode::Online => { match &*self.configuration.lock()? { // We store the configuration retrieved from the server into the Arc before switching the flag to Online @@ -122,6 +125,8 @@ impl LiveConfigurationImpl { } } +fn assert_type(_val: T) {} + impl ConfigurationProvider for LiveConfigurationImpl { fn get_feature_ids(&self) -> crate::Result> { self.get_configuration()?.get_feature_ids() @@ -142,6 +147,16 @@ impl ConfigurationProvider for LiveConfigurationImpl { fn is_online(&self) -> crate::Result { Ok(self.get_current_mode()? == CurrentMode::Online) } + + fn wait_until_online(&self) { + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let current_mode_guard = current_mode_mutex.lock().unwrap(); + let _guard = condition_variable + .wait_while(current_mode_guard, |current_mode| { + *current_mode != CurrentMode::Online + }) + .unwrap(); + } } impl LiveConfiguration for LiveConfigurationImpl { @@ -150,7 +165,8 @@ impl LiveConfiguration for LiveConfigurationImpl { } fn get_current_mode(&self) -> Result { - Ok(self.current_mode.lock()?.clone()) + let (current_mode_mutex, _) = &*self.current_mode; + Ok(current_mode_mutex.lock()?.clone()) } } @@ -323,7 +339,7 @@ mod tests { let mut cfg = LiveConfigurationImpl { configuration: Arc::new(Mutex::new(Some(Configuration::default()))), offline_mode: OfflineMode::Fail, - current_mode: Arc::new(Mutex::new(CurrentMode::Online)), + current_mode: Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, @@ -364,9 +380,12 @@ mod tests { let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, configuration: Arc::new(Mutex::new(Some(Configuration::default()))), - current_mode: Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::WebsocketClosed, - ))), + current_mode: Arc::new(( + Mutex::new(CurrentMode::Offline( + CurrentModeOfflineReason::WebsocketClosed, + )), + Condvar::new(), + )), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, @@ -419,7 +438,7 @@ mod tests { let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, configuration: Arc::new(Mutex::new(Some(Configuration::default()))), - current_mode: Arc::new(Mutex::new(CurrentMode::Defunct(Ok(())))), + current_mode: Arc::new((Mutex::new(CurrentMode::Defunct(Ok(()))), Condvar::new())), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, diff --git a/src/network/live_configuration/update_thread_worker.rs b/src/network/live_configuration/update_thread_worker.rs index aa09808..33a2b22 100644 --- a/src/network/live_configuration/update_thread_worker.rs +++ b/src/network/live_configuration/update_thread_worker.rs @@ -13,13 +13,14 @@ // limitations under the License. use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use super::current_mode::CurrentModeOfflineReason; use super::CurrentMode; use super::{Error, Result}; use crate::models::Configuration; use crate::network::http_client::{ServerClient, WebsocketReader}; +use crate::network::live_configuration::current_mode; use crate::network::NetworkError; use crate::ConfigurationId; @@ -29,7 +30,7 @@ pub(crate) struct UpdateThreadWorker { server_client: T, configuration_id: ConfigurationId, configuration: Arc>>, - current_mode: Arc>, + current_mode: Arc<(Mutex, Condvar)>, } impl UpdateThreadWorker { @@ -37,7 +38,7 @@ impl UpdateThreadWorker { server_client: T, configuration_id: ConfigurationId, configuration: Arc>>, - current_mode: Arc>, + current_mode: Arc<(Mutex, Condvar)>, ) -> Self { Self { server_client, @@ -93,7 +94,10 @@ impl UpdateThreadWorker { /// [`UpdateThreadWorker::current_mode`] is set to [`CurrentMode::Defunct`]. pub(crate) fn run(&self, thread_termination_receiver: Receiver<()>) -> Result<()> { let result = self.run_internal(thread_termination_receiver); - *self.current_mode.lock().unwrap() = CurrentMode::Defunct(result.clone()); + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock().unwrap(); + *current_mode = CurrentMode::Defunct(result.clone()); + condition_variable.notify_all(); result } @@ -103,14 +107,25 @@ impl UpdateThreadWorker { fn update_configuration_from_server_and_current_mode(&self) -> Result<()> { match self.server_client.get_configuration(&self.configuration_id) { Ok(config) => { - *self.configuration.lock()? = Some(config); - *self.current_mode.lock()? = CurrentMode::Online; + let mut current_config = self.configuration.lock()?; + *current_config = Some(config); + + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock()?; + *current_mode = CurrentMode::Online; + condition_variable.notify_all(); + Ok(()) } Err(e) => { Self::recoverable_error(e)?; - *self.current_mode.lock()? = + + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock()?; + *current_mode = CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration); + condition_variable.notify_all(); + Ok(()) } } @@ -125,10 +140,11 @@ impl UpdateThreadWorker { /// there is any error receiving the messages. It's up to the caller to implement /// the recovery procedure for these scenarios. fn handle_websocket_message(&self, mut socket: WS) -> Result> { + let (current_mode_mutex, condition_variable) = &*self.current_mode; match socket.read_msg() { Ok(msg) => match msg { tungstenite::Message::Text(utf8_bytes) => { - let current_mode_clone = self.current_mode.lock()?.clone(); + let current_mode_clone = current_mode_mutex.lock()?.clone(); match (utf8_bytes.as_str(), current_mode_clone) { (SERVER_HEARTBEAT, CurrentMode::Offline(_)) => { self.update_configuration_from_server_and_current_mode()?; @@ -141,8 +157,9 @@ impl UpdateThreadWorker { Ok(Some(socket)) } tungstenite::Message::Close(_) => { - *self.current_mode.lock()? = + *current_mode_mutex.lock()? = CurrentMode::Offline(CurrentModeOfflineReason::WebsocketClosed); + condition_variable.notify_all(); Ok(None) } _ => { @@ -151,8 +168,9 @@ impl UpdateThreadWorker { } }, Err(_) => { - *self.current_mode.lock()? = + *current_mode_mutex.lock()? = CurrentMode::Offline(CurrentModeOfflineReason::WebsocketError); + condition_variable.notify_all(); Ok(None) } } @@ -209,9 +227,10 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -224,7 +243,7 @@ mod tests { assert!(r.is_ok()); assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); } #[test] @@ -251,9 +270,10 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "not used".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -267,7 +287,7 @@ mod tests { assert!(r.is_ok()); assert!(configuration.lock().unwrap().is_none()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) ); } @@ -293,7 +313,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -308,7 +328,7 @@ mod tests { assert!(r.is_ok()); assert!(configuration.lock().unwrap().is_none()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) ); } @@ -334,7 +354,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -371,9 +391,10 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -386,9 +407,10 @@ mod tests { let r = worker.handle_websocket_message(WebsocketMockReader { message: Some(Ok(tungstenite::Message::text(SERVER_HEARTBEAT))), }); + assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // A repeated heartbeat should not re-fetch config (noop once online) *configuration.lock().unwrap() = None; @@ -397,7 +419,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_none()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // Any other message type is a noop let r = worker.handle_websocket_message(WebsocketMockReader { @@ -405,7 +427,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_none()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // any other text message should lead to a config update (None -> Some) let r = worker.handle_websocket_message(WebsocketMockReader { @@ -413,7 +435,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // After websocket is closed, it is consumed and we are offline let r = worker.handle_websocket_message(WebsocketMockReader { @@ -422,7 +444,7 @@ mod tests { assert!(r.unwrap().is_none()); // WS consumed assert!(configuration.lock().unwrap().is_some()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::WebsocketClosed) ); } @@ -448,9 +470,10 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -474,7 +497,7 @@ mod tests { assert!(r.is_err()); // Additionally we check that a heartbeat when online is a noop - *current_mode.lock().unwrap() = CurrentMode::Online; + *current_mode.0.lock().unwrap() = CurrentMode::Online; let r = worker.handle_websocket_message(WebsocketMockReader { message: Some(Ok(tungstenite::Message::text(SERVER_HEARTBEAT))), }); @@ -502,7 +525,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -523,7 +546,7 @@ mod tests { // websocket read error changes current_mode to Offline assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::WebsocketError) ); } @@ -555,7 +578,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (tx_serverclient_call_logs, rx_serverclient_call_logs) = std::sync::mpsc::channel(); let worker = UpdateThreadWorker::new( @@ -571,7 +594,7 @@ mod tests { let r = worker.run(rx_thread_terminator); assert!(r.is_err()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Defunct(Err(Error::UnrecoverableError("".into()))) ); @@ -612,7 +635,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -625,7 +648,7 @@ mod tests { let r = worker.run(rx); assert!(r.is_err()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Defunct(Err(Error::UnrecoverableError("".into()))) ); } @@ -652,7 +675,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -664,7 +687,10 @@ mod tests { drop(tx); let r = worker.run(rx); assert!(r.is_ok()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Defunct(Ok(()))); + assert_eq!( + *current_mode.0.lock().unwrap(), + CurrentMode::Defunct(Ok(())) + ); } #[test] @@ -689,7 +715,7 @@ mod tests { } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (get_ws_tx, get_ws_rx) = std::sync::mpsc::channel();