From 0557b0ce04146bfe89d46bc77ce2431d62f5d6ca Mon Sep 17 00:00:00 2001 From: Travis Date: Fri, 5 Sep 2025 14:13:10 +0200 Subject: [PATCH 1/6] implement waiting for config and online (first draft) Signed-off-by: Travis --- src/client/app_configuration_client.rs | 8 +++ src/client/app_configuration_http.rs | 9 +++ src/client/app_configuration_ibm_cloud.rs | 8 +++ src/client/app_configuration_offline.rs | 11 +++ src/models/configuration.rs | 10 +++ .../live_configuration/live_configuration.rs | 71 +++++++++++++------ .../update_thread_worker.rs | 44 ++++++++---- 7 files changed, 128 insertions(+), 33 deletions(-) diff --git a/src/client/app_configuration_client.rs b/src/client/app_configuration_client.rs index 4afc0de..b5d2abf 100644 --- a/src/client/app_configuration_client.rs +++ b/src/client/app_configuration_client.rs @@ -67,6 +67,14 @@ pub trait ConfigurationProvider { /// For remote configurations, it returns whether it's connected to the /// remote or not fn is_online(&self) -> Result; + + /// For remote configurations: Blocks until it's connected to the remote. + fn wait_until_online(&self); + + /// Blocks until a configuration is available. + /// Note: This is different than wait_until_online, as configuration could be available + /// through alternate sources (Cache / Fallback) + fn wait_until_configuration_is_available(&self); } /// AppConfiguration client for browsing, and evaluating features and properties. diff --git a/src/client/app_configuration_http.rs b/src/client/app_configuration_http.rs index 887a9ba..8fc2d93 100644 --- a/src/client/app_configuration_http.rs +++ b/src/client/app_configuration_http.rs @@ -92,6 +92,15 @@ impl ConfigurationProvider for AppConfigurationClientHttp< fn is_online(&self) -> Result { self.live_configuration.is_online() } + + fn wait_until_online(&self) { + self.live_configuration.wait_until_online(); + } + + fn wait_until_configuration_is_available(&self) { + self.live_configuration + .wait_until_configuration_is_available(); + } } #[cfg(test)] diff --git a/src/client/app_configuration_ibm_cloud.rs b/src/client/app_configuration_ibm_cloud.rs index 5c3126d..67c1817 100644 --- a/src/client/app_configuration_ibm_cloud.rs +++ b/src/client/app_configuration_ibm_cloud.rs @@ -86,6 +86,14 @@ impl ConfigurationProvider for AppConfigurationClientIBMCloud { fn is_online(&self) -> Result { self.client.is_online() } + + fn wait_until_online(&self) { + self.client.wait_until_online(); + } + + fn wait_until_configuration_is_available(&self) { + self.client.wait_until_configuration_is_available(); + } } #[cfg(test)] diff --git a/src/client/app_configuration_offline.rs b/src/client/app_configuration_offline.rs index 82b6ba0..ab932f4 100644 --- a/src/client/app_configuration_offline.rs +++ b/src/client/app_configuration_offline.rs @@ -15,6 +15,7 @@ use crate::errors::Result; use crate::models::{Configuration, FeatureSnapshot, PropertySnapshot}; use crate::ConfigurationProvider; +use log::error; /// AppConfiguration client using a local file with a configuration snapshot #[derive(Debug)] @@ -55,4 +56,14 @@ impl ConfigurationProvider for AppConfigurationOffline { fn is_online(&self) -> Result { Ok(false) } + + fn wait_until_online(&self) { + error!("Waiting for AppConfigurationOffline to get online. This will never happen."); + std::thread::park(); // block forever + } + + fn wait_until_configuration_is_available(&self) { + // No wait required: + // AppConfigurationOffline always has a configuration available. + } } diff --git a/src/models/configuration.rs b/src/models/configuration.rs index 3f33569..abeb799 100644 --- a/src/models/configuration.rs +++ b/src/models/configuration.rs @@ -22,6 +22,7 @@ use crate::ConfigurationDataError; use super::feature_snapshot::FeatureSnapshot; use super::property_snapshot::PropertySnapshot; use crate::ConfigurationProvider; +use log::error; /// Represents all the configuration data needed for the client to perform /// feature/propery evaluation. @@ -192,6 +193,15 @@ impl ConfigurationProvider for Configuration { fn is_online(&self) -> Result { Ok(false) } + + fn wait_until_online(&self) { + error!("Waiting for Configuration to get online. This will never happen."); + std::thread::park(); // block forever + } + + fn wait_until_configuration_is_available(&self) { + // No wait required + } } #[cfg(test)] diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index a1f79fc..be4e7c2 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use super::current_mode::CurrentModeOfflineReason; use super::update_thread_worker::UpdateThreadWorker; use super::{CurrentMode, Error, OfflineMode, Result}; use crate::models::Configuration; use crate::network::http_client::ServerClient; +use crate::network::live_configuration::current_mode; use crate::utils::{ThreadHandle, ThreadStatus}; use crate::{ConfigurationId, ConfigurationProvider}; @@ -37,10 +38,10 @@ pub trait LiveConfiguration: ConfigurationProvider { pub(crate) struct LiveConfigurationImpl { /// Configuration object that will be returned to consumers. This is also the object /// that the thread in the backend will be updating. - configuration: Arc>>, + configuration: Arc<(Mutex>, Condvar)>, /// Current operation mode. - current_mode: Arc>, + current_mode: Arc<(Mutex, Condvar)>, /// Handler to the internal thread that takes care of updating the [`LiveConfigurationImpl::configuration`]. update_thread: ThreadHandle>, @@ -57,10 +58,11 @@ impl LiveConfigurationImpl { server_client: T, configuration_id: ConfigurationId, ) -> Self { - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( server_client, @@ -84,9 +86,11 @@ impl LiveConfigurationImpl { /// configured for this object. fn get_configuration(&self) -> Result { // TODO: Can we return a reference instead? - match &*self.current_mode.lock()? { + let (current_mode_mutex, _) = &*self.current_mode; + match &*current_mode_mutex.lock()? { CurrentMode::Online => { - match &*self.configuration.lock()? { + let (configuration_mutex, _) = &*self.configuration; + match &*configuration_mutex.lock()? { // We store the configuration retrieved from the server into the Arc before switching the flag to Online None => unreachable!(), Some(configuration) => Ok(configuration.clone()), @@ -94,10 +98,13 @@ impl LiveConfigurationImpl { } CurrentMode::Offline(current_mode_offline_reason) => match &self.offline_mode { OfflineMode::Fail => Err(Error::Offline(current_mode_offline_reason.clone())), - OfflineMode::Cache => match &*self.configuration.lock()? { - None => Err(Error::ConfigurationNotYetAvailable), - Some(configuration) => Ok(configuration.clone()), - }, + OfflineMode::Cache => { + let (configuration_mutex, _) = &*self.configuration; + match &*configuration_mutex.lock()? { + None => Err(Error::ConfigurationNotYetAvailable), + Some(configuration) => Ok(configuration.clone()), + } + } OfflineMode::FallbackData(app_configuration_offline) => { Ok(app_configuration_offline.config_snapshot.clone()) } @@ -107,13 +114,16 @@ impl LiveConfigurationImpl { "Thread finished with status: {:?}", result ))), - OfflineMode::Cache => match &*self.configuration.lock()? { - None => Err(Error::UnrecoverableError(format!( - "Initial configuration failed to retrieve: {:?}", - result - ))), - Some(configuration) => Ok(configuration.clone()), - }, + OfflineMode::Cache => { + let (configuration_mutex, _) = &*self.configuration; + match &*configuration_mutex.lock()? { + None => Err(Error::UnrecoverableError(format!( + "Initial configuration failed to retrieve: {:?}", + result + ))), + Some(configuration) => Ok(configuration.clone()), + } + } OfflineMode::FallbackData(app_configuration_offline) => { Ok(app_configuration_offline.config_snapshot.clone()) } @@ -142,6 +152,24 @@ impl ConfigurationProvider for LiveConfigurationImpl { fn is_online(&self) -> crate::Result { Ok(self.get_current_mode()? == CurrentMode::Online) } + + fn wait_until_configuration_is_available(&self) { + let (configuration_mutex, condition_variable) = &*self.configuration; + let configuration_guard = configuration_mutex.lock().unwrap(); + condition_variable + .wait_while(configuration_guard, |configuration| configuration.is_none()) + .unwrap(); + } + + fn wait_until_online(&self) { + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let current_mode_guard = current_mode_mutex.lock().unwrap(); + condition_variable + .wait_while(current_mode_guard, |current_mode| { + *current_mode == CurrentMode::Online + }) + .unwrap(); + } } impl LiveConfiguration for LiveConfigurationImpl { @@ -150,7 +178,8 @@ impl LiveConfiguration for LiveConfigurationImpl { } fn get_current_mode(&self) -> Result { - Ok(self.current_mode.lock()?.clone()) + let (current_mode_mutex, _) = &*self.current_mode; + Ok(current_mode_mutex.lock()?.clone()) } } diff --git a/src/network/live_configuration/update_thread_worker.rs b/src/network/live_configuration/update_thread_worker.rs index aa09808..d711ff6 100644 --- a/src/network/live_configuration/update_thread_worker.rs +++ b/src/network/live_configuration/update_thread_worker.rs @@ -13,13 +13,14 @@ // limitations under the License. use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use super::current_mode::CurrentModeOfflineReason; use super::CurrentMode; use super::{Error, Result}; use crate::models::Configuration; use crate::network::http_client::{ServerClient, WebsocketReader}; +use crate::network::live_configuration::current_mode; use crate::network::NetworkError; use crate::ConfigurationId; @@ -28,16 +29,16 @@ pub(crate) const SERVER_HEARTBEAT: &str = "test message"; pub(crate) struct UpdateThreadWorker { server_client: T, configuration_id: ConfigurationId, - configuration: Arc>>, - current_mode: Arc>, + configuration: Arc<(Mutex>, Condvar)>, + current_mode: Arc<(Mutex, Condvar)>, } impl UpdateThreadWorker { pub(crate) fn new( server_client: T, configuration_id: ConfigurationId, - configuration: Arc>>, - current_mode: Arc>, + configuration: Arc<(Mutex>, Condvar)>, + current_mode: Arc<(Mutex, Condvar)>, ) -> Self { Self { server_client, @@ -93,7 +94,10 @@ impl UpdateThreadWorker { /// [`UpdateThreadWorker::current_mode`] is set to [`CurrentMode::Defunct`]. pub(crate) fn run(&self, thread_termination_receiver: Receiver<()>) -> Result<()> { let result = self.run_internal(thread_termination_receiver); - *self.current_mode.lock().unwrap() = CurrentMode::Defunct(result.clone()); + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock().unwrap(); + *current_mode = CurrentMode::Defunct(result.clone()); + condition_variable.notify_all(); result } @@ -103,14 +107,27 @@ impl UpdateThreadWorker { fn update_configuration_from_server_and_current_mode(&self) -> Result<()> { match self.server_client.get_configuration(&self.configuration_id) { Ok(config) => { - *self.configuration.lock()? = Some(config); - *self.current_mode.lock()? = CurrentMode::Online; + let (current_config_mutex, condition_variable) = &*self.configuration; + let mut current_config = current_config_mutex.lock()?; + *current_config = Some(config); + condition_variable.notify_all(); + + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock()?; + *current_mode = CurrentMode::Online; + condition_variable.notify_all(); + Ok(()) } Err(e) => { Self::recoverable_error(e)?; - *self.current_mode.lock()? = + + let (current_mode_mutex, condition_variable) = &*self.current_mode; + let mut current_mode = current_mode_mutex.lock()?; + *current_mode = CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration); + condition_variable.notify_all(); + Ok(()) } } @@ -125,10 +142,11 @@ impl UpdateThreadWorker { /// there is any error receiving the messages. It's up to the caller to implement /// the recovery procedure for these scenarios. fn handle_websocket_message(&self, mut socket: WS) -> Result> { + let (current_mode_mutex, condition_variable) = &*self.current_mode; match socket.read_msg() { Ok(msg) => match msg { tungstenite::Message::Text(utf8_bytes) => { - let current_mode_clone = self.current_mode.lock()?.clone(); + let current_mode_clone = current_mode_mutex.lock()?.clone(); match (utf8_bytes.as_str(), current_mode_clone) { (SERVER_HEARTBEAT, CurrentMode::Offline(_)) => { self.update_configuration_from_server_and_current_mode()?; @@ -141,8 +159,9 @@ impl UpdateThreadWorker { Ok(Some(socket)) } tungstenite::Message::Close(_) => { - *self.current_mode.lock()? = + *current_mode_mutex.lock()? = CurrentMode::Offline(CurrentModeOfflineReason::WebsocketClosed); + condition_variable.notify_all(); Ok(None) } _ => { @@ -151,8 +170,9 @@ impl UpdateThreadWorker { } }, Err(_) => { - *self.current_mode.lock()? = + *current_mode_mutex.lock()? = CurrentMode::Offline(CurrentModeOfflineReason::WebsocketError); + condition_variable.notify_all(); Ok(None) } } From c8d06c95c61576e18d5af0fbffd899c069cc8136 Mon Sep 17 00:00:00 2001 From: Travis Date: Fri, 5 Sep 2025 14:57:16 +0200 Subject: [PATCH 2/6] fix tests Signed-off-by: Travis --- src/client/app_configuration_http.rs | 8 ++ .../live_configuration/live_configuration.rs | 33 ++++--- .../update_thread_worker.rs | 92 ++++++++++--------- 3 files changed, 78 insertions(+), 55 deletions(-) diff --git a/src/client/app_configuration_http.rs b/src/client/app_configuration_http.rs index 8fc2d93..16148c6 100644 --- a/src/client/app_configuration_http.rs +++ b/src/client/app_configuration_http.rs @@ -140,6 +140,14 @@ mod tests { fn is_online(&self) -> Result { todo!() } + + fn wait_until_online(&self) { + todo!() + } + + fn wait_until_configuration_is_available(&self) { + todo!() + } } impl LiveConfiguration for LiveConfigurationMock { fn get_thread_status( diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index be4e7c2..bbb9544 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -156,7 +156,7 @@ impl ConfigurationProvider for LiveConfigurationImpl { fn wait_until_configuration_is_available(&self) { let (configuration_mutex, condition_variable) = &*self.configuration; let configuration_guard = configuration_mutex.lock().unwrap(); - condition_variable + let _guard = condition_variable .wait_while(configuration_guard, |configuration| configuration.is_none()) .unwrap(); } @@ -164,7 +164,7 @@ impl ConfigurationProvider for LiveConfigurationImpl { fn wait_until_online(&self) { let (current_mode_mutex, condition_variable) = &*self.current_mode; let current_mode_guard = current_mode_mutex.lock().unwrap(); - condition_variable + let _guard = condition_variable .wait_while(current_mode_guard, |current_mode| { *current_mode == CurrentMode::Online }) @@ -350,9 +350,9 @@ mod tests { ) { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { - configuration: Arc::new(Mutex::new(Some(Configuration::default()))), + configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), offline_mode: OfflineMode::Fail, - current_mode: Arc::new(Mutex::new(CurrentMode::Online)), + current_mode: Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, @@ -392,10 +392,13 @@ mod tests { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, - configuration: Arc::new(Mutex::new(Some(Configuration::default()))), - current_mode: Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::WebsocketClosed, - ))), + configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), + current_mode: Arc::new(( + Mutex::new(CurrentMode::Offline( + CurrentModeOfflineReason::WebsocketClosed, + )), + Condvar::new(), + )), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, @@ -416,13 +419,14 @@ mod tests { { cfg.offline_mode = OfflineMode::Cache; { - cfg.configuration = Arc::new(Mutex::new(None)); + cfg.configuration = Arc::new((Mutex::new(None), Condvar::new())); let r = cfg.get_configuration(); assert!(r.is_err()); assert_eq!(r.unwrap_err(), Error::ConfigurationNotYetAvailable); } { - cfg.configuration = Arc::new(Mutex::new(Some(Configuration::default()))); + cfg.configuration = + Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())); let r = cfg.get_configuration(); assert!(r.is_ok(), "Error: {}", r.unwrap_err()); assert!(r.unwrap().features.is_empty()); @@ -447,8 +451,8 @@ mod tests { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, - configuration: Arc::new(Mutex::new(Some(Configuration::default()))), - current_mode: Arc::new(Mutex::new(CurrentMode::Defunct(Ok(())))), + configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), + current_mode: Arc::new((Mutex::new(CurrentMode::Defunct(Ok(()))), Condvar::new())), update_thread: ThreadHandle { _thread_termination_sender: tx, thread_handle: None, @@ -469,7 +473,7 @@ mod tests { { cfg.offline_mode = OfflineMode::Cache; { - cfg.configuration = Arc::new(Mutex::new(None)); + cfg.configuration = Arc::new((Mutex::new(None), Condvar::new())); let r = cfg.get_configuration(); assert!(r.is_err()); assert_eq!( @@ -480,7 +484,8 @@ mod tests { ); } { - cfg.configuration = Arc::new(Mutex::new(Some(Configuration::default()))); + cfg.configuration = + Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())); let r = cfg.get_configuration(); assert!(r.is_ok(), "Error: {}", r.unwrap_err()); assert!(r.unwrap().features.is_empty()); diff --git a/src/network/live_configuration/update_thread_worker.rs b/src/network/live_configuration/update_thread_worker.rs index d711ff6..ee1f1c7 100644 --- a/src/network/live_configuration/update_thread_worker.rs +++ b/src/network/live_configuration/update_thread_worker.rs @@ -228,10 +228,11 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -243,8 +244,8 @@ mod tests { let r = worker.update_configuration_from_server_and_current_mode(); assert!(r.is_ok()); - assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert!(configuration.0.lock().unwrap().is_some()); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); } #[test] @@ -270,10 +271,11 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "not used".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -285,9 +287,9 @@ mod tests { let r = worker.update_configuration_from_server_and_current_mode(); assert!(r.is_ok()); - assert!(configuration.lock().unwrap().is_none()); + assert!(configuration.0.lock().unwrap().is_none()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) ); } @@ -312,8 +314,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -326,9 +328,9 @@ mod tests { // check if we transition from online to offline: assert!(r.is_ok()); - assert!(configuration.lock().unwrap().is_none()); + assert!(configuration.0.lock().unwrap().is_none()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) ); } @@ -353,8 +355,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -390,10 +392,11 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -406,6 +409,9 @@ mod tests { let r = worker.handle_websocket_message(WebsocketMockReader { message: Some(Ok(tungstenite::Message::text(SERVER_HEARTBEAT))), }); + + let configuration = &configuration.0; + let current_mode = ¤t_mode.0; assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_some()); assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); @@ -467,10 +473,11 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Offline( - CurrentModeOfflineReason::Initializing, - ))); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new(( + Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), + Condvar::new(), + )); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -494,7 +501,7 @@ mod tests { assert!(r.is_err()); // Additionally we check that a heartbeat when online is a noop - *current_mode.lock().unwrap() = CurrentMode::Online; + *current_mode.0.lock().unwrap() = CurrentMode::Online; let r = worker.handle_websocket_message(WebsocketMockReader { message: Some(Ok(tungstenite::Message::text(SERVER_HEARTBEAT))), }); @@ -521,8 +528,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -543,7 +550,7 @@ mod tests { // websocket read error changes current_mode to Offline assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::WebsocketError) ); } @@ -574,8 +581,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (tx_serverclient_call_logs, rx_serverclient_call_logs) = std::sync::mpsc::channel(); let worker = UpdateThreadWorker::new( @@ -591,7 +598,7 @@ mod tests { let r = worker.run(rx_thread_terminator); assert!(r.is_err()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Defunct(Err(Error::UnrecoverableError("".into()))) ); @@ -631,8 +638,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -645,7 +652,7 @@ mod tests { let r = worker.run(rx); assert!(r.is_err()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Defunct(Err(Error::UnrecoverableError("".into()))) ); } @@ -671,8 +678,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( ServerClientMock {}, @@ -684,7 +691,10 @@ mod tests { drop(tx); let r = worker.run(rx); assert!(r.is_ok()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Defunct(Ok(()))); + assert_eq!( + *current_mode.0.lock().unwrap(), + CurrentMode::Defunct(Ok(())) + ); } #[test] @@ -708,8 +718,8 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new(Mutex::new(None)); - let current_mode = Arc::new(Mutex::new(CurrentMode::Online)); + let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (get_ws_tx, get_ws_rx) = std::sync::mpsc::channel(); From ec8d75c8d0e8e71bac74a046831aeb47c62c6987 Mon Sep 17 00:00:00 2001 From: Travis Date: Fri, 5 Sep 2025 18:13:10 +0200 Subject: [PATCH 3/6] Handle Fallback data + change demo + manual testing Signed-off-by: Travis --- examples/demo.rs | 10 ++++--- .../live_configuration/live_configuration.rs | 27 ++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index ec06f4f..c66cd28 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, env, thread, time::Duration}; +use std::{collections::HashMap, env, io::Write, thread, time::Duration}; use appconfiguration::{ - AppConfigurationClient, AppConfigurationClientIBMCloud, ConfigurationId, Entity, Feature, - OfflineMode, Property, Value, + AppConfigurationClient, AppConfigurationClientIBMCloud, ConfigurationId, ConfigurationProvider, + Entity, Feature, OfflineMode, Property, Value, }; use dotenvy::dotenv; use std::error::Error; @@ -54,6 +54,10 @@ fn main() -> std::result::Result<(), Box> { let configuration = ConfigurationId::new(guid, environment_id, collection_id); let client = AppConfigurationClientIBMCloud::new(&apikey, ®ion, configuration, OfflineMode::Fail)?; + print!("Waiting for initial data..."); + std::io::stdout().flush().unwrap(); + client.wait_until_configuration_is_available(); + println!(" DONE"); let entity = CustomerEntity { id: "user123".to_string(), diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index bbb9544..5c98a3d 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -21,7 +21,7 @@ use crate::models::Configuration; use crate::network::http_client::ServerClient; use crate::network::live_configuration::current_mode; use crate::utils::{ThreadHandle, ThreadStatus}; -use crate::{ConfigurationId, ConfigurationProvider}; +use crate::{AppConfigurationOffline, ConfigurationId, ConfigurationProvider}; /// A [`ConfigurationProvider`] that keeps the configuration updated with some /// third-party source using an asyncronous mechanism. @@ -132,6 +132,8 @@ impl LiveConfigurationImpl { } } +fn assert_type(_val: T) {} + impl ConfigurationProvider for LiveConfigurationImpl { fn get_feature_ids(&self) -> crate::Result> { self.get_configuration()?.get_feature_ids() @@ -154,11 +156,22 @@ impl ConfigurationProvider for LiveConfigurationImpl { } fn wait_until_configuration_is_available(&self) { - let (configuration_mutex, condition_variable) = &*self.configuration; - let configuration_guard = configuration_mutex.lock().unwrap(); - let _guard = condition_variable - .wait_while(configuration_guard, |configuration| configuration.is_none()) - .unwrap(); + match &self.offline_mode { + OfflineMode::FallbackData(fallback) => { + // Have fallback data available. + // No wait required. + assert_type::<&AppConfigurationOffline>(fallback); + // NOTE: Asserting the type here, as this works only because currently offline data is allowed as fallback. + // Once we allow more complex fallbacks, this needs to be handled better. + } + _ => { + let (configuration_mutex, condition_variable) = &*self.configuration; + let configuration_guard = configuration_mutex.lock().unwrap(); + let _guard = condition_variable + .wait_while(configuration_guard, |configuration| configuration.is_none()) + .unwrap(); + } + } } fn wait_until_online(&self) { @@ -166,7 +179,7 @@ impl ConfigurationProvider for LiveConfigurationImpl { let current_mode_guard = current_mode_mutex.lock().unwrap(); let _guard = condition_variable .wait_while(current_mode_guard, |current_mode| { - *current_mode == CurrentMode::Online + *current_mode != CurrentMode::Online }) .unwrap(); } From 3778d5f832e39f335708db153f51665171c5ec41 Mon Sep 17 00:00:00 2001 From: Travis Date: Mon, 29 Sep 2025 09:46:16 +0200 Subject: [PATCH 4/6] Remove wait until config available Signed-off-by: Travis --- examples/demo.rs | 2 +- src/client/app_configuration_client.rs | 5 ----- src/client/app_configuration_http.rs | 9 --------- src/client/app_configuration_ibm_cloud.rs | 4 ---- src/client/app_configuration_offline.rs | 5 ----- src/models/configuration.rs | 4 ---- .../live_configuration/live_configuration.rs | 19 ------------------- 7 files changed, 1 insertion(+), 47 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index 131d7fd..a25fdc9 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -56,7 +56,7 @@ fn main() -> std::result::Result<(), Box> { AppConfigurationClientIBMCloud::new(&apikey, ®ion, configuration, OfflineMode::Fail)?; print!("Waiting for initial data..."); std::io::stdout().flush().unwrap(); - client.wait_until_configuration_is_available(); + client.wait_until_online(); println!(" DONE"); let entity = CustomerEntity { diff --git a/src/client/app_configuration_client.rs b/src/client/app_configuration_client.rs index b5d2abf..5218388 100644 --- a/src/client/app_configuration_client.rs +++ b/src/client/app_configuration_client.rs @@ -70,11 +70,6 @@ pub trait ConfigurationProvider { /// For remote configurations: Blocks until it's connected to the remote. fn wait_until_online(&self); - - /// Blocks until a configuration is available. - /// Note: This is different than wait_until_online, as configuration could be available - /// through alternate sources (Cache / Fallback) - fn wait_until_configuration_is_available(&self); } /// AppConfiguration client for browsing, and evaluating features and properties. diff --git a/src/client/app_configuration_http.rs b/src/client/app_configuration_http.rs index 16148c6..8e402d8 100644 --- a/src/client/app_configuration_http.rs +++ b/src/client/app_configuration_http.rs @@ -96,11 +96,6 @@ impl ConfigurationProvider for AppConfigurationClientHttp< fn wait_until_online(&self) { self.live_configuration.wait_until_online(); } - - fn wait_until_configuration_is_available(&self) { - self.live_configuration - .wait_until_configuration_is_available(); - } } #[cfg(test)] @@ -144,10 +139,6 @@ mod tests { fn wait_until_online(&self) { todo!() } - - fn wait_until_configuration_is_available(&self) { - todo!() - } } impl LiveConfiguration for LiveConfigurationMock { fn get_thread_status( diff --git a/src/client/app_configuration_ibm_cloud.rs b/src/client/app_configuration_ibm_cloud.rs index 67c1817..2c44052 100644 --- a/src/client/app_configuration_ibm_cloud.rs +++ b/src/client/app_configuration_ibm_cloud.rs @@ -90,10 +90,6 @@ impl ConfigurationProvider for AppConfigurationClientIBMCloud { fn wait_until_online(&self) { self.client.wait_until_online(); } - - fn wait_until_configuration_is_available(&self) { - self.client.wait_until_configuration_is_available(); - } } #[cfg(test)] diff --git a/src/client/app_configuration_offline.rs b/src/client/app_configuration_offline.rs index ab932f4..5638286 100644 --- a/src/client/app_configuration_offline.rs +++ b/src/client/app_configuration_offline.rs @@ -61,9 +61,4 @@ impl ConfigurationProvider for AppConfigurationOffline { error!("Waiting for AppConfigurationOffline to get online. This will never happen."); std::thread::park(); // block forever } - - fn wait_until_configuration_is_available(&self) { - // No wait required: - // AppConfigurationOffline always has a configuration available. - } } diff --git a/src/models/configuration.rs b/src/models/configuration.rs index abeb799..4b0bc66 100644 --- a/src/models/configuration.rs +++ b/src/models/configuration.rs @@ -198,10 +198,6 @@ impl ConfigurationProvider for Configuration { error!("Waiting for Configuration to get online. This will never happen."); std::thread::park(); // block forever } - - fn wait_until_configuration_is_available(&self) { - // No wait required - } } #[cfg(test)] diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index 5c98a3d..c74729d 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -155,25 +155,6 @@ impl ConfigurationProvider for LiveConfigurationImpl { Ok(self.get_current_mode()? == CurrentMode::Online) } - fn wait_until_configuration_is_available(&self) { - match &self.offline_mode { - OfflineMode::FallbackData(fallback) => { - // Have fallback data available. - // No wait required. - assert_type::<&AppConfigurationOffline>(fallback); - // NOTE: Asserting the type here, as this works only because currently offline data is allowed as fallback. - // Once we allow more complex fallbacks, this needs to be handled better. - } - _ => { - let (configuration_mutex, condition_variable) = &*self.configuration; - let configuration_guard = configuration_mutex.lock().unwrap(); - let _guard = condition_variable - .wait_while(configuration_guard, |configuration| configuration.is_none()) - .unwrap(); - } - } - } - fn wait_until_online(&self) { let (current_mode_mutex, condition_variable) = &*self.current_mode; let current_mode_guard = current_mode_mutex.lock().unwrap(); From 044dc231b8a7904f8e99bf55cd87afc5db261ff8 Mon Sep 17 00:00:00 2001 From: Travis Date: Mon, 29 Sep 2025 10:15:02 +0200 Subject: [PATCH 5/6] start to remove condvar from config Signed-off-by: Travis --- .../live_configuration/live_configuration.rs | 35 ++++++++----------- .../update_thread_worker.rs | 10 +++--- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index c74729d..5fe6164 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -38,7 +38,7 @@ pub trait LiveConfiguration: ConfigurationProvider { pub(crate) struct LiveConfigurationImpl { /// Configuration object that will be returned to consumers. This is also the object /// that the thread in the backend will be updating. - configuration: Arc<(Mutex>, Condvar)>, + configuration: Arc>>, /// Current operation mode. current_mode: Arc<(Mutex, Condvar)>, @@ -58,7 +58,7 @@ impl LiveConfigurationImpl { server_client: T, configuration_id: ConfigurationId, ) -> Self { - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new(( Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), Condvar::new(), @@ -89,8 +89,7 @@ impl LiveConfigurationImpl { let (current_mode_mutex, _) = &*self.current_mode; match &*current_mode_mutex.lock()? { CurrentMode::Online => { - let (configuration_mutex, _) = &*self.configuration; - match &*configuration_mutex.lock()? { + match &*self.configuration.lock()? { // We store the configuration retrieved from the server into the Arc before switching the flag to Online None => unreachable!(), Some(configuration) => Ok(configuration.clone()), @@ -98,13 +97,10 @@ impl LiveConfigurationImpl { } CurrentMode::Offline(current_mode_offline_reason) => match &self.offline_mode { OfflineMode::Fail => Err(Error::Offline(current_mode_offline_reason.clone())), - OfflineMode::Cache => { - let (configuration_mutex, _) = &*self.configuration; - match &*configuration_mutex.lock()? { - None => Err(Error::ConfigurationNotYetAvailable), - Some(configuration) => Ok(configuration.clone()), - } - } + OfflineMode::Cache => match &*self.configuration.lock()? { + None => Err(Error::ConfigurationNotYetAvailable), + Some(configuration) => Ok(configuration.clone()), + }, OfflineMode::FallbackData(app_configuration_offline) => { Ok(app_configuration_offline.config_snapshot.clone()) } @@ -114,16 +110,13 @@ impl LiveConfigurationImpl { "Thread finished with status: {:?}", result ))), - OfflineMode::Cache => { - let (configuration_mutex, _) = &*self.configuration; - match &*configuration_mutex.lock()? { - None => Err(Error::UnrecoverableError(format!( - "Initial configuration failed to retrieve: {:?}", - result - ))), - Some(configuration) => Ok(configuration.clone()), - } - } + OfflineMode::Cache => match &*self.configuration.lock()? { + None => Err(Error::UnrecoverableError(format!( + "Initial configuration failed to retrieve: {:?}", + result + ))), + Some(configuration) => Ok(configuration.clone()), + }, OfflineMode::FallbackData(app_configuration_offline) => { Ok(app_configuration_offline.config_snapshot.clone()) } diff --git a/src/network/live_configuration/update_thread_worker.rs b/src/network/live_configuration/update_thread_worker.rs index ee1f1c7..e9f9983 100644 --- a/src/network/live_configuration/update_thread_worker.rs +++ b/src/network/live_configuration/update_thread_worker.rs @@ -29,7 +29,7 @@ pub(crate) const SERVER_HEARTBEAT: &str = "test message"; pub(crate) struct UpdateThreadWorker { server_client: T, configuration_id: ConfigurationId, - configuration: Arc<(Mutex>, Condvar)>, + configuration: Arc>>, current_mode: Arc<(Mutex, Condvar)>, } @@ -37,7 +37,7 @@ impl UpdateThreadWorker { pub(crate) fn new( server_client: T, configuration_id: ConfigurationId, - configuration: Arc<(Mutex>, Condvar)>, + configuration: Arc>>, current_mode: Arc<(Mutex, Condvar)>, ) -> Self { Self { @@ -107,10 +107,8 @@ impl UpdateThreadWorker { fn update_configuration_from_server_and_current_mode(&self) -> Result<()> { match self.server_client.get_configuration(&self.configuration_id) { Ok(config) => { - let (current_config_mutex, condition_variable) = &*self.configuration; - let mut current_config = current_config_mutex.lock()?; + let mut current_config = self.configuration.lock()?; *current_config = Some(config); - condition_variable.notify_all(); let (current_mode_mutex, condition_variable) = &*self.current_mode; let mut current_mode = current_mode_mutex.lock()?; @@ -355,7 +353,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( From 7c1a8cc4a4c72e7155ccd59c1618f3bf927978f3 Mon Sep 17 00:00:00 2001 From: Travis Date: Mon, 29 Sep 2025 10:53:19 +0200 Subject: [PATCH 6/6] Fix tests Signed-off-by: Travis --- .../live_configuration/live_configuration.rs | 16 ++++---- .../update_thread_worker.rs | 38 +++++++++---------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/network/live_configuration/live_configuration.rs b/src/network/live_configuration/live_configuration.rs index 5fe6164..83c3f5d 100644 --- a/src/network/live_configuration/live_configuration.rs +++ b/src/network/live_configuration/live_configuration.rs @@ -337,7 +337,7 @@ mod tests { ) { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { - configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), + configuration: Arc::new(Mutex::new(Some(Configuration::default()))), offline_mode: OfflineMode::Fail, current_mode: Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())), update_thread: ThreadHandle { @@ -379,7 +379,7 @@ mod tests { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, - configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), + configuration: Arc::new(Mutex::new(Some(Configuration::default()))), current_mode: Arc::new(( Mutex::new(CurrentMode::Offline( CurrentModeOfflineReason::WebsocketClosed, @@ -406,14 +406,13 @@ mod tests { { cfg.offline_mode = OfflineMode::Cache; { - cfg.configuration = Arc::new((Mutex::new(None), Condvar::new())); + cfg.configuration = Arc::new(Mutex::new(None)); let r = cfg.get_configuration(); assert!(r.is_err()); assert_eq!(r.unwrap_err(), Error::ConfigurationNotYetAvailable); } { - cfg.configuration = - Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())); + cfg.configuration = Arc::new(Mutex::new(Some(Configuration::default()))); let r = cfg.get_configuration(); assert!(r.is_ok(), "Error: {}", r.unwrap_err()); assert!(r.unwrap().features.is_empty()); @@ -438,7 +437,7 @@ mod tests { let (tx, _) = std::sync::mpsc::channel(); let mut cfg = LiveConfigurationImpl { offline_mode: OfflineMode::Fail, - configuration: Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())), + configuration: Arc::new(Mutex::new(Some(Configuration::default()))), current_mode: Arc::new((Mutex::new(CurrentMode::Defunct(Ok(()))), Condvar::new())), update_thread: ThreadHandle { _thread_termination_sender: tx, @@ -460,7 +459,7 @@ mod tests { { cfg.offline_mode = OfflineMode::Cache; { - cfg.configuration = Arc::new((Mutex::new(None), Condvar::new())); + cfg.configuration = Arc::new(Mutex::new(None)); let r = cfg.get_configuration(); assert!(r.is_err()); assert_eq!( @@ -471,8 +470,7 @@ mod tests { ); } { - cfg.configuration = - Arc::new((Mutex::new(Some(Configuration::default())), Condvar::new())); + cfg.configuration = Arc::new(Mutex::new(Some(Configuration::default()))); let r = cfg.get_configuration(); assert!(r.is_ok(), "Error: {}", r.unwrap_err()); assert!(r.unwrap().features.is_empty()); diff --git a/src/network/live_configuration/update_thread_worker.rs b/src/network/live_configuration/update_thread_worker.rs index e9f9983..33a2b22 100644 --- a/src/network/live_configuration/update_thread_worker.rs +++ b/src/network/live_configuration/update_thread_worker.rs @@ -226,7 +226,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new(( Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), Condvar::new(), @@ -242,7 +242,7 @@ mod tests { let r = worker.update_configuration_from_server_and_current_mode(); assert!(r.is_ok()); - assert!(configuration.0.lock().unwrap().is_some()); + assert!(configuration.lock().unwrap().is_some()); assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); } @@ -269,7 +269,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "not used".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new(( Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), Condvar::new(), @@ -285,7 +285,7 @@ mod tests { let r = worker.update_configuration_from_server_and_current_mode(); assert!(r.is_ok()); - assert!(configuration.0.lock().unwrap().is_none()); + assert!(configuration.lock().unwrap().is_none()); assert_eq!( *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) @@ -312,7 +312,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( @@ -326,7 +326,7 @@ mod tests { // check if we transition from online to offline: assert!(r.is_ok()); - assert!(configuration.0.lock().unwrap().is_none()); + assert!(configuration.lock().unwrap().is_none()); assert_eq!( *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration) @@ -390,7 +390,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new(( Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), Condvar::new(), @@ -408,11 +408,9 @@ mod tests { message: Some(Ok(tungstenite::Message::text(SERVER_HEARTBEAT))), }); - let configuration = &configuration.0; - let current_mode = ¤t_mode.0; assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // A repeated heartbeat should not re-fetch config (noop once online) *configuration.lock().unwrap() = None; @@ -421,7 +419,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_none()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // Any other message type is a noop let r = worker.handle_websocket_message(WebsocketMockReader { @@ -429,7 +427,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_none()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // any other text message should lead to a config update (None -> Some) let r = worker.handle_websocket_message(WebsocketMockReader { @@ -437,7 +435,7 @@ mod tests { }); assert!(r.unwrap().is_some()); assert!(configuration.lock().unwrap().is_some()); - assert_eq!(*current_mode.lock().unwrap(), CurrentMode::Online); + assert_eq!(*current_mode.0.lock().unwrap(), CurrentMode::Online); // After websocket is closed, it is consumed and we are offline let r = worker.handle_websocket_message(WebsocketMockReader { @@ -446,7 +444,7 @@ mod tests { assert!(r.unwrap().is_none()); // WS consumed assert!(configuration.lock().unwrap().is_some()); assert_eq!( - *current_mode.lock().unwrap(), + *current_mode.0.lock().unwrap(), CurrentMode::Offline(CurrentModeOfflineReason::WebsocketClosed) ); } @@ -471,7 +469,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new(( Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)), Condvar::new(), @@ -526,7 +524,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( @@ -579,7 +577,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (tx_serverclient_call_logs, rx_serverclient_call_logs) = std::sync::mpsc::channel(); @@ -636,7 +634,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( @@ -676,7 +674,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let worker = UpdateThreadWorker::new( @@ -716,7 +714,7 @@ mod tests { } } let configuration_id = ConfigurationId::new("".into(), "environment_id".into(), "".into()); - let configuration = Arc::new((Mutex::new(None), Condvar::new())); + let configuration = Arc::new(Mutex::new(None)); let current_mode = Arc::new((Mutex::new(CurrentMode::Online), Condvar::new())); let (get_ws_tx, get_ws_rx) = std::sync::mpsc::channel();