diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index 0f2da12c1ef..f775b630e4f 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -56,6 +56,7 @@ tedge_http_ext = { workspace = true, features = ["test_helpers"] } tedge_mqtt_ext = { workspace = true, features = ["test-helpers"] } tedge_test_utils = { workspace = true } test-case = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } [lints] workspace = true diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 2ef59ff32f3..5accca6101c 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -93,6 +93,7 @@ use tedge_utils::file::FileError; use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; use tokio::time::Duration; +use tokio::time::Instant; use tracing::debug; use tracing::error; use tracing::info; @@ -186,7 +187,8 @@ pub struct CumulocityConverter { pub command_id: IdGenerator, // Keep active command IDs to avoid creation of multiple commands for an operation - pub active_commands: HashSet, + pub active_commands: HashMap>, + active_commands_last_cleared: Instant, supported_operations: SupportedOperations, pub operation_handler: OperationHandler, @@ -277,7 +279,8 @@ impl CumulocityConverter { mqtt_schema: mqtt_schema.clone(), entity_cache, command_id, - active_commands: HashSet::new(), + active_commands: HashMap::new(), + active_commands_last_cleared: Instant::now(), operation_handler, }) } @@ -645,7 +648,7 @@ impl CumulocityConverter { let device_xid = operation.external_source.external_id; let cmd_id = self.command_id.new_id_with_str(&operation.op_id); - if self.active_commands.contains(&cmd_id) { + if self.active_commands.contains_key(&cmd_id) { info!("{cmd_id} is already addressed"); return Ok(vec![]); } @@ -662,7 +665,10 @@ impl CumulocityConverter { &operation_message, ) .await; - let result = self.handle_c8y_operation_result(&result, Some(operation.op_id)); + let result = self.handle_c8y_operation_result(&result, Some(operation.op_id.clone())); + if !result.is_empty() { + self.active_commands.insert(cmd_id, Some(Instant::now())); + } output.extend(result); } @@ -761,7 +767,7 @@ impl CumulocityConverter { let cmd_id = self.command_id.new_id_with_str(&operation.op_id); let device_xid = operation.external_source.external_id; - if self.active_commands.contains(&cmd_id) { + if self.active_commands.contains_key(&cmd_id) { info!("{cmd_id} is already addressed"); return Ok(vec![]); } @@ -769,7 +775,7 @@ impl CumulocityConverter { let result = self .process_json_custom_operation( operation.op_id.clone(), - cmd_id, + cmd_id.clone(), device_xid, &operation.extras, message, @@ -777,6 +783,9 @@ impl CumulocityConverter { .await; let output = self.handle_c8y_operation_result(&result, Some(operation.op_id)); + if !output.is_empty() { + self.active_commands.insert(cmd_id, Some(Instant::now())); + } Ok(output) } @@ -1362,7 +1371,10 @@ impl CumulocityConverter { } Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => { - self.active_commands.insert(cmd_id.clone()); + // Keep track of operation if we've received it through a retain message + // If we've already got the operation in `active_commands`, set the insertion + // time to `None` to disable the time-based expiry + self.active_commands.insert(cmd_id.clone(), None); let entity = self.entity_cache.try_get(&source)?; let entity = operations::EntityTarget { @@ -1402,6 +1414,22 @@ impl CumulocityConverter { &mut self, message: &MqttMessage, ) -> Result, ConversionError> { + if self.active_commands_last_cleared.elapsed() > Duration::from_secs(3600) { + let mut to_remove = vec![]; + for (id, time) in &self.active_commands { + if let Some(time) = time { + // Expire tasks after 12 hours + if time.elapsed() > Duration::from_secs(3600 * 12) { + to_remove.push(id.to_owned()); + } + } + } + for id in to_remove { + self.active_commands.remove(&id); + } + self.active_commands_last_cleared = Instant::now(); + } + let messages = match &message.topic { topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { self.alarm_converter.process_internal_alarm(message); @@ -1639,7 +1667,7 @@ pub fn get_local_child_devices_list(path: &Path) -> Result, Cumu #[cfg(test)] pub(crate) mod tests { - use super::CumulocityConverter; + use super::*; use crate::actor::IdDownloadRequest; use crate::actor::IdDownloadResult; use crate::actor::IdUploadRequest; @@ -1648,6 +1676,7 @@ pub(crate) mod tests { use crate::config::C8yMapperConfig; use crate::entity_cache::InvalidExternalIdError; use crate::supported_operations::operation::ResultFormat; + use crate::supported_operations::SupportedOperations; use crate::tests::spawn_dummy_c8y_http_proxy; use crate::Capabilities; use anyhow::Result; @@ -1659,6 +1688,7 @@ pub(crate) mod tests { use serde_json::json; use serde_json::Value; use std::str::FromStr; + use std::time::Duration; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::Builder; @@ -1684,6 +1714,7 @@ pub(crate) mod tests { use tedge_http_ext::HttpResult; use tedge_mqtt_ext::test_helpers::assert_messages_matching; use tedge_mqtt_ext::MqttMessage; + use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::Topic; use tedge_test_utils::fs::TempTedgeDir; use test_case::test_case; @@ -1691,7 +1722,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_sync_alarms() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let alarm_topic = "te/device/main///a/temperature_alarm"; let alarm_payload = r#"{ "severity": "critical", "text": "Temperature very high" }"#; @@ -1746,7 +1777,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_sync_child_alarms() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let alarm_topic = "te/device/external_sensor///a/temperature_alarm"; let alarm_payload = r#"{ "severity": "critical", "text": "Temperature very high" }"#; @@ -1804,7 +1835,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_child_device_registration() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_message = MqttMessage::new( &Topic::new_unchecked("te/device/child1//"), @@ -1836,7 +1867,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_child_id() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_message = MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/"), @@ -1876,7 +1907,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_nested_child_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child//"), json!({ @@ -1928,7 +1959,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_nested_child_service() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child//"), json!({ @@ -1994,7 +2025,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_for_child_device_service() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/child1/service/app1/m/m_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -2022,7 +2053,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_for_main_device_service() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/main/service/appm/m/m_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -2051,7 +2082,7 @@ pub(crate) mod tests { #[ignore = "FIXME: the registration is currently done even if the message is ill-formed"] async fn convert_first_measurement_invalid_then_valid_with_child_id() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/child1///m/"; let in_invalid_payload = r#"{"temp": invalid}"#; @@ -2092,7 +2123,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_two_measurement_messages_given_different_child_id() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; // First message from "child1" @@ -2133,7 +2164,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_main_id_with_measurement_type() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/main///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -2159,7 +2190,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_main_id_with_measurement_type_in_payload() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/main///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#; @@ -2185,7 +2216,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_child_id_with_measurement_type() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/child///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -2211,7 +2242,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_measurement_with_child_id_with_measurement_type_in_payload() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/child2///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#; @@ -2237,7 +2268,7 @@ pub(crate) mod tests { #[tokio::test] async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let alarm_topic = "te/device/main///a/temperature_alarm"; let big_alarm_text = create_packet(1024 * 20); @@ -2256,7 +2287,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_event_without_given_event_type() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let event_topic = "te/device/main///e/"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); @@ -2275,7 +2306,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_event_use_event_type_from_payload_to_c8y_smartrest() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let event_topic = "te/device/main///e/topic_event"; let event_payload = r#"{ "type": "payload event", "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); @@ -2294,7 +2325,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_event_use_event_type_from_payload_to_c8y_json() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "type": "payload event", "text": "tick", "foo": "bar" }"#; let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); @@ -2318,7 +2349,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_event_with_known_fields_to_c8y_smartrest() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); @@ -2361,7 +2392,7 @@ pub(crate) mod tests { #[tokio::test] async fn convert_event_with_extra_fields_to_c8y_json() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "text": "tick", "foo": "bar" }"#; let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); @@ -2385,7 +2416,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_convert_big_event() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, http_proxy) = create_c8y_converter(&tmp_dir); spawn_dummy_c8y_http_proxy(http_proxy); let event_topic = "te/device/main///e/click_event"; @@ -2400,7 +2431,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_convert_big_event_for_child_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, http_proxy) = create_c8y_converter(&tmp_dir); spawn_dummy_c8y_http_proxy(http_proxy); let event_topic = "te/device/child1///e/click_event"; @@ -2420,7 +2451,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_convert_big_measurement() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let measurement_topic = "te/device/main///m/"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json @@ -2440,7 +2471,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_convert_small_measurement() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let measurement_topic = "te/device/main///m/"; let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes @@ -2464,7 +2495,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_convert_big_measurement_for_child_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let measurement_topic = "te/device/child1///m/"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json @@ -2488,9 +2519,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_execute_operation_is_not_blocked() { let tmp_dir = TempTedgeDir::new(); - let (converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (converter, _http_proxy) = create_c8y_converter(&tmp_dir); - let now = std::time::Instant::now(); + let now = Instant::now(); converter .execute_operation( ShellScript::from_str("sleep 5").unwrap(), @@ -2521,10 +2552,237 @@ pub(crate) mod tests { assert_eq!(now.elapsed().as_secs(), 0); } + #[tokio::test] + async fn operations_are_deduplicated() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + let operation = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!( + {"id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + assert_eq!( + converter.try_convert(&operation).await.unwrap(), + vec![MqttMessage { + topic: Topic { + name: "te/device/main///cmd/restart/c8y-mapper-16574089".into(), + }, + payload: json!({"status":"init"}).to_string().into(), + qos: QoS::AtLeastOnce, + retain: true, + },] + ); + assert_eq!(converter.try_convert(&operation).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn custom_operations_are_deduplicated() { + let tmp_dir = TempTedgeDir::new(); + let custom_op = r#"exec.topic = "my/custom/topic" + exec.on_fragment = "my_op" + exec.command = "/etc/tedge/operations/command ${.payload.my_op.text}""#; + let f = tmp_dir.dir("operations").dir("c8y").file("my_op"); + f.with_raw_content(custom_op); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + let operation = MqttMessage::new(&Topic::new_unchecked("my/custom/topic"), json!( + {"id":"16574089","status":"PENDING","my_op":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + assert!( + matches!( + dbg!(converter.try_convert(&operation).await.unwrap().as_slice()), + [MqttMessage { topic, .. }, ..] if topic.to_string() == "c8y/s/us" + ), + "Initial operation delivery produces outgoing message" + ); + assert_eq!( + converter.try_convert(&operation).await.unwrap(), + vec![], + "Operation redelivery is ignored by converter" + ); + } + + #[tokio::test] + async fn custom_operations_are_not_deduplicated_before_registration() { + // We could potentially receive a custom operation before we have + // registered a handler for it. If we then register a suitable handler + // and the operation is redelivered to the mapper, this should be + // processed + let tmp_dir = TempTedgeDir::new(); + let custom_op = r#"exec.topic = "my/custom/topic" + exec.on_fragment = "my_op" + exec.command = "/etc/tedge/operations/command ${.payload.my_op.text}""#; + let f = tmp_dir.dir("operations").dir("c8y").file("my_op"); + f.with_raw_content(custom_op); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + let before_registration = SupportedOperations { + device_id: converter.supported_operations.device_id.clone(), + base_ops_dir: converter.supported_operations.base_ops_dir.clone(), + operations_by_xid: <_>::default(), + }; + let after_registration = + std::mem::replace(&mut converter.supported_operations, before_registration); + + let operation = MqttMessage::new(&Topic::new_unchecked("my/custom/topic"), json!( + {"id":"16574089","status":"PENDING","my_op":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + assert_eq!( + converter.try_convert(&operation).await.unwrap(), + vec![], + "Operation is ignored before the operation is registered" + ); + + converter.supported_operations = after_registration; + + assert!( + matches!( + dbg!(converter.try_convert(&operation).await.unwrap().as_slice()), + [MqttMessage { topic, .. }, ..] if topic.to_string() == "c8y/s/us" + ), + "First delivery after registration produces outgoing message" + ); + } + + #[tokio::test] + async fn te_topic_operations_do_not_have_time_based_expiry() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + tokio::time::pause(); + + let operation = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!( + {"id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + let expected_message = MqttMessage { + topic: Topic { + name: "te/device/main///cmd/restart/c8y-mapper-16574089".into(), + }, + payload: json!({"status":"init"}).to_string().into(), + qos: QoS::AtLeastOnce, + retain: true, + }; + + assert_eq!( + converter.try_convert(&operation).await.unwrap().first(), + Some(&expected_message), + "First delivery after registration produces outgoing message" + ); + assert_eq!(converter.active_commands.len(), 1); + + // Converter should disable time-based expiry when the `te` topic + // message is processed + converter.try_convert(&expected_message).await.unwrap(); + + tokio::time::advance(Duration::from_secs(24 * 3600)).await; + + let random_message = MqttMessage::new(&Topic::new_unchecked("c8y/s/ds"), "510,test"); + + // Trigger the converter since it performs cache eviction only when it's converting c8y messages + converter.try_convert(&random_message).await.unwrap(); + assert_eq!(converter.active_commands.len(), 1); + } + + #[tokio::test] + async fn custom_operation_ids_are_not_cached_indefinitely() { + let tmp_dir = TempTedgeDir::new(); + let custom_op = r#"exec.topic = "my/custom/topic" + exec.on_fragment = "my_op" + exec.command = "/etc/tedge/operations/command ${.payload.my_op.text}""#; + let f = tmp_dir.dir("operations").dir("c8y").file("my_op"); + f.with_raw_content(custom_op); + + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + tokio::time::pause(); + + let operation = MqttMessage::new(&Topic::new_unchecked("my/custom/topic"), json!( + {"id":"16574089","status":"PENDING","my_op":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + + assert!( + matches!( + dbg!(converter.try_convert(&operation).await.unwrap().as_slice()), + [MqttMessage { topic, .. }, ..] if topic.to_string() == "c8y/s/us" + ), + "First delivery after registration produces outgoing message" + ); + + assert_eq!(converter.active_commands.len(), 1); + tokio::time::advance(Duration::from_secs(24 * 3600)).await; + + let random_message = MqttMessage::new(&Topic::new_unchecked("c8y/s/ds"), "510,test"); + + // Trigger the converter since it performs cache eviction only when it's converting c8y messages + converter.try_convert(&random_message).await.unwrap(); + assert_eq!(converter.active_commands.len(), 0); + } + + #[tokio::test] + async fn active_commands_is_populated_with_existing_commands_from_retain_messages() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + let existing_pending_operation = MqttMessage { + topic: Topic { + name: "te/device/main///cmd/restart/c8y-mapper-16574089".into(), + }, + payload: json!({"status":"init"}).to_string().into(), + qos: QoS::AtLeastOnce, + retain: true, + }; + converter + .try_convert(&existing_pending_operation) + .await + .unwrap(); + + let operation = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!( + {"id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + + assert_eq!( + converter.try_convert(&operation).await.unwrap().as_slice(), + [], + "Existing tedge operation should trigger de-duplication" + ); + } + + #[tokio::test] + async fn custom_operation_ids_are_not_evicted_from_cache_prematurely() { + let tmp_dir = TempTedgeDir::new(); + let custom_op = r#"exec.topic = "my/custom/topic" + exec.on_fragment = "my_op" + exec.command = "/etc/tedge/operations/command ${.payload.my_op.text}""#; + let f = tmp_dir.dir("operations").dir("c8y").file("my_op"); + f.with_raw_content(custom_op); + + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); + + tokio::time::pause(); + + let operation = MqttMessage::new(&Topic::new_unchecked("my/custom/topic"), json!( + {"id":"16574089","status":"PENDING","my_op":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}} + ).to_string()); + + assert!( + matches!( + dbg!(converter.try_convert(&operation).await.unwrap().as_slice()), + [MqttMessage { topic, .. }, ..] if topic.to_string() == "c8y/s/us" + ), + "First delivery after registration produces outgoing message" + ); + + assert_eq!(converter.active_commands.len(), 1); + // After a minute, the operation id should still exist + tokio::time::advance(Duration::from_secs(60)).await; + + let random_message = MqttMessage::new(&Topic::new_unchecked("c8y/s/ds"), "510,test"); + converter.try_convert(&random_message).await.unwrap(); + assert_eq!(converter.active_commands.len(), 1); + } + #[tokio::test] async fn handle_operations_for_child_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); // The child has first to declare its capabilities let mqtt_schema = MqttSchema::default(); @@ -2626,7 +2884,7 @@ pub(crate) mod tests { #[tokio::test] async fn default_device_name_from_external_id(external_id: &str, device_name: &str) { let tmp_dir = TempTedgeDir::new(); - let (converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (converter, _http_proxy) = create_c8y_converter(&tmp_dir); assert_eq!( converter.default_device_name_from_external_id(&external_id.into()), @@ -2637,7 +2895,7 @@ pub(crate) mod tests { #[tokio::test] async fn duplicate_registration_messages_not_mapped_2311() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let in_topic = "te/device/main/service/my_measurement_service"; register_source_entities(in_topic, &mut converter).await; @@ -2699,7 +2957,7 @@ pub(crate) mod tests { #[tokio::test] async fn operations_not_supported_for_services(op_type: &str) { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); // Register main device service let _ = converter @@ -3077,7 +3335,7 @@ pub(crate) mod tests { messages } - pub(crate) async fn create_c8y_converter( + pub(crate) fn create_c8y_converter( tmp_dir: &TempTedgeDir, ) -> (CumulocityConverter, FakeServerBox) { let config = c8y_converter_config(tmp_dir); diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index 1c79d8ac2ec..dfe7aac7077 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -198,7 +198,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_json_object() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_topic = "te/device/main///twin/device_os"; let twin_payload = json!({ @@ -227,7 +227,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_string_value() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), @@ -250,7 +250,7 @@ mod tests { #[tokio::test] async fn duplicate_twin_name_and_type_updates_ignored_after_registration() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); // Register a child with a name and type upfront let reg_message = &MqttMessage::new( @@ -318,7 +318,7 @@ mod tests { #[tokio::test] async fn unquoted_string_value_invalid() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), @@ -331,7 +331,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_numeric_value() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), @@ -354,7 +354,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_boolean_value() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/enabled"), @@ -377,7 +377,7 @@ mod tests { #[tokio::test] async fn clear_inventory_fragment() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); // Register a twin data fragment first let twin_message = MqttMessage::new( @@ -402,7 +402,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_ignores_duplicate_fragment() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_topic = "te/device/main///twin/device_os"; let twin_payload = json!({ @@ -451,7 +451,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_with_duplicate_fragment_after_clearing_it() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_topic = "te/device/main///twin/device_os"; let twin_payload = json!({ @@ -486,7 +486,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_with_firmware_update_for_main_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/firmware"), @@ -507,7 +507,7 @@ mod tests { #[tokio::test] async fn convert_entity_twin_data_with_firmware_update_for_child_device() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/firmware"), @@ -535,7 +535,7 @@ mod tests { #[tokio::test] async fn convert_service_type() { let tmp_dir = TempTedgeDir::new(); - let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir); let reg_message = &MqttMessage::new( &Topic::new_unchecked("te/device/main/service/service01"),