diff --git a/.gitignore b/.gitignore index 248f2ad0c..983e88fd7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ Cargo.lock /device/thunder_ripple_sdk/target/ .DS_Store + +/device/mock_device/stats.json diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index a752fce7d..911122770 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -15,7 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port}; +use crate::state::platform_state::PlatformState; use futures::stream::{SplitSink, SplitStream}; use futures_util::StreamExt; use jsonrpsee::core::RpcResult; @@ -23,6 +23,7 @@ use ripple_sdk::{ api::gateway::rpc_gateway_api::{JsonRpcApiError, RpcRequest}, log::{error, info}, tokio::{self, net::TcpStream}, + utils::rpc_utils::extract_tcp_port, }; use serde_json::Value; use std::time::Duration; @@ -46,12 +47,14 @@ impl BrokerUtils { }; let url = url::Url::parse(&url_path).unwrap(); let port = extract_tcp_port(endpoint); + let tcp_port = port.unwrap(); + info!("Url host str {}", url.host_str().unwrap()); let mut index = 0; loop { // Try connecting to the tcp port first - if let Ok(v) = TcpStream::connect(&port).await { + if let Ok(v) = TcpStream::connect(&tcp_port).await { // Setup handshake for websocket with the tcp port // Some WS servers lock on to the Port but not setup handshake till they are fully setup if let Ok((stream, _)) = client_async(url_path.clone(), v).await { @@ -61,7 +64,7 @@ impl BrokerUtils { if (index % 10).eq(&0) { error!( "Broker with {} failed with retry for last {} secs in {}", - url_path, index, port + url_path, index, tcp_port ); } index += 1; diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index bc05cc99c..c2b404167 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -41,15 +41,14 @@ use ripple_sdk::{ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use crate::broker::endpoint_broker::{ - self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, +use crate::broker::{ + endpoint_broker::{self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState}, + rules_engine::{Rule, RuleTransformType}, + thunder_broker::ThunderBroker, }; -use crate::broker::rules_engine::{Rule, RuleTransformType}; use futures::stream::SplitSink; use futures_util::SinkExt; - -use crate::broker::thunder_broker::ThunderBroker; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; // TBD get the storage dir from manifest or other Ripple config file diff --git a/core/main/src/utils/rpc_utils.rs b/core/main/src/utils/rpc_utils.rs index 9097a5224..9fc1f5953 100644 --- a/core/main/src/utils/rpc_utils.rs +++ b/core/main/src/utils/rpc_utils.rs @@ -109,13 +109,3 @@ pub fn get_base_method(method: &str) -> String { let method_vec: Vec<&str> = method.split('.').collect(); method_vec.first().unwrap().to_string().to_lowercase() } - -pub fn extract_tcp_port(url: &str) -> String { - let url_split: Vec<&str> = url.split("://").collect(); - if let Some(domain) = url_split.get(1) { - let domain_split: Vec<&str> = domain.split('/').collect(); - domain_split.first().unwrap().to_string() - } else { - url.to_owned() - } -} diff --git a/core/sdk/src/api/device/mod.rs b/core/sdk/src/api/device/mod.rs index 94ca527b2..afd70c419 100644 --- a/core/sdk/src/api/device/mod.rs +++ b/core/sdk/src/api/device/mod.rs @@ -21,7 +21,6 @@ pub mod device_apps; pub mod device_browser; pub mod device_events; pub mod device_info_request; -pub mod device_operator; pub mod device_peristence; pub mod device_request; pub mod device_user_grants_data; diff --git a/core/sdk/src/api/gateway/rpc_gateway_api.rs b/core/sdk/src/api/gateway/rpc_gateway_api.rs index 88a9f1282..256a7d2f0 100644 --- a/core/sdk/src/api/gateway/rpc_gateway_api.rs +++ b/core/sdk/src/api/gateway/rpc_gateway_api.rs @@ -224,7 +224,7 @@ impl ApiBaseRequest { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct JsonRpcApiRequest { pub jsonrpc: String, pub id: Option, @@ -241,6 +241,11 @@ impl JsonRpcApiRequest { params, } } + + pub fn with_id(mut self, id: u64) -> Self { + self.id = Some(id); + self + } } #[derive(Clone, Default, Debug)] diff --git a/core/sdk/src/utils/rpc_utils.rs b/core/sdk/src/utils/rpc_utils.rs index c77ea07a9..79ad8c6ec 100644 --- a/core/sdk/src/utils/rpc_utils.rs +++ b/core/sdk/src/utils/rpc_utils.rs @@ -20,3 +20,17 @@ use jsonrpsee::core::Error; pub fn rpc_err(msg: impl Into) -> Error { Error::Custom(msg.into()) } + +pub fn extract_tcp_port(url: &str) -> Result { + let url_split: Vec<&str> = url.split("://").collect(); + if let Some(domain) = url_split.get(1) { + let domain_split: Vec<&str> = domain.split('/').collect(); + if let Some(first_part) = domain_split.first() { + Ok(first_part.to_string()) + } else { + Err(Error::Custom("Invalid domain format".to_string())) + } + } else { + Err(Error::Custom("Invalid URL format".to_string())) + } +} diff --git a/device/thunder_ripple_sdk/Cargo.toml b/device/thunder_ripple_sdk/Cargo.toml index 9fc186ca6..f08d3e926 100644 --- a/device/thunder_ripple_sdk/Cargo.toml +++ b/device/thunder_ripple_sdk/Cargo.toml @@ -44,6 +44,9 @@ regex.workspace = true jsonrpsee = { workspace = true, features = ["macros", "ws-client"] } serde.workspace = true url.workspace = true +serde_json.workspace = true +futures-channel.workspace = true +futures.workspace = true strum = { version = "0.24", default-features = false } strum_macros = "0.24" @@ -56,8 +59,9 @@ csv = "1.1" # Allowing minor updates home = { version = "=0.5.5", optional = true } tree_magic_mini = { version = "=3.0.3", optional = true } rstest = { version = "0.18.2", optional = true, default-features = false } +tokio-tungstenite = { workspace = true, features = ["handshake"] } +futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false} [dev-dependencies] -tokio-tungstenite = { workspace = true, features = ["native-tls"] } ripple_sdk = { path = "../../core/sdk", features = ["tdk"] } - +rstest = "0.18.0" diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index 61ba1de8b..e6ad12f5d 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -21,25 +21,129 @@ use crate::{ }; use ripple_sdk::{ extn::client::extn_client::ExtnClient, - log::{error, info}, + log::{debug, error, info, warn}, }; use super::{get_config_step::ThunderGetConfigStep, setup_thunder_pool_step::ThunderPoolStep}; +use crate::client::thunder_client::ThunderClientBuilder; +use crate::thunder_state::ThunderBootstrapStateWithConfig; +use crate::thunder_state::ThunderState; +use ripple_sdk::api::config::Config; +use ripple_sdk::utils::error::RippleError; +use serde::Deserialize; + +use ripple_sdk::extn::extn_client_message::{ExtnMessage, ExtnResponse}; + +const GATEWAY_DEFAULT: &str = "ws://127.0.0.1:9998/jsonrpc"; + +#[derive(Deserialize, Clone)] +pub struct ThunderPlatformParams { + #[serde(default = "gateway_default")] + gateway: String, +} + +fn gateway_default() -> String { + String::from(GATEWAY_DEFAULT) +} pub async fn boot_thunder( - state: ExtnClient, + ext_client: ExtnClient, plugin_param: ThunderPluginBootParam, ) -> Option { - info!("Booting thunder"); - if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await { + info!("Booting thunder initiated"); + let state = if ext_client.get_bool_config("use_with_thunder_async_client") { + info!("Using thunder_async_clinet"); + let mut extn_client = ext_client.clone(); + let mut gateway_url = match url::Url::parse(GATEWAY_DEFAULT) { + Ok(url) => url, + Err(e) => { + error!( + "Could not parse default gateway URL '{}': {}", + GATEWAY_DEFAULT, e + ); + return None; + } + }; + + let extn_message_response: Result = + extn_client.request(Config::PlatformParameters).await; + + if let Ok(message) = extn_message_response { + if let Some(_response) = message.payload.extract().map(|response| { + if let ExtnResponse::Value(v) = response { + serde_json::from_value::(v) + .map(|thunder_parameters| { + url::Url::parse(&thunder_parameters.gateway).map_or_else( + |_| { + warn!( + "Could not parse thunder gateway '{}', using default {}", + thunder_parameters.gateway, GATEWAY_DEFAULT + ); + }, + |gtway_url| { + debug!("Got url from device manifest"); + gateway_url = gtway_url; + }, + ); + }) + .unwrap_or_else(|_| { + warn!( + "Could not read thunder platform parameters, using default {}", + GATEWAY_DEFAULT + ); + }); + } + if let Ok(host_override) = std::env::var("DEVICE_HOST") { + gateway_url.set_host(Some(&host_override)).ok(); + } + }) {} + } + + if let Ok(thndr_client) = ThunderClientBuilder::start_thunder_client( + gateway_url.clone(), + None, + None, + None, + None, + true, + ) + .await + { + let thunder_state = ThunderState::new(ext_client.clone(), thndr_client); + + let thndr_boot_statecfg = ThunderBootstrapStateWithConfig { + extn_client: ext_client, + url: gateway_url, + pool_size: None, + plugin_param: None, + thunder_connection_state: None, + }; + + let thndr_boot_stateclient = ThunderBootstrapStateWithClient { + prev: thndr_boot_statecfg, + state: thunder_state, + }; + + thndr_boot_stateclient.clone().state.start_event_thread(); + + Some(thndr_boot_stateclient) + } else { + None + } + } else if let Ok(state) = ThunderGetConfigStep::setup(ext_client, plugin_param).await { if let Ok(state) = ThunderPoolStep::setup(state).await { - SetupThunderProcessor::setup(state.clone()).await; - return Some(state); + Some(state) } else { error!("Unable to connect to Thunder, error in ThunderPoolStep"); + None } } else { error!("Unable to connect to Thunder, error in ThunderGetConfigStep"); + None + }; + + if let Some(s) = state.clone() { + SetupThunderProcessor::setup(s).await; } - None + state } diff --git a/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs b/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs index 77e477fb9..d0e28207d 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs @@ -93,9 +93,9 @@ impl ThunderGetConfigStep { return Ok(ThunderBootstrapStateWithConfig { extn_client: state, url: gateway_url, - pool_size, - plugin_param: expected_plugins, - thunder_connection_state: Arc::new(ThunderConnectionState::new()), + pool_size: Some(pool_size), + plugin_param: Some(expected_plugins), + thunder_connection_state: Some(Arc::new(ThunderConnectionState::new())), }); } } diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs index 252b2971a..68149dd43 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs @@ -40,13 +40,16 @@ impl ThunderPoolStep { pub async fn setup( state: ThunderBootstrapStateWithConfig, ) -> Result { - let pool_size = state.pool_size; let url = state.url.clone(); let thunder_connection_state = state.thunder_connection_state.clone(); - if pool_size < 2 { - warn!("Pool size of 1 is not recommended, there will be no dedicated connection for Controller events"); - return Err(RippleError::BootstrapError); - } + let pool_size = match state.pool_size { + Some(s) => s, + None => { + warn!("Pool size of 1 is not recommended, there will be no dedicated connection for Controller events"); + return Err(RippleError::BootstrapError); + } + }; + let controller_pool = ripple_sdk::tokio::time::timeout( Duration::from_secs(10), ThunderClientPool::start(url.clone(), None, thunder_connection_state.clone(), 1), @@ -68,7 +71,13 @@ impl ThunderPoolStep { }; info!("Received Controller pool"); - let expected_plugins = state.plugin_param.clone(); + let expected_plugins = match state.plugin_param.clone() { + Some(plugins) => plugins, + None => { + error!("Expected plugins are not provided."); + return Err(RippleError::BootstrapError); + } + }; let tc = Box::new(controller_pool); let (plugin_manager_tx, failed_plugins) = PluginManager::start(tc, expected_plugins.clone()).await; @@ -98,7 +107,7 @@ impl ThunderPoolStep { } let client = ThunderClientPool::start( - url, + url.clone(), Some(plugin_manager_tx), thunder_connection_state.clone(), pool_size - 1, diff --git a/core/sdk/src/api/device/device_operator.rs b/device/thunder_ripple_sdk/src/client/device_operator.rs similarity index 72% rename from core/sdk/src/api/device/device_operator.rs rename to device/thunder_ripple_sdk/src/client/device_operator.rs index 6a0a3deaf..9b42dbe60 100644 --- a/core/sdk/src/api/device/device_operator.rs +++ b/device/thunder_ripple_sdk/src/client/device_operator.rs @@ -14,11 +14,14 @@ // // SPDX-License-Identifier: Apache-2.0 // - -use async_trait::async_trait; +use ripple_sdk::{ + api::gateway::rpc_gateway_api::JsonRpcApiResponse, + async_trait::async_trait, + log::error, + tokio::sync::{mpsc, oneshot::error::RecvError}, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::{mpsc, oneshot::error::RecvError}; pub const DEFAULT_DEVICE_OPERATION_TIMEOUT_SECS: u64 = 5; @@ -37,6 +40,12 @@ pub trait DeviceOperator: Clone { async fn unsubscribe(&self, request: DeviceUnsubscribeRequest); } +#[derive(Debug, Clone)] +pub struct DeviceResponseSubscription { + pub sub_id: Option, + pub handlers: Vec>, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum DeviceChannelRequest { Call(DeviceCallRequest), @@ -44,6 +53,21 @@ pub enum DeviceChannelRequest { Unsubscribe(DeviceUnsubscribeRequest), } +impl DeviceChannelRequest { + pub fn get_callsign_method(&self) -> (String, String) { + match self { + DeviceChannelRequest::Call(c) => { + let mut collection: Vec<&str> = c.method.split('.').collect(); + let method = collection.pop().unwrap_or_default(); + let callsign = collection.join("."); + (callsign, method.into()) + } + DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()), + DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeviceCallRequest { pub method: String, @@ -116,6 +140,31 @@ impl DeviceResponseMessage { sub_id: Some(sub_id), } } + + pub fn new(message: Value, sub_id: Option) -> DeviceResponseMessage { + DeviceResponseMessage { message, sub_id } + } + + pub fn create( + json_resp: &JsonRpcApiResponse, + sub_id: Option, + ) -> Option { + let mut device_response_msg = None; + if let Some(res) = &json_resp.result { + device_response_msg = Some(DeviceResponseMessage::new(res.clone(), sub_id)); + } else if let Some(er) = &json_resp.error { + device_response_msg = Some(DeviceResponseMessage::new(er.clone(), sub_id)); + } else if json_resp.clone().method.is_some() { + if let Some(params) = &json_resp.params { + if let Ok(dev_resp) = serde_json::to_value(params) { + device_response_msg = Some(DeviceResponseMessage::new(dev_resp, sub_id)); + } + } + } else { + error!("deviceresponse msg extraction failed."); + } + device_response_msg + } } #[cfg(test)] diff --git a/device/thunder_ripple_sdk/src/client/plugin_manager.rs b/device/thunder_ripple_sdk/src/client/plugin_manager.rs index 4b348f3f7..3dcacc30b 100644 --- a/device/thunder_ripple_sdk/src/client/plugin_manager.rs +++ b/device/thunder_ripple_sdk/src/client/plugin_manager.rs @@ -15,25 +15,26 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::collections::{HashMap, HashSet}; -use std::time::Duration; - use ripple_sdk::log::info; use ripple_sdk::tokio; +use ripple_sdk::{log::error, serde_json}; use ripple_sdk::{ - api::device::device_operator::{DeviceCallRequest, DeviceSubscribeRequest}, - log::error, - serde_json, -}; -use ripple_sdk::{ - api::device::device_operator::{DeviceChannelParams, DeviceOperator, DeviceResponseMessage}, tokio::sync::{mpsc, oneshot}, utils::channel_utils::{mpsc_send_and_log, oneshot_send_and_log}, }; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::time::Duration; -use super::thunder_plugin::ThunderPlugin::Controller; -use super::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin}; +use super::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + DeviceSubscribeRequest, + }, + thunder_client::ThunderClient, + thunder_plugin::ThunderPlugin, + thunder_plugin::ThunderPlugin::Controller, +}; pub struct ActivationSubscriber { pub callsign: String, @@ -497,7 +498,7 @@ mod tests { let controller_pool = ThunderClientPool::start( url.clone(), None, - Arc::new(ThunderConnectionState::new()), + Some(Arc::new(ThunderConnectionState::new())), 1, ) .await; @@ -520,7 +521,7 @@ mod tests { let client = ThunderClientPool::start( url, Some(plugin_manager_tx_clone), - Arc::new(ThunderConnectionState::new()), + Some(Arc::new(ThunderConnectionState::new())), 4, ) .await; diff --git a/device/thunder_ripple_sdk/src/client/thunder_async_client.rs b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs new file mode 100644 index 000000000..e9ebdb3ab --- /dev/null +++ b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs @@ -0,0 +1,578 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use super::{ + device_operator::{DeviceChannelParams, DeviceChannelRequest, DeviceResponseMessage}, + thunder_async_client_plugins_status_mgr::{AsyncCallback, AsyncSender, StatusManager}, +}; +use crate::utils::get_next_id; +use futures::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; +use ripple_sdk::{ + api::gateway::rpc_gateway_api::{JsonRpcApiRequest, JsonRpcApiResponse}, + log::{debug, error, info}, + tokio::{self, net::TcpStream, sync::mpsc::Receiver}, + utils::{error::RippleError, rpc_utils::extract_tcp_port}, +}; +use serde_json::{json, Value}; +use std::{collections::HashMap, time::Duration}; +use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; + +#[derive(Clone, Debug)] +pub struct ThunderAsyncClient { + status_manager: StatusManager, + sender: AsyncSender, + callback: AsyncCallback, + subscriptions: HashMap, +} + +#[derive(Clone, Debug)] +pub struct ThunderAsyncRequest { + pub id: u64, + request: DeviceChannelRequest, +} + +impl std::fmt::Display for ThunderAsyncRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ThunderAsyncRequest {{ id: {}, request: {:?} }}", + self.id, self.request + ) + } +} + +impl ThunderAsyncRequest { + pub fn new(request: DeviceChannelRequest) -> Self { + Self { + id: get_next_id(), + request, + } + } +} + +#[derive(Clone, Debug)] +pub struct ThunderAsyncResponse { + pub id: Option, + pub result: Result, +} + +impl ThunderAsyncClient {} + +impl ThunderAsyncResponse { + fn new_response(response: JsonRpcApiResponse) -> Self { + Self { + id: response.id, + result: Ok(response), + } + } + + fn new_error(id: u64, e: RippleError) -> Self { + Self { + id: Some(id), + result: Err(e), + } + } + + pub fn get_method(&self) -> Option { + if let Ok(e) = &self.result { + return e.method.clone(); + } + None + } + + pub fn get_id(&self) -> Option { + match &self.result { + Ok(response) => response.id, + Err(_) => None, + } + } + + pub fn get_device_resp_msg(&self, sub_id: Option) -> Option { + let json_resp = match &self.result { + Ok(json_resp_res) => json_resp_res, + _ => return None, + }; + DeviceResponseMessage::create(json_resp, sub_id) + } +} + +impl ThunderAsyncClient { + pub fn get_sender(&self) -> AsyncSender { + self.sender.clone() + } + + pub fn get_callback(&self) -> AsyncCallback { + self.callback.clone() + } + async fn create_ws( + endpoint: &str, + ) -> ( + SplitSink, Message>, + SplitStream>, + ) { + debug!("create_ws: {}", endpoint); + let port = extract_tcp_port(endpoint); + let tcp_port = port.unwrap(); + let mut index = 0; + + loop { + // Try connecting to the tcp port first + if let Ok(v) = TcpStream::connect(&tcp_port).await { + debug!("create_ws: Connected"); + // Setup handshake for websocket with the tcp port + // Some WS servers lock on to the Port but not setup handshake till they are fully setup + if let Ok((stream, _)) = client_async(endpoint, v).await { + break stream.split(); + } + } + if (index % 10).eq(&0) { + error!( + "create_ws: endpoint {} failed with retry for last {} secs in {}", + endpoint, index, tcp_port + ); + } + index += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + fn check_plugin_status_n_prepare_request( + &self, + request: &ThunderAsyncRequest, + ) -> Result { + let id: u64 = request.id; + let (callsign, method) = request.request.get_callsign_method(); + // Check if the method is empty and return an error if it is + if method.is_empty() { + return Err(RippleError::InvalidInput); + } + // Check the status of the plugin using the status manager + let status = match self.status_manager.get_status(callsign.clone()) { + Some(v) => v.clone(), + None => { + // If the plugin status is not available, add the request to the pending list + self.status_manager + .add_async_client_request_to_pending_list(callsign.clone(), request.clone()); + // Generate a request to check the plugin status and add it to the requests list + let request = self + .status_manager + .generate_plugin_status_request(callsign.clone()); + return Ok(request.to_string()); + } + }; + // If the plugin is missing, return a service error + if status.state.is_missing() { + error!("Plugin {} is missing", callsign); + return Err(RippleError::ServiceError); + } + // If the plugin is activating, return a service not ready error + if status.state.is_activating() { + info!("Plugin {} is activating", callsign); + return Err(RippleError::ServiceNotReady); + } + // If the plugin is not activated, add the request to the pending list and generate an activation request + if !status.state.is_activated() { + self.status_manager + .add_async_client_request_to_pending_list(callsign.clone(), request.clone()); + let request = self + .status_manager + .generate_plugin_activation_request(callsign.clone()); + return Ok(request.to_string()); + } + + // Generate the appropriate JSON-RPC request based on the type of DeviceChannelRequest + let json_rpc_api_request = match &request.request { + DeviceChannelRequest::Call(device_call_request) => { + let mut params_value = None; + if let Some(device_channel_params) = &device_call_request.params { + match device_channel_params { + DeviceChannelParams::Json(params_str) => { + params_value = serde_json::from_str::(params_str).ok(); + } + DeviceChannelParams::Bool(params_bool) => { + params_value = Some(Value::Bool(*params_bool)); + } + DeviceChannelParams::Literal(params_literal) => { + params_value = Some(Value::String(params_literal.clone())); + } + }; + } + + JsonRpcApiRequest::new(device_call_request.method.clone(), params_value).with_id(id) + } + DeviceChannelRequest::Unsubscribe(_) => JsonRpcApiRequest::new( + format!("{}.unregister", callsign), + Some(json!({ + "event": method, + "id": "client.events" + })), + ) + .with_id(id), + DeviceChannelRequest::Subscribe(_) => JsonRpcApiRequest::new( + format!("{}.register", callsign), + Some(json!({ + "event": method, + "id": "client.events" + })), + ) + .with_id(id), + }; + + serde_json::to_string(&json_rpc_api_request).map_err(|_e| RippleError::ParseError) + } + + pub fn new(callback: AsyncCallback, sender: AsyncSender) -> Self { + Self { + status_manager: StatusManager::new(), + sender, + callback, + subscriptions: HashMap::new(), + } + } + + async fn handle_response(&mut self, message: Message) { + if let Message::Text(t) = message { + let request = t.as_bytes(); + //check controller response or not + if self + .status_manager + .is_controller_response(self.get_sender(), self.callback.clone(), request) + .await + { + self.status_manager + .handle_controller_response(self.get_sender(), self.callback.clone(), request) + .await; + } else { + self.handle_jsonrpc_response(request).await + } + } + } + + async fn process_subscribe_requests( + &mut self, + ws_tx: &mut SplitSink, Message>, + ) { + for (_, subscription_request) in self.subscriptions.iter_mut() { + let new_id = get_next_id(); + + debug!( + "process_subscribe_requests: method={}, params={:?}, old_id={:?}, new_id={}", + subscription_request.method, + subscription_request.params, + subscription_request.id, + new_id + ); + + subscription_request.id = Some(new_id); + + let request_json = serde_json::to_string(&subscription_request).unwrap(); + let _feed = ws_tx + .feed(tokio_tungstenite::tungstenite::Message::Text(request_json)) + .await; + } + } + + pub async fn start( + &mut self, + url: &str, + mut thunder_async_request_rx: Receiver, + ) { + loop { + info!("start: (re)establishing websocket connection: url={}", url); + + let (mut thunder_tx, mut thunder_rx) = Self::create_ws(url).await; + + // send the controller statechange subscription request + let status_request = self + .status_manager + .generate_state_change_subscribe_request(); + + let _feed = thunder_tx + .feed(tokio_tungstenite::tungstenite::Message::Text( + status_request.to_string(), + )) + .await; + + self.process_subscribe_requests(&mut thunder_tx).await; + + let _flush = thunder_tx.flush().await; + + tokio::pin! { + let subscriptions_socket = thunder_rx.next(); + } + + loop { + tokio::select! { + Some(value) = &mut subscriptions_socket => { + match value { + Ok(message) => { + self.handle_response(message).await; + }, + Err(e) => { + error!("Thunder_async_client Websocket error on read {:?}", e); + break; + } + } + }, + Some(request) = thunder_async_request_rx.recv() => { + match self.check_plugin_status_n_prepare_request(&request) { + Ok(updated_request) => { + if let Ok(jsonrpc_request) = serde_json::from_str::(&updated_request) { + if jsonrpc_request.method.ends_with(".register") { + if let Some(Value::Object(ref params)) = jsonrpc_request.params { + if let Some(Value::String(event)) = params.get("event") { + debug!("thunder_async_request_rx: Rerouting subscription request for {}", event); + + // Store the subscription request in the subscriptions list in case we need to + // resubscribe later due to a socket disconnect. + self.subscriptions.insert(event.to_string(), jsonrpc_request.clone()); + debug!("thunder_async_request_rx: subscription request={}", updated_request); + // Reroute subsubscription requests through the persistent websocket so all notifications + // are sent to the same websocket connection. + let _feed = thunder_tx.feed(tokio_tungstenite::tungstenite::Message::Text(updated_request)).await; + let _flush = thunder_tx.flush().await; + } else { + error!("thunder_async_request_rx: Missing 'event' parameter"); + } + } else { + error!("thunder_async_request_rx: Missing 'params' object"); + } + } + else { + debug!("thunder_async_request_rx: call request={}", updated_request); + let _feed = thunder_tx.feed(tokio_tungstenite::tungstenite::Message::Text(updated_request)).await; + let _flush = thunder_tx.flush().await; + } + } + } + Err(e) => { + let response = ThunderAsyncResponse::new_error(request.id,e.clone()); + match e { + RippleError::ServiceNotReady => { + info!("prepare request failed for request {:?}", request); + }, + _ => { + error!("error preparing request {:?}", e) + } + } + self.callback.send(response).await; + } + } + } + } + } + } + } + + async fn handle_jsonrpc_response(&mut self, result: &[u8]) { + if let Ok(message) = serde_json::from_slice::(result) { + self.callback + .send(ThunderAsyncResponse::new_response(message)) + .await + } else { + error!("handle_jsonrpc_response: Invalid JSON RPC message sent by Thunder"); + } + } + + pub async fn send(&self, request: ThunderAsyncRequest) { + if let Err(e) = self.sender.send(request).await { + error!("Failed to send thunder Async Request: {:?}", e); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::device_operator::DeviceCallRequest; + use ripple_sdk::api::gateway::rpc_gateway_api::JsonRpcApiResponse; + use ripple_sdk::utils::error::RippleError; + use tokio::sync::mpsc; + + #[tokio::test] + async fn test_thunder_async_request_new() { + let callrequest = DeviceCallRequest { + method: "org.rdk.System.1.getSerialNumber".to_string(), + params: None, + }; + + let request = DeviceChannelRequest::Call(callrequest); + let _async_request = ThunderAsyncRequest::new(request.clone()); + assert_eq!( + _async_request.request.get_callsign_method(), + request.get_callsign_method() + ); + } + + #[tokio::test] + async fn test_thunder_async_response_new_response() { + let response = JsonRpcApiResponse { + jsonrpc: "2.0".to_string(), + id: Some(6), + result: Some(json!({"key": "value"})), + error: None, + method: None, + params: None, + }; + + let _async_response = ThunderAsyncResponse::new_response(response.clone()); + assert_eq!(_async_response.result.unwrap().result, response.result); + } + + #[tokio::test] + async fn test_thunder_async_response_new_error() { + let error = RippleError::ServiceError; + let async_response = ThunderAsyncResponse::new_error(1, error.clone()); + assert_eq!(async_response.id, Some(1)); + assert_eq!(async_response.result.unwrap_err(), error); + } + + #[tokio::test] + async fn test_thunder_async_response_get_event() { + let response = JsonRpcApiResponse { + jsonrpc: "2.0".to_string(), + id: Some(6), + result: Some(json!({"key": "value"})), + error: None, + method: Some("event_1".to_string()), + params: None, + }; + let async_response = ThunderAsyncResponse::new_response(response); + assert_eq!(async_response.get_method(), Some("event_1".to_string())); + } + + #[tokio::test] + async fn test_thunder_async_response_get_id() { + let response = JsonRpcApiResponse { + jsonrpc: "2.0".to_string(), + id: Some(42), + result: Some(json!({"key": "value"})), + error: None, + method: Some("event_1".to_string()), + params: None, + }; + let async_response = ThunderAsyncResponse::new_response(response); + assert_eq!(async_response.get_id(), Some(42)); + } + + #[tokio::test] + async fn test_thunder_async_response_get_device_resp_msg() { + let response = JsonRpcApiResponse { + jsonrpc: "2.0".to_string(), + id: Some(6), + result: Some(json!({"key": "value"})), + error: None, + method: Some("event_1".to_string()), + params: None, + }; + let async_response = ThunderAsyncResponse::new_response(response); + let device_resp_msg = async_response.get_device_resp_msg(None); + assert_eq!(device_resp_msg.unwrap().message, json!({"key": "value"})); + } + + #[tokio::test] + async fn test_thunder_async_client_prepare_request() { + let (resp_tx, _resp_rx) = mpsc::channel(10); + let callback = AsyncCallback { sender: resp_tx }; + let (async_tx, _async_rx) = mpsc::channel(10); + let async_sender = AsyncSender { sender: async_tx }; + let client = ThunderAsyncClient::new(callback, async_sender); + + let callrequest = DeviceCallRequest { + method: "org.rdk.System.1.getSerialNumber".to_string(), + params: None, + }; + + let request = DeviceChannelRequest::Call(callrequest); + let async_request = ThunderAsyncRequest::new(request); + let result = client.check_plugin_status_n_prepare_request(&async_request); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_thunder_async_client_send() { + let (resp_tx, _resp_rx) = mpsc::channel(10); + let callback = AsyncCallback { sender: resp_tx }; + let (async_tx, mut async_rx) = mpsc::channel(10); + let async_sender = AsyncSender { sender: async_tx }; + let client = ThunderAsyncClient::new(callback, async_sender); + + let callrequest = DeviceCallRequest { + method: "org.rdk.System.1.getSerialNumber".to_string(), + params: None, + }; + + let request = DeviceChannelRequest::Call(callrequest); + let async_request = ThunderAsyncRequest::new(request); + client.send(async_request.clone()).await; + let received = async_rx.recv().await; + assert_eq!(received.unwrap().id, async_request.id); + } + + #[tokio::test] + async fn test_thunder_async_client_handle_jsonrpc_response() { + let (resp_tx, mut resp_rx) = mpsc::channel(10); + let callback = AsyncCallback { sender: resp_tx }; + let response = JsonRpcApiResponse { + jsonrpc: "2.0".to_string(), + id: Some(6), + result: Some(json!({"key": "value"})), + error: None, + method: Some("event_1".to_string()), + params: None, + }; + let response_bytes = serde_json::to_vec(&response).unwrap(); + let (async_tx, _async_rx) = mpsc::channel(1); + let async_sender = AsyncSender { sender: async_tx }; + let mut client = ThunderAsyncClient::new(callback, async_sender); + client.handle_jsonrpc_response(&response_bytes).await; + + let received = resp_rx.recv().await; + assert_eq!( + received.unwrap().result.unwrap().result, + Some(json!({"key": "value"})) + ); + } + + #[tokio::test] + async fn test_thunder_async_client_start() { + let (resp_tx, mut resp_rx) = mpsc::channel(10); + let callback = AsyncCallback { sender: resp_tx }; + let (async_tx, _async_rx) = mpsc::channel(10); + let async_sender = AsyncSender { sender: async_tx }; + let mut client = ThunderAsyncClient::new(callback.clone(), async_sender); + + let response = json!({ + "jsonrpc": "2.0", + "result": { + "key": "value" + } + }); + + client + .handle_jsonrpc_response(response.to_string().as_bytes()) + .await; + let received = resp_rx.recv().await; + assert!(received.is_some()); + let async_response = received.unwrap(); + assert_eq!( + async_response.result.unwrap().result, + Some(json!({"key": "value"})) + ); + } +} diff --git a/device/thunder_ripple_sdk/src/client/thunder_async_client_plugins_status_mgr.rs b/device/thunder_ripple_sdk/src/client/thunder_async_client_plugins_status_mgr.rs new file mode 100644 index 000000000..263026bf6 --- /dev/null +++ b/device/thunder_ripple_sdk/src/client/thunder_async_client_plugins_status_mgr.rs @@ -0,0 +1,712 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use ripple_sdk::{ + api::gateway::rpc_gateway_api::JsonRpcApiResponse, + chrono::{DateTime, Duration, Utc}, + framework::RippleResponse, + log::{error, info, warn}, + tokio::sync::mpsc::Sender, + utils::error::RippleError, +}; + +use super::thunder_async_client::{ThunderAsyncRequest, ThunderAsyncResponse}; +use crate::utils::get_next_id; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Debug)] +pub struct AsyncSender { + pub sender: Sender, +} + +impl AsyncSender { + // Method to send the request to the underlying broker for handling. + pub async fn send(&self, request: ThunderAsyncRequest) -> RippleResponse { + if let Err(e) = self.sender.send(request).await { + error!("Error sending to broker {:?}", e); + Err(RippleError::SendFailure) + } else { + Ok(()) + } + } +} + +/// BrokerCallback will be used by the communication broker to send the firebolt response +/// back to the gateway for client consumption +#[derive(Clone, Debug)] +pub struct AsyncCallback { + pub sender: Sender, +} + +impl AsyncCallback { + pub async fn send(&self, response: ThunderAsyncResponse) { + if (self.sender.send(response).await).is_err() { + error!("error returning callback for request") + } + } +} + +// defautl timeout for plugin activation in seconds +const DEFAULT_PLUGIN_ACTIVATION_TIMEOUT: i64 = 8; + +// As per thunder 4_4 documentation, the statechange event is published under the method "client.events.1.statechange" +// But it didn't work, most probably a documentation issue. +// const STATE_CHANGE_EVENT_METHOD: &str = "client.events.1.statechange"; + +const STATE_CHANGE_EVENT_METHOD: &str = "thunder.Broker.Controller.events.statechange"; + +#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)] +pub struct Status { + pub callsign: String, + pub state: String, +} + +#[derive(Debug, Deserialize)] +pub struct ThunderError { + pub code: i32, + pub message: String, +} + +impl ThunderError { + pub fn get_state(&self) -> State { + match self.message.as_str() { + "ERROR_INPROGRESS" | "ERROR_PENDING_CONDITIONS" => State::InProgress, + "ERROR_UNKNOWN_KEY" => State::Missing, + _ => State::Unknown, + } + } +} + +impl Status { + pub fn to_state(&self) -> State { + match self.state.as_str() { + "activated" | "resumed" | "suspended" => State::Activated, + "deactivated" => State::Deactivated, + "deactivation" => State::Deactivation, + "activation" | "precondition" => State::Activation, + _ => State::Unavailable, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct StateChangeEvent { + pub callsign: String, + pub state: State, +} + +#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)] +pub enum State { + Activated, + Activation, + Deactivated, + Deactivation, + Unavailable, + Precondition, + Suspended, + Resumed, + Missing, + Error, + InProgress, + Unknown, +} + +impl State { + pub fn is_activated(&self) -> bool { + matches!(self, State::Activated) + } + pub fn is_activating(&self) -> bool { + matches!(self, State::Activation) + } + pub fn is_missing(&self) -> bool { + matches!(self, State::Missing) + } + pub fn is_unavailable(&self) -> bool { + matches!(self, State::Unavailable | State::Unknown | State::Missing) + } +} + +#[derive(Debug, Clone)] +pub struct ThunderPluginState { + pub state: State, + pub activation_timestamp: DateTime, + pub pending_requests: Vec, +} +#[derive(Debug, Clone)] +pub struct StatusManager { + pub status: Arc>>, + pub inprogress_plugins_request: Arc>>, +} + +impl Default for StatusManager { + fn default() -> Self { + Self::new() + } +} + +impl StatusManager { + pub fn new() -> Self { + Self { + status: Arc::new(RwLock::new(HashMap::new())), + inprogress_plugins_request: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn get_controller_call_sign() -> String { + "Controller.1.".to_string() + } + + pub fn update_status(&self, plugin_name: String, state: State) { + info!( + "Updating the status of the plugin: {:?} to state: {:?}", + plugin_name, state + ); + let mut status = self.status.write().unwrap(); + // get the current plugin state from hashmap and update the State + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.state = state; + } else { + // if the plugin is not present in the hashmap, add it + status.insert( + plugin_name, + ThunderPluginState { + state, + activation_timestamp: Utc::now(), + pending_requests: Vec::new(), + }, + ); + } + } + + pub fn add_async_client_request_to_pending_list( + &self, + plugin_name: String, + request: ThunderAsyncRequest, + ) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.pending_requests.push(request); + } else { + status.insert( + plugin_name.clone(), + ThunderPluginState { + state: State::Unknown, + activation_timestamp: Utc::now(), + pending_requests: vec![request], + }, + ); + } + // update the time stamp + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.activation_timestamp = Utc::now(); + } + } + + // clear all pending requests for the given plugin and return the list of requests to the caller + // Also return a flag to indicate if activation time has expired. + pub fn retrive_pending_broker_requests( + &self, + plugin_name: String, + ) -> (Vec, bool) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + let pending_requests = plugin_state.pending_requests.clone(); + plugin_state.pending_requests.clear(); + // check if the activation time has expired. + let now = Utc::now(); + if now - plugin_state.activation_timestamp + > Duration::seconds(DEFAULT_PLUGIN_ACTIVATION_TIMEOUT) + { + return (pending_requests, true); + } else { + return (pending_requests, false); + } + } + (Vec::new(), false) + } + + pub fn get_all_pending_broker_requests(&self, plugin_name: String) -> Vec { + let status = self.status.read().unwrap(); + if let Some(plugin_state) = status.get(&plugin_name) { + plugin_state.pending_requests.clone() + } else { + Vec::new() + } + } + + pub fn clear_all_pending_broker_requests(&self, plugin_name: String) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.pending_requests.clear(); + } + } + + pub fn get_status(&self, plugin_name: String) -> Option { + let status = self.status.read().unwrap(); + status.get(&plugin_name).cloned() + } + + pub fn generate_plugin_activation_request(&self, plugin_name: String) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}activate", controller_call_sign), + "params": json!({ + "callsign": plugin_name, + }) + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + pub fn generate_plugin_status_request(&self, plugin_name: String) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}status@{}", controller_call_sign, plugin_name), + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + pub fn generate_state_change_subscribe_request(&self) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}register", controller_call_sign), + "params": json!({ + "event": "statechange", + "id": "thunder.Broker.Controller.events" + }) + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + fn add_thunder_request_to_inprogress_list(&self, id: u64, request: String) { + let mut inprogress_plugins_request = self.inprogress_plugins_request.write().unwrap(); + inprogress_plugins_request.insert(id, request); + } + + pub async fn is_controller_response( + &self, + sender: AsyncSender, + callback: AsyncCallback, + result: &[u8], + ) -> bool { + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return false, + }; + + if let Some(method) = data.method { + info!("is_controller_response Method: {:?}", method); + if method == STATE_CHANGE_EVENT_METHOD { + // intercept the statechange event and update plugin status. + let params = match data.params { + Some(params) => params, + None => return false, + }; + + let event: StateChangeEvent = match serde_json::from_value(params) { + Ok(event) => event, + Err(_) => return false, + }; + + self.update_status(event.callsign.clone(), event.state.clone()); + + if event.state.is_activated() { + // get the pending ThunderAsyncRequest and process. + let (pending_requests, expired) = + self.retrive_pending_broker_requests(event.callsign); + if !pending_requests.is_empty() { + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } + } + + return true; + } + } + + if let Some(id) = data.id { + let inprogress_plugins_request = self.inprogress_plugins_request.read().unwrap(); + return inprogress_plugins_request.contains_key(&id); + } + + false + } + async fn on_activate_response( + &self, + sender: AsyncSender, + callback: AsyncCallback, + data: &JsonRpcApiResponse, + request: &str, + ) { + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split("callsign\":").last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let (pending_requests, expired) = + self.retrive_pending_broker_requests(callsign.to_string()); + + if result.is_null() { + self.update_status(callsign.to_string(), State::Activated); + + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } else if let Some(_e) = &data.error { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; + } + } + + async fn on_status_response( + &self, + sender: AsyncSender, + callback: AsyncCallback, + data: &JsonRpcApiResponse, + request: &str, + ) { + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split('@').last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let status_res: Vec = match serde_json::from_value(result.clone()) { + Ok(status_res) => status_res, + Err(_) => { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; + return; + } + }; + + for status in status_res { + if status.callsign != callsign { + // it's not required to enforce callsign matching. But it's good to log a warning. + // Already chekced the id in the request, so it's safe to ignore this. + warn!( + "Call Sign not matching callsign from response: {:?} callsign : {:?}", + status.callsign, callsign + ); + } + self.update_status(callsign.to_string(), status.to_state()); + + let (pending_requests, expired) = + self.retrive_pending_broker_requests(callsign.to_string()); + + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } + } + + async fn on_thunder_error_response( + &self, + callback: AsyncCallback, + data: &JsonRpcApiResponse, + plugin_name: &String, + ) { + let error = match &data.error { + Some(error) => error, + None => return, + }; + + error!( + "Error Received from Thunder on getting the status of the plugin: {:?}", + error + ); + + let thunder_error: ThunderError = match serde_json::from_value(error.clone()) { + Ok(error) => error, + Err(_) => return, + }; + + let state = thunder_error.get_state(); + self.update_status(plugin_name.to_string(), state.clone()); + + if state.is_unavailable() { + let (pending_requests, _) = + self.retrive_pending_broker_requests(plugin_name.to_string()); + + for pending_request in pending_requests { + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } + } + } + + pub fn get_from_inprogress_plugins_request_list(&self, id: u64) -> Option { + let inprogress_plugins_request = self.inprogress_plugins_request.read().unwrap(); + inprogress_plugins_request.get(&id).cloned() + } + + pub async fn handle_controller_response( + &self, + sender: AsyncSender, + callback: AsyncCallback, + result: &[u8], + ) { + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return, + }; + + let id = match data.id { + Some(id) => id, + None => return, + }; + + let request = match self.get_from_inprogress_plugins_request_list(id) { + Some(request) => request, + None => return, + }; + + if request.contains("Controller.1.activate") { + // handle activate response + self.on_activate_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.status@") { + // handle status response + self.on_status_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.register") { + // nothing to do here + info!("StatusManger Received response for register request"); + } + + let mut inprogress_plugins_request = self.inprogress_plugins_request.write().unwrap(); + inprogress_plugins_request.remove(&id); + } +} + +impl AsyncCallback { + /// Default method used for sending errors via the BrokerCallback + pub async fn send_error(&self, request: ThunderAsyncRequest, error: RippleError) { + let response = ThunderAsyncResponse { + id: Some(request.id), + result: Err(error), + }; + self.send(response).await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ripple_sdk::tokio::{ + self, + sync::mpsc::{self, channel}, + }; + + #[test] + fn test_generate_state_change_subscribe_request() { + let status_manager = StatusManager::new(); + let request = status_manager.generate_state_change_subscribe_request(); + assert!(request.contains("register")); + assert!(request.contains("statechange")); + } + + #[tokio::test] + async fn test_on_activate_response() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = AsyncSender { sender: tx }; + + let (tx_1, _tr_1) = channel(2); + let callback = AsyncCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!(null)), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.activate","params":{"callsign":"TestPlugin"}}"#; + status_manager + .on_activate_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + } + + #[tokio::test] + async fn test_on_status_response() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = AsyncSender { sender: tx }; + + let (tx_1, _tr_1) = channel(2); + let callback = AsyncCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!([{"callsign":"TestPlugin","state":"activated"}])), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.status@TestPlugin"}"#; + status_manager + .on_status_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + } + + #[tokio::test] + async fn test_on_thunder_error_response() { + let status_manager = StatusManager::new(); + + let (tx_1, _tr_1) = channel(2); + let callback = AsyncCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: None, + error: Some(serde_json::json!({"code":1,"message":"ERROR_UNKNOWN_KEY"})), + method: None, + params: None, + }; + let plugin_name = "TestPlugin".to_string(); + status_manager + .on_thunder_error_response(callback, &data, &plugin_name) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Missing); + } + + // Uncomment and use the following unit test only for local testing. Not use as part of the CI/CD pipeline. + /* + use ripple_sdk::{ + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, + }; + use crate::broker::rules_engine::{Rule, RuleTransform}; + #[tokio::test] + async fn test_expired_broker_request() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = BrokerSender { sender: tx }; + let (tx_1, _tr_1) = channel(2); + let callback = BrokerCallback { sender: tx_1 }; + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!(null)), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.activate","params":{"callsign":"TestPlugin"}}"#; + status_manager + .on_activate_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + let ctx = CallContext::new( + "session_id".to_string(), + "request_id".to_string(), + "app_id".to_string(), + 1, + ApiProtocol::Bridge, + "method".to_string(), + Some("cid".to_string()), + true, + ); + // Add a request to the pending list + let request = DeviceChannelRequest { + rpc: RpcRequest { + ctx, + params_json: "".to_string(), + method: "TestPlugin".to_string(), + }, + rule: Rule { + alias: "TestPlugin".to_string(), + transform: RuleTransform::default(), + endpoint: None, + }, + subscription_processed: None, + }; + status_manager.add_broker_request_to_pending_list("TestPlugin".to_string(), request); + // Sleep for 10 seconds to expire the request + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + // Check if the request is expired + let (pending_requests, expired) = + status_manager.retrive_pending_broker_requests("TestPlugin".to_string()); + assert_eq!(expired, true); + assert_eq!(pending_requests.len(), 1); + } + */ +} diff --git a/device/thunder_ripple_sdk/src/client/thunder_client.rs b/device/thunder_ripple_sdk/src/client/thunder_client.rs index 03ae0bbea..269948579 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client.rs @@ -15,54 +15,101 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::collections::{BTreeMap, HashMap}; -use std::str::FromStr; -use std::sync::Arc; - +use super::thunder_async_client::{ThunderAsyncClient, ThunderAsyncRequest, ThunderAsyncResponse}; +use super::thunder_async_client_plugins_status_mgr::{AsyncCallback, AsyncSender}; +use super::thunder_client_pool::ThunderPoolCommand; +use super::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceChannelRequest, DeviceOperator, + DeviceResponseMessage, DeviceResponseSubscription, DeviceSubscribeRequest, + DeviceUnsubscribeRequest, + }, + jsonrpc_method_locator::JsonRpcMethodLocator, + plugin_manager::{PluginActivatedResult, PluginManagerCommand}, +}; +use crate::thunder_state::ThunderConnectionState; +use crate::utils::get_error_value; use jsonrpsee::core::client::{Client, ClientT, SubscriptionClientT}; -use jsonrpsee::ws_client::WsClientBuilder; - use jsonrpsee::core::{async_trait, error::Error as JsonRpcError}; use jsonrpsee::types::ParamsSer; +use jsonrpsee::ws_client::WsClientBuilder; use regex::Regex; -use ripple_sdk::serde_json::json; -use ripple_sdk::tokio::sync::oneshot::error::RecvError; -use ripple_sdk::{ - api::device::device_operator::DeviceResponseMessage, - tokio::sync::mpsc::{self, Sender as MpscSender}, - tokio::{sync::Mutex, task::JoinHandle, time::sleep}, -}; -use ripple_sdk::{ - api::device::device_operator::{ - DeviceCallRequest, DeviceSubscribeRequest, DeviceUnsubscribeRequest, - }, - serde_json::{self, Value}, - tokio, -}; -use ripple_sdk::{ - api::device::device_operator::{DeviceChannelParams, DeviceOperator}, - uuid::Uuid, -}; + use ripple_sdk::{ log::{error, info, warn}, + serde_json::{self, json, Value}, + tokio, + tokio::sync::mpsc::{self, Receiver, Sender as MpscSender}, + tokio::sync::oneshot::{self, error::RecvError, Sender as OneShotSender}, + tokio::{sync::Mutex, task::JoinHandle, time::sleep}, utils::channel_utils::{mpsc_send_and_log, oneshot_send_and_log}, -}; -use ripple_sdk::{ - tokio::sync::oneshot::{self, Sender as OneShotSender}, utils::error::RippleError, + uuid::Uuid, }; use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, HashMap}; +use std::str::FromStr; +use std::sync::Arc; +use std::sync::RwLock; +use std::{env, process::Command}; +use tokio::sync::oneshot::Sender; use url::Url; -use crate::thunder_state::ThunderConnectionState; -use crate::utils::get_error_value; +pub type BrokerSubMap = HashMap; +pub type BrokerCallbackMap = HashMap>>; -use super::thunder_client_pool::ThunderPoolCommand; -use super::{ - jsonrpc_method_locator::JsonRpcMethodLocator, - plugin_manager::{PluginActivatedResult, PluginManagerCommand}, -}; -use std::{env, process::Command}; +#[derive(Debug)] +pub struct ThunderClientManager; + +impl ThunderClientManager { + fn start( + client: ThunderClient, + request_tr: Receiver, + mut response_tr: Receiver, + thndr_endpoint_url: String, + ) { + if let Some(ref thunder_async_client) = client.thunder_async_client { + let mut tac = thunder_async_client.clone(); + tokio::spawn(async move { + tac.start(&thndr_endpoint_url, request_tr).await; + }); + } + + /*thunder async response will get here */ + tokio::spawn(async move { + while let Some(response) = response_tr.recv().await { + if let Some(id) = response.get_id() { + if let Some(thunder_async_callbacks) = client.clone().thunder_async_callbacks { + let mut callbacks = thunder_async_callbacks.write().unwrap(); + if let Some(Some(callback)) = callbacks.remove(&id) { + if let Some(resp) = response.get_device_resp_msg(None) { + oneshot_send_and_log(callback, resp, "ThunderResponse"); + }; + } + } + } else if let Some(event_name) = response.get_method() { + if let Some(broker_subs) = client.clone().thunder_async_subscriptions { + let subs = { + let mut br_subs = broker_subs.write().unwrap(); + br_subs.get_mut(&event_name).cloned() + }; + + if let Some(dev_resp_sub) = subs { + //let subc = subs; + for s in &dev_resp_sub.handlers { + if let Some(resp_msg) = + response.get_device_resp_msg(dev_resp_sub.clone().sub_id) + { + mpsc_send_and_log(s, resp_msg, "ThunderResponse").await; + } + } + } + } + } + } + }); + } +} pub struct ThunderClientBuilder; @@ -165,6 +212,10 @@ pub struct ThunderClient { pub id: Uuid, pub plugin_manager_tx: Option>, pub subscriptions: Option>>>, + pub thunder_async_client: Option, + pub thunder_async_subscriptions: Option>>, + pub thunder_async_callbacks: Option>>, + pub use_thunder_async: bool, } #[derive(Debug, Serialize, Deserialize)] @@ -192,20 +243,24 @@ impl ThunderClient { #[async_trait] impl DeviceOperator for ThunderClient { async fn call(&self, request: DeviceCallRequest) -> DeviceResponseMessage { - let (tx, rx) = oneshot::channel::(); - let message = ThunderMessage::ThunderCallMessage(ThunderCallMessage { - method: request.method, - params: request.params, - callback: tx, - }); - self.send_message(message).await; - - match rx.await { - Ok(response) => response, - Err(_) => DeviceResponseMessage { - message: Value::Null, - sub_id: None, - }, + if !self.use_thunder_async { + let (tx, rx) = oneshot::channel::(); + let message = ThunderMessage::ThunderCallMessage(ThunderCallMessage { + method: request.method, + params: request.params, + callback: tx, + }); + self.send_message(message).await; + + rx.await.unwrap() + } else { + let (tx, rx) = oneshot::channel::(); + let async_request = ThunderAsyncRequest::new(DeviceChannelRequest::Call(request)); + self.add_callback(&async_request, tx); + if let Some(async_client) = &self.thunder_async_client { + async_client.send(async_request).await; + } + rx.await.unwrap() } } @@ -214,32 +269,56 @@ impl DeviceOperator for ThunderClient { request: DeviceSubscribeRequest, handler: mpsc::Sender, ) -> Result { - let (tx, rx) = oneshot::channel::(); - let message = ThunderSubscribeMessage { - module: request.module, - event_name: request.event_name, - params: request.params, - handler, - callback: Some(tx), - sub_id: request.sub_id, - }; - let msg = ThunderMessage::ThunderSubscribeMessage(message); - self.send_message(msg).await; - let result = rx.await; - if let Err(ref e) = result { - error!("subscribe: e={:?}", e); + if !self.use_thunder_async { + let (tx, rx) = oneshot::channel::(); + let message = ThunderSubscribeMessage { + module: request.module, + event_name: request.event_name, + params: request.params, + handler, + callback: Some(tx), + sub_id: request.sub_id, + }; + let msg = ThunderMessage::ThunderSubscribeMessage(message); + self.send_message(msg).await; + let result = rx.await; + if let Err(ref e) = result { + error!("subscribe: e={:?}", e); + } + result + } else if let Some(subscribe_request) = + self.add_subscription_handler(&request, handler.clone()) + { + let (tx, rx) = oneshot::channel::(); + self.add_callback(&subscribe_request, tx); + if let Some(async_client) = &self.thunder_async_client { + async_client.send(subscribe_request).await; + } + let result = rx.await; + if let Err(ref e) = result { + error!("subscribe: e={:?}", e); + } + result + } else { + Ok(DeviceResponseMessage { + message: Value::Null, + sub_id: None, + }) } - result } async fn unsubscribe(&self, request: DeviceUnsubscribeRequest) { - let message = ThunderUnsubscribeMessage { - module: request.module, - event_name: request.event_name, - subscription_id: None, - }; - let msg = ThunderMessage::ThunderUnsubscribeMessage(message); - self.send_message(msg).await; + if !self.use_thunder_async { + let message = ThunderUnsubscribeMessage { + module: request.module, + event_name: request.event_name, + subscription_id: None, + }; + let msg = ThunderMessage::ThunderUnsubscribeMessage(message); + self.send_message(msg).await; + } else { + // unsubscribe() deprecate + } } } @@ -481,6 +560,58 @@ impl ThunderClient { } None } + + fn add_callback( + &self, + request: &ThunderAsyncRequest, + dev_resp_callback: Sender, + ) { + let mut callbacks = self + .thunder_async_callbacks + .as_ref() + .unwrap() + .write() + .unwrap(); + callbacks.insert(request.id, Some(dev_resp_callback)); + } + + // if already subscribed updated handlers + fn add_subscription_handler( + &self, + request: &DeviceSubscribeRequest, + handler: MpscSender, + ) -> Option { + let mut thunder_async_subscriptions = self + .thunder_async_subscriptions + .as_ref() + .unwrap() + .write() + .unwrap(); + + // Create a key for the subscription based on the event name + let key = format!("client.events.{}", request.event_name); + + // Check if there are existing subscriptions for the given key + if let Some(subs) = thunder_async_subscriptions.get_mut(&key) { + // If a subscription exists, add the handler to the list of handlers + subs.handlers.push(handler); + None + } else { + // If no subscription exists, create a new async request for subscription + let async_request = + ThunderAsyncRequest::new(DeviceChannelRequest::Subscribe(request.clone())); + + // Create a new DeviceResponseSubscription with the handler + let dev_resp_sub = DeviceResponseSubscription { + sub_id: request.clone().sub_id, + handlers: vec![handler], + }; + + // Insert the new subscription into the thunder_async_subscriptions map + thunder_async_subscriptions.insert(key, dev_resp_sub); + Some(async_request) + } + } } impl ThunderClientBuilder { @@ -497,69 +628,23 @@ impl ThunderClientBuilder { } None } - async fn create_client( - url: Url, - thunder_connection_state: Arc, - ) -> Result { - // Ensure that only one connection attempt is made at a time - { - let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; - // check if we are already reconnecting - if *is_connecting { - drop(is_connecting); - // wait for the connection to be ready - thunder_connection_state.conn_status_notify.notified().await; - } else { - //Mark the connection as reconnecting - *is_connecting = true; - } - } // Lock is released here - - let mut client: Result; - let mut delay_duration = tokio::time::Duration::from_millis(50); - loop { - // get the token from the environment anew each time - let url_with_token = if let Ok(token) = env::var("THUNDER_TOKEN") { - Url::parse_with_params(url.as_str(), &[("token", token)]).unwrap() - } else { - url.clone() - }; - client = WsClientBuilder::default() - .build(url_with_token.to_string()) - .await; - if client.is_err() { - error!( - "Thunder Websocket is not available. Attempt to connect to thunder, retrying" - ); - sleep(delay_duration).await; - if delay_duration < tokio::time::Duration::from_secs(3) { - delay_duration *= 2; - } - continue; - } - //break from the loop after signalling that we are no longer reconnecting - let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; - *is_connecting = false; - thunder_connection_state.conn_status_notify.notify_waiters(); - break; - } - client - } - pub async fn get_client( + async fn start_thunderpool_client( url: Url, plugin_manager_tx: Option>, pool_tx: Option>, - thunder_connection_state: Arc, + thunder_connection_state: Option>, existing_client: Option, ) -> Result { - let id = Uuid::new_v4(); + let uid = Uuid::new_v4(); + info!("initiating thunder connection URL:{} ", url); - info!("initiating thunder connection {}", url); let subscriptions = Arc::new(Mutex::new(HashMap::::default())); let (s, mut r) = mpsc::channel::(32); let pmtx_c = plugin_manager_tx.clone(); - let client = Self::create_client(url, thunder_connection_state.clone()).await; + let client = + Self::create_client(url.clone(), thunder_connection_state.clone().unwrap()).await; + // add error handling here if client.is_err() { error!("Unable to connect to thunder: {client:?}"); @@ -574,17 +659,17 @@ impl ThunderClientBuilder { if let Some(ptx) = pool_tx { warn!( "Client {} became disconnected, removing from pool message {:?}", - id, message + uid, message ); // Remove the client and then try the message again with a new client - let pool_msg = ThunderPoolCommand::ResetThunderClient(id); + let pool_msg = ThunderPoolCommand::ResetThunderClient(uid); mpsc_send_and_log(&ptx, pool_msg, "ResetThunderClient").await; let pool_msg = ThunderPoolCommand::ThunderMessage(message); mpsc_send_and_log(&ptx, pool_msg, "RetryThunderMessage").await; return; } } - info!("Client {} sending thunder message {:?}", id, message); + info!("Client {} sending thunder message {:?}", uid, message); match message { ThunderMessage::ThunderCallMessage(thunder_message) => { ThunderClient::call(&client, thunder_message, plugin_manager_tx.clone()) @@ -592,7 +677,7 @@ impl ThunderClientBuilder { } ThunderMessage::ThunderSubscribeMessage(thunder_message) => { ThunderClient::subscribe( - id, + uid, &client, &subscriptions_c, thunder_message, @@ -657,12 +742,111 @@ impl ThunderClientBuilder { Ok(ThunderClient { sender: Some(s), pooled_sender: None, - id, + id: uid, plugin_manager_tx: pmtx_c, subscriptions: Some(subscriptions), + thunder_async_client: None, + thunder_async_subscriptions: None, + thunder_async_callbacks: None, + use_thunder_async: false, }) } + async fn create_client( + url: Url, + thunder_connection_state: Arc, + ) -> Result { + // Ensure that only one connection attempt is made at a time + { + let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; + // check if we are already reconnecting + if *is_connecting { + drop(is_connecting); + // wait for the connection to be ready + thunder_connection_state.conn_status_notify.notified().await; + } else { + //Mark the connection as reconnecting + *is_connecting = true; + } + } // Lock is released here + + let mut client: Result; + let mut delay_duration = tokio::time::Duration::from_millis(50); + loop { + // get the token from the environment anew each time + let url_with_token = if let Ok(token) = env::var("THUNDER_TOKEN") { + Url::parse_with_params(url.as_str(), &[("token", token)]).unwrap() + } else { + url.clone() + }; + client = WsClientBuilder::default() + .build(url_with_token.to_string()) + .await; + if client.is_err() { + error!( + "Thunder Websocket is not available. Attempt to connect to thunder, retrying" + ); + sleep(delay_duration).await; + if delay_duration < tokio::time::Duration::from_secs(3) { + delay_duration *= 2; + } + continue; + } + //break from the loop after signalling that we are no longer reconnecting + let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; + *is_connecting = false; + thunder_connection_state.conn_status_notify.notify_waiters(); + break; + } + client + } + + pub async fn start_thunder_client( + url: Url, + plugin_manager_tx: Option>, + pool_tx: Option>, + thunder_connection_state: Option>, + existing_client: Option, + use_thunderasync_client: bool, + ) -> Result { + if !use_thunderasync_client { + Self::start_thunderpool_client( + url, + plugin_manager_tx, + pool_tx, + thunder_connection_state, + existing_client, + ) + .await + } else { + let (resp_tx, resp_rx) = mpsc::channel(10); + let callback = AsyncCallback { sender: resp_tx }; + let (broker_tx, broker_rx) = mpsc::channel(10); + let broker_sender = AsyncSender { sender: broker_tx }; + let client = ThunderAsyncClient::new(callback, broker_sender); + + let thunder_client = ThunderClient { + sender: None, + pooled_sender: None, + id: Uuid::new_v4(), + plugin_manager_tx: None, + subscriptions: None, + thunder_async_client: Some(client), + thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), + use_thunder_async: true, + }; + + ThunderClientManager::start( + thunder_client.clone(), + broker_rx, + resp_rx, + url.to_string(), + ); + Ok(thunder_client) + } + } + #[cfg(test)] pub fn mock(sender: MpscSender) -> ThunderClient { ThunderClient { @@ -671,6 +855,10 @@ impl ThunderClientBuilder { id: Uuid::new_v4(), plugin_manager_tx: None, subscriptions: None, + thunder_async_client: None, + thunder_async_subscriptions: None, + thunder_async_callbacks: None, + use_thunder_async: false, } } } diff --git a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs b/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs index 6cb59c19a..3532b445b 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs @@ -20,9 +20,11 @@ use std::sync::{ Arc, }; -use crate::{client::thunder_client::ThunderClientBuilder, thunder_state::ThunderConnectionState}; +use crate::{ + client::{device_operator::DeviceResponseMessage, thunder_client::ThunderClientBuilder}, + thunder_state::ThunderConnectionState, +}; use ripple_sdk::{ - api::device::device_operator::DeviceResponseMessage, log::{debug, error}, tokio::sync::{mpsc, oneshot}, utils::channel_utils::oneshot_send_and_log, @@ -57,7 +59,7 @@ impl ThunderClientPool { pub async fn start( url: Url, plugin_manager_tx: Option>, - thunder_connection_state: Arc, + thunder_connection_state: Option>, size: u32, ) -> Result { debug!("Starting a Thunder connection pool of size {}", size); @@ -65,12 +67,13 @@ impl ThunderClientPool { let thunder_connection_state = thunder_connection_state.clone(); let mut clients = Vec::default(); for _ in 0..size { - let client = ThunderClientBuilder::get_client( + let client = ThunderClientBuilder::start_thunder_client( url.clone(), plugin_manager_tx.clone(), Some(s.clone()), thunder_connection_state.clone(), None, + false, ) .await; if let Ok(c) = client { @@ -130,12 +133,13 @@ impl ThunderClientPool { let mut itr = pool.clients.iter(); let i = itr.position(|x| x.client.id == client_id); if let Some(index) = i { - let client = ThunderClientBuilder::get_client( + let client = ThunderClientBuilder::start_thunder_client( url.clone(), plugin_manager_tx.clone(), Some(sender_for_thread.clone()), thunder_connection_state.clone(), pool.clients.get(index).map(|x| x.client.clone()), + false, ) .await; if let Ok(client) = client { @@ -159,6 +163,10 @@ impl ThunderClientPool { id: Uuid::new_v4(), plugin_manager_tx: pmtx_c, subscriptions: None, + thunder_async_client: None, + thunder_async_subscriptions: None, + thunder_async_callbacks: None, + use_thunder_async: false, }) } @@ -184,19 +192,19 @@ impl ThunderClientPool { mod tests { use super::*; use crate::{ - client::plugin_manager::{PluginActivatedResult, PluginManagerCommand}, + client::{ + device_operator::{ + DeviceCallRequest, DeviceOperator, DeviceSubscribeRequest, DeviceUnsubscribeRequest, + }, + plugin_manager::{PluginActivatedResult, PluginManagerCommand}, + }, tests::thunder_client_pool_test_utility::{ CustomMethodHandler, MethodHandler, MockWebSocketServer, }, }; - use ripple_sdk::api::device::device_operator::DeviceUnsubscribeRequest; use ripple_sdk::{ - api::device::device_operator::DeviceSubscribeRequest, - utils::channel_utils::oneshot_send_and_log, - }; - use ripple_sdk::{ - api::device::device_operator::{DeviceCallRequest, DeviceOperator}, tokio::time::{sleep, Duration}, + utils::channel_utils::oneshot_send_and_log, }; use url::Url; @@ -256,9 +264,13 @@ mod tests { // Test cases // 1. create a client pool of size 4 - let client = - ThunderClientPool::start(url, Some(tx), Arc::new(ThunderConnectionState::new()), 4) - .await; + let client = ThunderClientPool::start( + url, + Some(tx), + Some(Arc::new(ThunderConnectionState::new())), + 4, + ) + .await; assert!(client.is_ok()); let client = client.unwrap(); diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index 5d5b0588a..fcadbfa85 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -27,7 +27,6 @@ use ripple_sdk::{ context::RippleContextUpdateRequest, device::{ device_events::{DeviceEvent, DeviceEventCallback}, - device_operator::DeviceSubscribeRequest, device_request::{ AudioProfile, InternetConnectionStatus, NetworkResponse, PowerState, SystemPowerState, VoiceGuidanceState, @@ -41,7 +40,10 @@ use ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use crate::{thunder_state::ThunderState, utils::get_audio_profile_from_value}; +use crate::{ + client::device_operator::DeviceSubscribeRequest, thunder_state::ThunderState, + utils::get_audio_profile_from_value, +}; #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/device/thunder_ripple_sdk/src/lib.rs b/device/thunder_ripple_sdk/src/lib.rs index 33f41e5ab..c40a434c8 100644 --- a/device/thunder_ripple_sdk/src/lib.rs +++ b/device/thunder_ripple_sdk/src/lib.rs @@ -16,8 +16,11 @@ // pub mod client { + pub mod device_operator; pub mod jsonrpc_method_locator; pub mod plugin_manager; + pub mod thunder_async_client; + pub mod thunder_async_client_plugins_status_mgr; pub mod thunder_client; pub mod thunder_client_pool; pub mod thunder_plugin; diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs index 0bf919654..afc92582c 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -17,11 +17,7 @@ use ripple_sdk::{ api::{ - device::device_operator::{ - DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, - }, - firebolt::fb_metrics::BehavioralMetricsEvent, - observability::analytics::AnalyticsRequest, + firebolt::fb_metrics::BehavioralMetricsEvent, observability::analytics::AnalyticsRequest, }, async_trait::async_trait, extn::{ @@ -38,7 +34,15 @@ use ripple_sdk::{ utils::error::RippleError, }; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::{ + client::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + }, + thunder_plugin::ThunderPlugin, + }, + thunder_state::ThunderState, +}; #[derive(Debug)] pub struct ThunderAnalyticsProcessor { diff --git a/device/thunder_ripple_sdk/src/processors/thunder_browser.rs b/device/thunder_ripple_sdk/src/processors/thunder_browser.rs index eba89f38c..814cf76b3 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_browser.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_browser.rs @@ -16,11 +16,8 @@ // use ripple_sdk::{ - api::device::{ - device_browser::{ - BrowserDestroyParams, BrowserLaunchParams, BrowserNameRequestParams, BrowserRequest, - }, - device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + api::device::device_browser::{ + BrowserDestroyParams, BrowserLaunchParams, BrowserNameRequestParams, BrowserRequest, }, async_trait::async_trait, extn::{ @@ -40,7 +37,13 @@ use ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::{ + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_plugin::ThunderPlugin, + }, + thunder_state::ThunderState, +}; #[derive(Debug)] pub struct ThunderBrowserRequestProcessor { diff --git a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs index daad19614..220d8a53c 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs @@ -22,7 +22,11 @@ use std::{ }; use crate::{ - client::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin}, + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_client::ThunderClient, + thunder_plugin::ThunderPlugin, + }, ripple_sdk::{ api::device::{device_info_request::DeviceCapabilities, device_request::AudioProfile}, chrono::NaiveDateTime, @@ -37,7 +41,6 @@ use crate::{ api::{ device::{ device_info_request::{DeviceInfoRequest, DeviceResponse}, - device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, device_request::{ HDCPStatus, HdcpProfile, HdrProfile, NetworkResponse, NetworkState, NetworkType, Resolution, @@ -1658,7 +1661,6 @@ pub mod tests { use ripple_sdk::{ api::device::{ device_info_request::{DeviceInfoRequest, DeviceResponse, PlatformBuildInfo}, - device_operator::DeviceResponseMessage, device_request::DeviceRequest, }, extn::{ @@ -1674,7 +1676,10 @@ pub mod tests { use serde::{Deserialize, Serialize}; use crate::{ - client::{thunder_client::ThunderCallMessage, thunder_plugin::ThunderPlugin}, + client::{ + device_operator::DeviceResponseMessage, thunder_client::ThunderCallMessage, + thunder_plugin::ThunderPlugin, + }, processors::thunder_device_info::ThunderDeviceInfoRequestProcessor, tests::mock_thunder_controller::{CustomHandler, MockThunderController, ThunderHandlerFn}, }; diff --git a/device/thunder_ripple_sdk/src/processors/thunder_package_manager.rs b/device/thunder_ripple_sdk/src/processors/thunder_package_manager.rs index a82307376..c1813a4ce 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_package_manager.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_package_manager.rs @@ -22,9 +22,14 @@ use std::{thread, time}; use crate::ripple_sdk::{self}; use crate::{ - client::thunder_plugin::ThunderPlugin, + client::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + DeviceSubscribeRequest, + }, + thunder_plugin::ThunderPlugin, + }, ripple_sdk::{ - api::device::device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, async_trait::async_trait, extn::{ client::extn_client::ExtnClient, @@ -41,7 +46,6 @@ use crate::{ use base64::{engine::general_purpose::STANDARD as base64, Engine}; use ripple_sdk::api::app_catalog::{AppCatalogRequest, AppOperationComplete, AppsUpdate}; use ripple_sdk::api::device::device_apps::DeviceAppMetadata; -use ripple_sdk::api::device::device_operator::{DeviceResponseMessage, DeviceSubscribeRequest}; use ripple_sdk::api::firebolt::fb_capabilities::FireboltPermissions; use ripple_sdk::api::firebolt::fb_metrics::{Timer, TimerType}; use ripple_sdk::api::observability::metrics_util::{ @@ -1133,6 +1137,10 @@ pub mod tests { id: Uuid::new_v4(), plugin_manager_tx: None, subscriptions: None, + thunder_async_callbacks: None, + thunder_async_subscriptions: None, + thunder_async_client: None, + use_thunder_async: false, }; let mut sessions: HashMap = HashMap::new(); sessions.insert("asdf".to_string(), operation.clone()); @@ -1166,6 +1174,10 @@ pub mod tests { id: Uuid::new_v4(), plugin_manager_tx: None, subscriptions: None, + thunder_async_callbacks: None, + thunder_async_subscriptions: None, + thunder_async_client: None, + use_thunder_async: false, }; let mut sessions: HashMap = HashMap::new(); sessions.insert("asdf".to_string(), operation.clone()); diff --git a/device/thunder_ripple_sdk/src/processors/thunder_persistent_store.rs b/device/thunder_ripple_sdk/src/processors/thunder_persistent_store.rs index cf0b1e3a8..532b1ee6a 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_persistent_store.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_persistent_store.rs @@ -16,14 +16,13 @@ // use crate::{ - client::thunder_plugin::ThunderPlugin, + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_plugin::ThunderPlugin, + }, ripple_sdk::{ - api::device::{ - device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, - device_peristence::{ - DeleteStorageProperty, DevicePersistenceRequest, GetStorageProperty, - SetStorageProperty, - }, + api::device::device_peristence::{ + DeleteStorageProperty, DevicePersistenceRequest, GetStorageProperty, SetStorageProperty, }, async_trait::async_trait, extn::{ diff --git a/device/thunder_ripple_sdk/src/processors/thunder_remote.rs b/device/thunder_ripple_sdk/src/processors/thunder_remote.rs index d897c341e..26d53e226 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_remote.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_remote.rs @@ -17,22 +17,20 @@ use crate::{ client::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + DeviceSubscribeRequest, DeviceUnsubscribeRequest, + }, thunder_client::ThunderClient, thunder_plugin::ThunderPlugin::{self, RemoteControl}, }, ripple_sdk::{ api::{ accessory::RemoteAccessoryResponse, - device::{ - device_accessory::{ - AccessoryDeviceListResponse, AccessoryDeviceResponse, AccessoryListRequest, - AccessoryPairRequest, AccessoryProtocol, AccessoryProtocolListType, - AccessoryType, RemoteAccessoryRequest, - }, - device_operator::{ - DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, - DeviceSubscribeRequest, DeviceUnsubscribeRequest, - }, + device::device_accessory::{ + AccessoryDeviceListResponse, AccessoryDeviceResponse, AccessoryListRequest, + AccessoryPairRequest, AccessoryProtocol, AccessoryProtocolListType, AccessoryType, + RemoteAccessoryRequest, }, }, async_trait::async_trait, diff --git a/device/thunder_ripple_sdk/src/processors/thunder_rfc.rs b/device/thunder_ripple_sdk/src/processors/thunder_rfc.rs index 48f9293d8..2d67d1a52 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_rfc.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_rfc.rs @@ -18,10 +18,7 @@ use std::collections::HashMap; use ripple_sdk::{ - api::{ - config::RfcRequest, - device::device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, - }, + api::config::RfcRequest, async_trait::async_trait, extn::{ client::{ @@ -39,7 +36,13 @@ use ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::{ + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_plugin::ThunderPlugin, + }, + thunder_state::ThunderState, +}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ThunderRFCResponse { diff --git a/device/thunder_ripple_sdk/src/processors/thunder_telemetry.rs b/device/thunder_ripple_sdk/src/processors/thunder_telemetry.rs index cce5c389f..dda24e5c6 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_telemetry.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_telemetry.rs @@ -16,10 +16,7 @@ // use ripple_sdk::{ - api::{ - device::device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, - firebolt::fb_telemetry::TelemetryPayload, - }, + api::firebolt::fb_telemetry::TelemetryPayload, async_trait::async_trait, extn::{ client::extn_processor::{ @@ -34,7 +31,13 @@ use ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::{ + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_plugin::ThunderPlugin, + }, + thunder_state::ThunderState, +}; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] diff --git a/device/thunder_ripple_sdk/src/processors/thunder_wifi.rs b/device/thunder_ripple_sdk/src/processors/thunder_wifi.rs index 9036edd23..890293f95 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_wifi.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_wifi.rs @@ -16,15 +16,25 @@ // use crate::{ - client::thunder_plugin::ThunderPlugin, + client::thunder_plugin::ThunderPlugin::Wifi, + ripple_sdk::{ + self, + api::device::device_wifi::WifiRequest, + extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider}, + }, +}; +use crate::{ + client::{ + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + DeviceSubscribeRequest, DeviceUnsubscribeRequest, + }, + thunder_plugin::ThunderPlugin, + }, ripple_sdk::{ api::{ - device::{ - device_operator::{ - DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, - DeviceSubscribeRequest, - }, - device_wifi::{AccessPoint, AccessPointList, AccessPointRequest, WifiSecurityMode}, + device::device_wifi::{ + AccessPoint, AccessPointList, AccessPointRequest, WifiSecurityMode, }, wifi::WifiResponse, }, @@ -42,14 +52,6 @@ use crate::{ }, thunder_state::ThunderState, }; -use crate::{ - client::thunder_plugin::ThunderPlugin::Wifi, - ripple_sdk::{ - self, - api::device::{device_operator::DeviceUnsubscribeRequest, device_wifi::WifiRequest}, - extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider}, - }, -}; use serde::{Deserialize, Serialize}; use tokio::time::{self, timeout, Duration}; diff --git a/device/thunder_ripple_sdk/src/processors/thunder_window_manager.rs b/device/thunder_ripple_sdk/src/processors/thunder_window_manager.rs index 5178a6e9d..70047d2d0 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_window_manager.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_window_manager.rs @@ -16,13 +16,7 @@ // use crate::ripple_sdk::{ - api::{ - apps::Dimensions, - device::{ - device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, - device_window_manager::WindowManagerRequest, - }, - }, + api::{apps::Dimensions, device::device_window_manager::WindowManagerRequest}, async_trait::async_trait, extn::{ client::{ @@ -39,7 +33,13 @@ use crate::ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::{ + client::{ + device_operator::{DeviceCallRequest, DeviceChannelParams, DeviceOperator}, + thunder_plugin::ThunderPlugin, + }, + thunder_state::ThunderState, +}; #[derive(Debug)] pub struct ThunderWindowManagerRequestProcessor { diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_controller_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_controller_pacts.rs index e281a52b9..d4736d38a 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_controller_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_controller_pacts.rs @@ -1,6 +1,10 @@ use crate::{ client::{ - plugin_manager::ThunderActivatePluginParams, thunder_client::ThunderClient, + device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + }, + plugin_manager::ThunderActivatePluginParams, + thunder_client::ThunderClient, thunder_client_pool::ThunderClientPool, }, ripple_sdk::{ @@ -9,9 +13,6 @@ use crate::{ }, thunder_state::ThunderConnectionState, }; -use ripple_sdk::api::device::device_operator::{ - DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, -}; use crate::mock_websocket_server; use pact_consumer::mock_server::StartMockServerAsync; @@ -20,9 +21,14 @@ use std::sync::Arc; use url::Url; async fn initialize_thunder_client(server_url: Url) -> ThunderClient { - ThunderClientPool::start(server_url, None, Arc::new(ThunderConnectionState::new()), 1) - .await - .unwrap() + ThunderClientPool::start( + server_url, + None, + Some(Arc::new(ThunderConnectionState::new())), + 1, + ) + .await + .unwrap() } async fn perform_device_call_request( diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_device_info_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_device_info_pacts.rs index dfc00f050..d48d8c0a9 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_device_info_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_device_info_pacts.rs @@ -89,7 +89,7 @@ async fn test_device_get_info_mac_address() { // Creating thunder client with mock server url let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -158,7 +158,7 @@ async fn test_device_get_model() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -223,7 +223,7 @@ async fn test_device_get_interfaces_wifi() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -288,7 +288,7 @@ async fn test_device_get_interfaces_ethernet() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -342,7 +342,7 @@ async fn test_device_get_audio() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -395,7 +395,7 @@ async fn test_device_get_hdcp_supported() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -457,7 +457,7 @@ async fn test_device_get_hdcp_status() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -509,7 +509,7 @@ async fn test_device_get_hdr() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -559,7 +559,7 @@ async fn test_device_get_screen_resolution_without_port() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -607,7 +607,7 @@ async fn test_device_get_make() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -668,7 +668,7 @@ async fn test_device_get_video_resolution() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -778,7 +778,7 @@ async fn test_device_get_timezone() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -850,7 +850,7 @@ async fn test_device_get_available_timezone() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -906,7 +906,7 @@ async fn test_device_get_voice_guidance_enabled() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -984,7 +984,7 @@ async fn test_device_get_voice_guidance_speed() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -1042,7 +1042,7 @@ async fn test_device_set_timezone() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -1098,7 +1098,7 @@ async fn test_device_set_voice_guidance() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -1154,7 +1154,7 @@ async fn test_device_set_voice_guidance_speed() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -1211,7 +1211,7 @@ async fn test_device_get_internet() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_package_manager_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_package_manager_pacts.rs index 0d7e64f1a..aab82e50b 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_package_manager_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_package_manager_pacts.rs @@ -129,7 +129,7 @@ async fn test_install_app( let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -260,7 +260,7 @@ async fn test_uninstall_app( let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -346,7 +346,7 @@ async fn test_get_installed_apps() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -499,7 +499,7 @@ async fn test_init() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -634,7 +634,7 @@ async fn test_get_firebolt_permissions() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_persistent_store_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_persistent_store_pacts.rs index 6da52fadd..a4d8e7f64 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_persistent_store_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_persistent_store_pacts.rs @@ -131,7 +131,7 @@ async fn test_device_set_persistent_value(with_scope: bool) { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -218,7 +218,7 @@ async fn test_device_get_persistent_value(with_scope: bool) { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -303,7 +303,7 @@ async fn test_device_delete_persistent_value_by_key(with_scope: bool) { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_remote_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_remote_pacts.rs index 506305599..09c7c5533 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_remote_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_remote_pacts.rs @@ -91,7 +91,7 @@ async fn test_device_remote_start_pairing() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -180,7 +180,7 @@ async fn test_device_remote_network_status() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); diff --git a/device/thunder_ripple_sdk/src/tests/contracts/thunder_wifi_pacts.rs b/device/thunder_ripple_sdk/src/tests/contracts/thunder_wifi_pacts.rs index 7f51ee1ae..2fb8d895f 100644 --- a/device/thunder_ripple_sdk/src/tests/contracts/thunder_wifi_pacts.rs +++ b/device/thunder_ripple_sdk/src/tests/contracts/thunder_wifi_pacts.rs @@ -69,7 +69,7 @@ async fn test_device_scan_wifi() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); @@ -132,7 +132,7 @@ async fn test_device_connect_wifi() { let url = url::Url::parse(mock_server.path("/jsonrpc").as_str()).unwrap(); let thunder_client = - ThunderClientPool::start(url, None, Arc::new(ThunderConnectionState::new()), 1) + ThunderClientPool::start(url, None, Some(Arc::new(ThunderConnectionState::new())), 1) .await .unwrap(); diff --git a/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs b/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs index ec2183cc7..6284888bb 100644 --- a/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs +++ b/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use ripple_sdk::{ - api::device::device_operator::DeviceResponseMessage, async_channel::{unbounded, Receiver}, extn::{ffi::ffi_message::CExtnMessage, mock_extension_client::MockExtnClient}, serde_json, @@ -13,6 +12,7 @@ use serde_json::Value; use crate::{ client::{ + device_operator::DeviceResponseMessage, jsonrpc_method_locator::JsonRpcMethodLocator, plugin_manager::{PluginState, PluginStateChangeEvent, PluginStatus}, thunder_client::{ @@ -173,6 +173,10 @@ impl MockThunderController { id: Uuid::new_v4(), plugin_manager_tx: None, subscriptions: None, + thunder_async_client: None, + thunder_async_subscriptions: None, + thunder_async_callbacks: None, + use_thunder_async: false, }; let (s, r) = unbounded(); diff --git a/device/thunder_ripple_sdk/src/thunder_state.rs b/device/thunder_ripple_sdk/src/thunder_state.rs index 700b01d64..9efebc4f9 100644 --- a/device/thunder_ripple_sdk/src/thunder_state.rs +++ b/device/thunder_ripple_sdk/src/thunder_state.rs @@ -18,9 +18,6 @@ use std::sync::{Arc, RwLock}; use ripple_sdk::{ - api::device::device_operator::{ - DeviceOperator, DeviceResponseMessage, DeviceUnsubscribeRequest, - }, extn::{ client::extn_client::ExtnClient, extn_client_message::{ExtnMessage, ExtnPayloadProvider}, @@ -33,7 +30,11 @@ use ripple_sdk::{ use url::Url; use crate::{ - client::{plugin_manager::ThunderPluginBootParam, thunder_client::ThunderClient}, + client::{ + device_operator::{DeviceOperator, DeviceResponseMessage, DeviceUnsubscribeRequest}, + plugin_manager::ThunderPluginBootParam, + thunder_client::ThunderClient, + }, events::thunder_event_processor::{ThunderEventHandler, ThunderEventProcessor}, }; @@ -61,9 +62,9 @@ impl ThunderConnectionState { pub struct ThunderBootstrapStateWithConfig { pub extn_client: ExtnClient, pub url: Url, - pub pool_size: u32, - pub plugin_param: ThunderPluginBootParam, - pub thunder_connection_state: Arc, + pub pool_size: Option, + pub plugin_param: Option, + pub thunder_connection_state: Option>, } #[derive(Debug, Clone)] diff --git a/device/thunder_ripple_sdk/src/utils.rs b/device/thunder_ripple_sdk/src/utils.rs index a9d38a7ef..449068645 100644 --- a/device/thunder_ripple_sdk/src/utils.rs +++ b/device/thunder_ripple_sdk/src/utils.rs @@ -15,13 +15,14 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU64, Ordering}, +}; +use crate::client::device_operator::DeviceResponseMessage; use jsonrpsee::core::Error; -use ripple_sdk::{ - api::device::{device_operator::DeviceResponseMessage, device_request::AudioProfile}, - serde_json::Value, -}; +use ripple_sdk::{api::device::device_request::AudioProfile, serde_json::Value}; use serde::Deserialize; pub fn get_audio_profile_from_value(value: Value) -> HashMap { @@ -109,3 +110,10 @@ pub fn get_error_value(error: &Error) -> Value { } Value::Null } + +static ATOMIC_ID: AtomicU64 = AtomicU64::new(0); + +pub fn get_next_id() -> u64 { + ATOMIC_ID.fetch_add(1, Ordering::Relaxed); + ATOMIC_ID.load(Ordering::Relaxed) +}