diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b90ed45..f4e34a6 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -52,6 +52,9 @@ jobs: - name: Run Tests with Coverage run: cargo tarpaulin --out Xml + - name: Run Clippy + run: cargo clippy --message-format=json &> clippy.json + - name: Update Sonar uses: sonarsource/sonarqube-scan-action@v3 if: github.ref == 'refs/heads/main' diff --git a/.gitignore b/.gitignore index f4afe7d..fc79ecc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ /.idea /config.yml /.envrc +/cobertura.xml +/clippy.json diff --git a/Cargo.lock b/Cargo.lock index aaaec2a..711eb22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,6 +698,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "env_filter" version = "0.1.2" @@ -815,6 +821,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "futures" version = "0.3.31" @@ -1337,16 +1349,44 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c28b3fb6d753d28c20e826cd46ee611fda1cf3cde03a443a974043247c065a" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "341014e7f530314e9a1fdbc7400b244efea7122662c96bfa248c31da5bfb2020" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "mqtt-gateway" version = "0.2.0" dependencies = [ "anyhow", "async-std", + "async-trait", "chrono", "env_logger", "futures", "influxdb", + "mockall", "paho-mqtt", "postgres", "regex", @@ -1703,6 +1743,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" + +[[package]] +name = "predicates-tree" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro-hack" version = "0.5.20+deprecated" @@ -2229,6 +2295,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.64" diff --git a/Cargo.toml b/Cargo.toml index 215bf66..5cfdf95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,7 @@ postgres = { version = "0.19.9" , features = ["with-chrono-0_4"] } serde_yaml = { version = "0.9.34+deprecated", features = [] } anyhow = "1.0.89" regex = "1.11.0" +async-trait = "0.1.83" + +[dev-dependencies] +mockall = "0.13.0" diff --git a/sonar-project.properties b/sonar-project.properties index 561a65b..bb09199 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -1,2 +1,4 @@ sonar.projectKey=mqtt-gateway sonar.projectName=mqtt-gateway +community.rust.cpd.ignoretests=true +community.rust.clippy.reportPaths=./clippy.json diff --git a/src/data/klimalogger/mod.rs b/src/data/klimalogger/mod.rs index 15831c7..1ee54a5 100644 --- a/src/data/klimalogger/mod.rs +++ b/src/data/klimalogger/mod.rs @@ -1,13 +1,19 @@ use std::fmt; use std::sync::mpsc::SyncSender; +use crate::config::Target; +use crate::data::CheckMessage; +use crate::target::influx; +use crate::target::influx::InfluxConfig; +use crate::target::postgres::PostgresConfig; +use crate::{target, SensorReading}; use anyhow::Result; use chrono::{DateTime, Utc}; +use influxdb::{Timestamp, WriteQuery}; use paho_mqtt::Message; use serde::{Deserialize, Serialize}; - -use crate::data::CheckMessage; -use crate::SensorReading; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; #[derive(Serialize, Deserialize, Clone)] pub struct Data { @@ -43,7 +49,7 @@ impl CheckMessage for SensorLogger { let location = split.nth(1); let measurement = split.next(); - let result = parse(&msg); + let result = parse(msg); if let (Some(location), Some(measurement), Ok(result)) = (location, measurement, &result) { let date_time = Self::convert_timestamp(result.timestamp as i64); @@ -174,3 +180,45 @@ mod tests { Ok(()) } } + +pub fn create_logger(targets: Vec) -> (Arc>, Vec>) { + let mut txs: Vec> = Vec::new(); + let mut handles: Vec> = Vec::new(); + + for target in targets { + let (tx, handle) = match target { + Target::InfluxDB { + url, + database, + user, + password, + } => { + fn mapper(result: SensorReading) -> WriteQuery { + let timestamp = Timestamp::Seconds(result.time.timestamp() as u128); + WriteQuery::new(timestamp, result.measurement.to_string()) + .add_tag("location", result.location.to_string()) + .add_tag("sensor", result.sensor.to_string()) + .add_field("value", result.value) + } + + influx::spawn_influxdb_writer( + InfluxConfig::new(url, database, user, password), + mapper, + ) + } + Target::Postgresql { + host, + port, + user, + password, + database, + } => target::postgres::spawn_postgres_writer(PostgresConfig::new( + host, port, user, password, database, + )), + }; + txs.push(tx); + handles.push(handle); + } + + (Arc::new(Mutex::new(SensorLogger::new(txs))), handles) +} diff --git a/src/data/opendtu/mod.rs b/src/data/opendtu/mod.rs index a5f3531..2b2c7fe 100644 --- a/src/data/opendtu/mod.rs +++ b/src/data/opendtu/mod.rs @@ -1,12 +1,16 @@ use std::sync::mpsc::SyncSender; +use crate::config::Target; +use crate::data::CheckMessage; +use crate::target::influx; +use crate::target::influx::InfluxConfig; use anyhow::Result; use chrono::Datelike; use influxdb::Timestamp::Seconds; use influxdb::WriteQuery; use paho_mqtt::Message; - -use crate::data::CheckMessage; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; struct Data { timestamp: i64, @@ -134,7 +138,7 @@ impl OpenDTUParser { } } - return Ok(data); + Ok(data) } } @@ -192,3 +196,31 @@ mod tests { Ok(()) } } + +pub fn create_logger(targets: Vec) -> (Arc>, Vec>) { + let mut txs: Vec> = Vec::new(); + let mut handles: Vec> = Vec::new(); + + for target in targets { + let (tx, handle) = match target { + Target::InfluxDB { + url, + database, + user, + password, + } => influx::spawn_influxdb_writer( + InfluxConfig::new(url, database, user, password), + std::convert::identity, + ), + Target::Postgresql { .. } => { + panic!("Postgresql not supported for opendtu"); + } + }; + txs.push(tx); + handles.push(handle); + } + + let logger = OpenDTULogger::new(txs); + + (Arc::new(Mutex::new(logger)), handles) +} diff --git a/src/data/shelly/data.rs b/src/data/shelly/data.rs new file mode 100644 index 0000000..4d3248c --- /dev/null +++ b/src/data/shelly/data.rs @@ -0,0 +1,191 @@ +use crate::data::shelly::{Timestamped, Typenamed}; +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SwitchData { + pub(crate) output: bool, + #[serde(rename = "apower")] + pub(crate) power: Option, + pub(crate) voltage: Option, + pub(crate) current: Option, + #[serde(rename = "aenergy")] + pub(crate) energy: EnergyData, + pub(crate) temperature: TemperatureData, +} + +impl Timestamped for SwitchData { + fn timestamp(&self) -> Option { + self.energy.minute_ts + } +} + +impl Typenamed for SwitchData { + fn type_name(&self) -> &str { + "switch" + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct CoverData { + #[serde(rename = "current_pos")] + pub(crate) position: Option, + #[serde(rename = "apower")] + pub(crate) power: Option, + pub(crate) voltage: Option, + pub(crate) current: Option, + #[serde(rename = "aenergy")] + pub(crate) energy: EnergyData, + pub(crate) temperature: TemperatureData, +} + +impl Timestamped for CoverData { + fn timestamp(&self) -> Option { + self.energy.minute_ts + } +} + +impl Typenamed for CoverData { + fn type_name(&self) -> &str { + "cover" + } +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct EnergyData { + pub(crate) total: f32, + pub(crate) minute_ts: Option, +} + +impl fmt::Debug for EnergyData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} Wh", self.total) + } +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct TemperatureData { + #[serde(rename = "tC")] + pub(crate) t_celsius: f32, +} + +impl fmt::Debug for TemperatureData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} °C", self.t_celsius) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_switch_data_debug() { + let energy = EnergyData { + total: 10.0, + minute_ts: Some(1627848123), + }; + let temperature = TemperatureData { t_celsius: 25.0 }; + let switch_data = SwitchData { + output: true, + power: Some(100.0), + voltage: Some(220.0), + current: Some(0.45), + energy, + temperature, + }; + + assert_eq!(format!("{:?}", switch_data.energy), "10 Wh"); + assert_eq!(format!("{:?}", switch_data.temperature), "25 °C"); + } + + #[test] + fn test_cover_data_debug() { + let energy = EnergyData { + total: 20.0, + minute_ts: Some(1627848124), + }; + let temperature = TemperatureData { t_celsius: 26.0 }; + let cover_data = CoverData { + position: Some(50), + power: Some(110.0), + voltage: Some(230.0), + current: Some(0.50), + energy, + temperature, + }; + + assert_eq!(format!("{:?}", cover_data.energy), "20 Wh"); + assert_eq!(format!("{:?}", cover_data.temperature), "26 °C"); + } + + #[test] + fn test_switch_data_timestamp() { + let energy = EnergyData { + total: 10.0, + minute_ts: Some(1627848123), + }; + let switch_data = SwitchData { + output: true, + power: Some(100.0), + voltage: Some(220.0), + current: Some(0.45), + energy, + temperature: TemperatureData { t_celsius: 25.0 }, + }; + + assert_eq!(switch_data.timestamp(), Some(1627848123)); + } + + #[test] + fn test_cover_data_timestamp() { + let energy = EnergyData { + total: 20.0, + minute_ts: Some(1627848124), + }; + let cover_data = CoverData { + position: Some(50), + power: Some(110.0), + voltage: Some(230.0), + current: Some(0.50), + energy, + temperature: TemperatureData { t_celsius: 26.0 }, + }; + + assert_eq!(cover_data.timestamp(), Some(1627848124)); + } + + #[test] + fn test_switch_data_typename() { + let switch_data = SwitchData { + output: true, + power: Some(100.0), + voltage: Some(220.0), + current: Some(0.45), + energy: EnergyData { + total: 10.0, + minute_ts: Some(1627848123), + }, + temperature: TemperatureData { t_celsius: 25.0 }, + }; + + assert_eq!(switch_data.type_name(), "switch"); + } + + #[test] + fn test_cover_data_typename() { + let cover_data = CoverData { + position: Some(50), + power: Some(110.0), + voltage: Some(230.0), + current: Some(0.50), + energy: EnergyData { + total: 20.0, + minute_ts: Some(1627848124), + }, + temperature: TemperatureData { t_celsius: 26.0 }, + }; + + assert_eq!(cover_data.type_name(), "cover"); + } +} diff --git a/src/data/shelly/mod.rs b/src/data/shelly/mod.rs index b7a06bc..e68823b 100644 --- a/src/data/shelly/mod.rs +++ b/src/data/shelly/mod.rs @@ -1,16 +1,21 @@ -use std::fmt; +mod data; + use std::fmt::Debug; use std::sync::mpsc::SyncSender; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock, Mutex}; +use crate::config::Target; +use crate::data::{shelly, CheckMessage}; +use crate::target::influx; +use crate::target::influx::InfluxConfig; +use crate::WriteType; use anyhow::Result; +use data::{CoverData, SwitchData}; use influxdb::{Timestamp, WriteQuery}; use paho_mqtt::Message; use regex::Regex; -use serde::{Deserialize, Serialize}; - -use crate::data::{shelly, CheckMessage}; -use crate::WriteType; +use serde::Deserialize; +use std::thread::JoinHandle; pub trait Timestamped { fn timestamp(&self) -> Option; @@ -20,79 +25,6 @@ pub trait Typenamed { fn type_name(&self) -> &str; } -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct SwitchData { - pub(crate) output: bool, - #[serde(rename = "apower")] - pub(crate) power: Option, - pub(crate) voltage: Option, - pub(crate) current: Option, - #[serde(rename = "aenergy")] - pub(crate) energy: EnergyData, - pub(crate) temperature: TemperatureData, -} - -impl Timestamped for SwitchData { - fn timestamp(&self) -> Option { - self.energy.minute_ts - } -} - -impl Typenamed for SwitchData { - fn type_name(&self) -> &str { - return "switch"; - } -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct CoverData { - #[serde(rename = "current_pos")] - pub(crate) position: Option, - #[serde(rename = "apower")] - pub(crate) power: Option, - pub(crate) voltage: Option, - pub(crate) current: Option, - #[serde(rename = "aenergy")] - pub(crate) energy: EnergyData, - pub(crate) temperature: TemperatureData, -} - -impl Timestamped for CoverData { - fn timestamp(&self) -> Option { - self.energy.minute_ts - } -} - -impl Typenamed for CoverData { - fn type_name(&self) -> &str { - return "cover"; - } -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct EnergyData { - pub(crate) total: f32, - pub(crate) minute_ts: Option, -} - -impl fmt::Debug for EnergyData { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{} Wh", self.total) - } -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct TemperatureData { - #[serde(rename = "tC")] - pub(crate) t_celsius: f32, -} - -impl fmt::Debug for TemperatureData { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{} °C", self.t_celsius) - } -} - pub struct ShellyLogger { txs: Vec>, } @@ -107,7 +39,9 @@ pub fn parse<'a, T: Deserialize<'a> + Clone>(msg: &'a Message) -> Result { Ok(serde_json::from_slice::(msg.payload())?) } -const SWITCH_FIELDS: &[(&str, fn(data: &SwitchData) -> Option, &str)] = &[ +type WriteTypeMapper = fn(&T) -> Option; + +const SWITCH_FIELDS: &[(&str, WriteTypeMapper, &str)] = &[ ( "output", |data: &SwitchData| Some(WriteType::Int(data.output as i32)), @@ -115,17 +49,17 @@ const SWITCH_FIELDS: &[(&str, fn(data: &SwitchData) -> Option, &str)] ), ( "power", - |data: &SwitchData| data.power.map(|value| WriteType::Float(value)), + |data: &SwitchData| data.power.map(WriteType::Float), "W", ), ( "current", - |data: &SwitchData| data.current.map(|value| WriteType::Float(value)), + |data: &SwitchData| data.current.map(WriteType::Float), "A", ), ( "voltage", - |data: &SwitchData| data.voltage.map(|value| WriteType::Float(value)), + |data: &SwitchData| data.voltage.map(WriteType::Float), "V", ), ( @@ -140,25 +74,25 @@ const SWITCH_FIELDS: &[(&str, fn(data: &SwitchData) -> Option, &str)] ), ]; -const COVER_FIELDS: &[(&str, fn(data: &CoverData) -> Option, &str)] = &[ +const COVER_FIELDS: &[(&str, WriteTypeMapper, &str)] = &[ ( "position", - |data: &CoverData| data.position.map(|value| WriteType::Int(value)), + |data: &CoverData| data.position.map(WriteType::Int), "%", ), ( "power", - |data: &CoverData| data.power.map(|value| WriteType::Float(value)), + |data: &CoverData| data.power.map(WriteType::Float), "W", ), ( "current", - |data: &CoverData| data.current.map(|value| WriteType::Float(value)), + |data: &CoverData| data.current.map(WriteType::Float), "A", ), ( "voltage", - |data: &CoverData| data.voltage.map(|value| WriteType::Float(value)), + |data: &CoverData| data.voltage.map(WriteType::Float), "V", ), ( @@ -173,8 +107,10 @@ const COVER_FIELDS: &[(&str, fn(data: &CoverData) -> Option, &str)] = ), ]; -static SWITCH_REGEX: LazyLock Regex> = LazyLock::new(|| Regex::new("/status/switch:.").unwrap()); -static COVER_REGEX: LazyLock Regex> = LazyLock::new(|| Regex::new("/status/cover:.").unwrap()); +static SWITCH_REGEX: LazyLock Regex> = + LazyLock::new(|| Regex::new("/status/switch:.").unwrap()); +static COVER_REGEX: LazyLock Regex> = + LazyLock::new(|| Regex::new("/status/cover:.").unwrap()); impl CheckMessage for ShellyLogger { fn check_message(&mut self, msg: &Message) { @@ -190,11 +126,11 @@ impl CheckMessage for ShellyLogger { fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped + Typenamed>( msg: &'a Message, txs: &Vec>, - fields: &[(&str, fn(&T) -> Option, &str)], + fields: &[(&str, WriteTypeMapper, &str)], ) { let location = msg.topic().split("/").nth(1).unwrap(); let channel = msg.topic().split(":").last().unwrap(); - let result: Option = shelly::parse(&msg).unwrap(); + let result: Option = shelly::parse(msg).unwrap(); if let Some(data) = result { println!("Shelly {}:{}: {:?}", location, channel, data); @@ -241,26 +177,43 @@ mod tests { let mut logger = ShellyLogger::new(txs); - let message = Message::new("shellies/loo-fan/status/switch:1", "{\"id\":0, \"source\":\"timer\", \"output\":false, \"apower\":0.0, \"voltage\":226.5, \"current\":3.1, \"aenergy\":{\"total\":1094.865,\"by_minute\":[0.000,0.000,0.000],\"minute_ts\":1703415907},\"temperature\":{\"tC\":36.4, \"tF\":97.5}}", QOS_1); + let message = Message::new( + "shellies/loo-fan/status/switch:1", + "{\"id\":0, \"source\":\"timer\", \"output\":false, \ + \"apower\":0.0, \"voltage\":226.5, \"current\":3.1, \ + \"aenergy\":{\"total\":1094.865,\"by_minute\":[0.000,0.000,0.000],\ + \"minute_ts\":1703415907},\"temperature\":{\"tC\":36.4, \"tF\":97.5}}", + QOS_1, + ); logger.check_message(&message); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i ")); + assert!(result.starts_with( + "output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 ")); + assert!(result.starts_with( + "power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999")); + assert!(result.starts_with( + "current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999" + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 ")); + assert!(result.starts_with( + "voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 " + )); let result = rx.recv()?.build()?.get(); assert!(result.starts_with("total_energy,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=Wh value=1094.86")); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40")); + assert!(result.starts_with( + "temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40" + )); let result = rx.recv_timeout(Duration::from_micros(100)); assert!(result.is_err()); @@ -274,17 +227,29 @@ mod tests { let mut logger = ShellyLogger::new(txs); - let message = Message::new("shellies/bedroom-curtain/status/cover:0", "{\"id\":0, \"source\":\"limit_switch\", \"state\":\"open\",\"apower\":0.0,\"voltage\":231.7,\"current\":0.500,\"pf\":0.00,\"freq\":50.0,\"aenergy\":{\"total\":3.143,\"by_minute\":[0.000,0.000,97.712],\"minute_ts\":1703414519},\"temperature\":{\"tC\":30.7, \"tF\":87.3},\"pos_control\":true,\"last_direction\":\"open\",\"current_pos\":100}", QOS_1); + let message = Message::new( + "shellies/bedroom-curtain/status/cover:0", + "{\"id\":0, \"source\":\"limit_switch\", \"state\":\"open\",\ + \"apower\":0.0,\"voltage\":231.7,\"current\":0.500,\"pf\":0.00,\"freq\":50.0,\ + \"aenergy\":{\"total\":3.143,\"by_minute\":[0.000,0.000,97.712],\ + \"minute_ts\":1703414519},\"temperature\":{\"tC\":30.7, \"tF\":87.3},\ + \"pos_control\":true,\"last_direction\":\"open\",\"current_pos\":100}", + QOS_1, + ); logger.check_message(&message); let result = rx.recv()?.build()?.get(); assert!(result.starts_with("position,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=% value=100i ")); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 ")); + assert!(result.starts_with( + "power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 ")); + assert!(result.starts_with( + "current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 " + )); let result = rx.recv()?.build()?.get(); assert!(result.starts_with("voltage,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=V value=231.6999")); @@ -352,3 +317,29 @@ mod tests { Ok(()) } } + +pub fn create_logger(targets: Vec) -> (Arc>, Vec>) { + let mut txs: Vec> = Vec::new(); + let mut handles: Vec> = Vec::new(); + + for target in targets { + let (tx, handle) = match target { + Target::InfluxDB { + url, + database, + user, + password, + } => influx::spawn_influxdb_writer( + InfluxConfig::new(url, database, user, password), + std::convert::identity, + ), + Target::Postgresql { .. } => { + panic!("Postgresql not supported for shelly"); + } + }; + txs.push(tx); + handles.push(handle); + } + + (Arc::new(Mutex::new(ShellyLogger::new(txs))), handles) +} diff --git a/src/main.rs b/src/main.rs index 26c5412..a41c023 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,27 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::mpsc::SyncSender; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::{fs, process, time::Duration}; -use crate::config::{SourceType, Target}; -use crate::data::klimalogger::SensorLogger; -use crate::data::opendtu::OpenDTULogger; -use crate::data::shelly::ShellyLogger; +use crate::config::SourceType; use crate::data::CheckMessage; -use crate::target::influx::InfluxConfig; -use crate::target::postgres::PostgresConfig; use chrono::{DateTime, Utc}; +use data::{klimalogger, opendtu, shelly}; use futures::{executor::block_on, stream::StreamExt}; -use influxdb::{Timestamp, WriteQuery}; use paho_mqtt as mqtt; -use paho_mqtt::QOS_1; -use target::influx; - +use paho_mqtt::{AsyncClient, QOS_1}; mod config; mod data; mod target; #[derive(Debug, Clone)] -struct SensorReading { - measurement: String, - time: DateTime, - location: String, - sensor: String, - value: f32, +pub struct SensorReading { + pub measurement: String, + pub time: DateTime, + pub location: String, + pub sensor: String, + pub value: f32, } pub enum WriteType { @@ -54,9 +46,9 @@ fn main() { for source in config.sources { let (logger, mut source_handles) = match source.source_type { - SourceType::Shelly => create_shelly_logger(source.targets), - SourceType::Sensor => create_sensor_logger(source.targets), - SourceType::OpenDTU => create_opendtu_logger(source.targets), + SourceType::Shelly => shelly::create_logger(source.targets), + SourceType::Sensor => klimalogger::create_logger(source.targets), + SourceType::OpenDTU => opendtu::create_logger(source.targets), }; handler_map.insert(source.prefix.clone(), logger); handles.append(&mut source_handles); @@ -65,20 +57,7 @@ fn main() { qoss.push(QOS_1); } - let mqtt_url = config.mqtt_url; - let mqtt_client_id = config.mqtt_client_id; - - println!("Connecting to the MQTT server at '{}'...", mqtt_url); - - let create_opts = mqtt::CreateOptionsBuilder::new_v3() - .server_uri(mqtt_url) - .client_id(mqtt_client_id) - .finalize(); - - let mut mqtt_client = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); - process::exit(1); - }); + let mut mqtt_client = create_mqtt_client(config.mqtt_url, config.mqtt_client_id); if let Err(err) = block_on(async { // Get message stream before connecting. @@ -132,104 +111,16 @@ fn main() { } } -fn create_shelly_logger( - targets: Vec, -) -> (Arc>, Vec>) { - let mut txs: Vec> = Vec::new(); - let mut handles: Vec> = Vec::new(); - - for target in targets { - let (tx, handle) = match target { - Target::InfluxDB { - url, - database, - user, - password, - } => influx::spawn_influxdb_writer( - InfluxConfig::new(url, database, user, password), - std::convert::identity, - ), - Target::Postgresql { .. } => { - panic!("Postgresql not supported for shelly"); - } - }; - txs.push(tx); - handles.push(handle); - } - - (Arc::new(Mutex::new(ShellyLogger::new(txs))), handles) -} - -fn create_sensor_logger( - targets: Vec, -) -> (Arc>, Vec>) { - let mut txs: Vec> = Vec::new(); - let mut handles: Vec> = Vec::new(); - - for target in targets { - let (tx, handle) = match target { - Target::InfluxDB { - url, - database, - user, - password, - } => { - fn mapper(result: SensorReading) -> WriteQuery { - let timestamp = Timestamp::Seconds(result.time.timestamp() as u128); - WriteQuery::new(timestamp, result.measurement.to_string()) - .add_tag("location", result.location.to_string()) - .add_tag("sensor", result.sensor.to_string()) - .add_field("value", result.value) - } - - influx::spawn_influxdb_writer( - InfluxConfig::new(url, database, user, password), - mapper, - ) - } - Target::Postgresql { - host, - port, - user, - password, - database, - } => target::postgres::spawn_postgres_writer(PostgresConfig::new( - host, port, user, password, database, - )), - }; - txs.push(tx); - handles.push(handle); - } - - (Arc::new(Mutex::new(SensorLogger::new(txs))), handles) -} - -fn create_opendtu_logger( - targets: Vec, -) -> (Arc>, Vec>) { - let mut txs: Vec> = Vec::new(); - let mut handles: Vec> = Vec::new(); - - for target in targets { - let (tx, handle) = match target { - Target::InfluxDB { - url, - database, - user, - password, - } => influx::spawn_influxdb_writer( - InfluxConfig::new(url, database, user, password), - std::convert::identity, - ), - Target::Postgresql { .. } => { - panic!("Postgresql not supported for opendtu"); - } - }; - txs.push(tx); - handles.push(handle); - } +fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> AsyncClient { + println!("Connecting to the MQTT server at '{}'...", mqtt_url); - let logger = OpenDTULogger::new(txs); + let create_opts = mqtt::CreateOptionsBuilder::new_v3() + .server_uri(mqtt_url) + .client_id(mqtt_client_id) + .finalize(); - (Arc::new(Mutex::new(logger)), handles) + mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }) } diff --git a/src/target/influx/mod.rs b/src/target/influx/mod.rs index 7450757..fd522dc 100644 --- a/src/target/influx/mod.rs +++ b/src/target/influx/mod.rs @@ -1,5 +1,9 @@ +use async_trait::async_trait; +//use anyhow::Result; use futures::executor::block_on; use influxdb::{Client, WriteQuery}; +#[cfg(test)] +use mockall::automock; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; @@ -27,26 +31,53 @@ impl InfluxConfig { } } -pub fn influxdb_writer( +struct DefaultInfluxClient { + client: Client, +} + +impl DefaultInfluxClient { + fn new(client: Client) -> Box { + Box::new(DefaultInfluxClient { client }) + } +} + +#[cfg_attr(test, automock)] +#[async_trait] +trait InfluxClient: Sync + Send { + async fn query(&self, write_query: WriteQuery) -> Result; +} + +#[async_trait] +impl InfluxClient for DefaultInfluxClient { + async fn query(&self, write_query: WriteQuery) -> Result { + self.client.query(write_query).await + } +} + +fn create_influxdb_client(influx_config: &InfluxConfig) -> anyhow::Result> { + let mut influx_client = Client::new(influx_config.url.clone(), influx_config.database.clone()); + + influx_client = if let (Some(user), Some(password)) = + (influx_config.user.clone(), influx_config.password.clone()) + { + influx_client.with_auth(user, password) + } else { + influx_client + }; + + Ok(DefaultInfluxClient::new(influx_client)) +} + +fn influxdb_writer( rx: Receiver, + influx_client: Box, influx_config: InfluxConfig, query_mapper: fn(T) -> WriteQuery, ) { - let influx_url = influx_config.url.clone(); - let influx_database = influx_config.database.clone(); - - let mut influx_client = Client::new(influx_config.url, influx_config.database); - influx_client = - if let (Some(user), Some(password)) = (influx_config.user, influx_config.password) { - influx_client.with_auth(user, password) - } else { - influx_client - }; - block_on(async move { println!( "starting influx writer async {} {}", - &influx_url, &influx_database + &influx_config.url, &influx_config.database ); loop { @@ -65,7 +96,7 @@ pub fn influxdb_writer( Err(error) => { panic!( "#### Error writing to influx: {} {}: {:?}", - &influx_url, &influx_database, error + &influx_config.url, &influx_config.database, error ); } } @@ -77,8 +108,18 @@ pub fn influxdb_writer( } pub fn spawn_influxdb_writer( - config: InfluxConfig, - mapper: fn(T) -> WriteQuery, + influx_config: InfluxConfig, + query_mapper: fn(T) -> WriteQuery, +) -> (SyncSender, JoinHandle<()>) { + let influx_client = + create_influxdb_client(&influx_config).expect("could not create influxdb client"); + spawn_influxdb_writer_internal(influx_client, influx_config, query_mapper) +} + +fn spawn_influxdb_writer_internal( + influx_client: Box, + influx_config: InfluxConfig, + query_mapper: fn(T) -> WriteQuery, ) -> (SyncSender, JoinHandle<()>) { let (tx, rx) = sync_channel(100); @@ -87,10 +128,58 @@ pub fn spawn_influxdb_writer( thread::spawn(move || { println!( "starting influx writer {} {}", - &config.url, &config.database + &influx_config.url, &influx_config.database ); - influxdb_writer(rx, config, mapper); + influxdb_writer(rx, influx_client, influx_config, query_mapper) }), ) } + +#[cfg(test)] +mod tests { + use super::*; + use influxdb::Timestamp::Seconds; + + // A mock `WriteQuery` for testing purposes + fn mock_write_query(data: String) -> WriteQuery { + println!("mock write query {}", data); + + assert_eq!(data, "test_data"); + + let current_timestamp = Seconds(chrono::Utc::now().timestamp() as u128); + WriteQuery::new(current_timestamp, "measurement") + .add_field("field", influxdb::Type::Float(1.23)) + } + + // + #[test] + fn test_influxdb_writer_internal() -> anyhow::Result<()> { + let influx_config = InfluxConfig::new( + "http://localhost:8086".to_string(), + "test_db".to_string(), + Some("user".to_string()), + Some("password".to_string()), + ); + + let mut mock_client = Box::new(MockInfluxClient::new()); + mock_client + .expect_query() + .times(1) + .returning(|_| Ok("Success".to_string())); + + // Run the `influxdb_writer` function + let (tx, join_handle) = + spawn_influxdb_writer_internal(mock_client, influx_config, mock_write_query); + + // Send a test query + tx.send("test_data".to_string()).unwrap(); + + // Close the channel + drop(tx); + + join_handle.join().expect("stopped writer"); + + Ok(()) + } +} diff --git a/src/target/postgres/mod.rs b/src/target/postgres/mod.rs index dc3857c..4779d75 100644 --- a/src/target/postgres/mod.rs +++ b/src/target/postgres/mod.rs @@ -1,6 +1,10 @@ use crate::SensorReading; use futures::executor::block_on; -use postgres::{Config, NoTls}; +#[cfg(test)] +use mockall::automock; +use postgres::types::ToSql; +use postgres::Client; +use postgres::{Error, NoTls}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; @@ -31,11 +35,34 @@ impl PostgresConfig { } } -fn start_postgres_writer(rx: Receiver, config: Config) { - let mut client = config - .connect(NoTls) - .expect("failed to connect to postgres"); +#[cfg_attr(test, automock)] +pub trait PostgresClient: Send { + fn execute<'a>( + &mut self, + query: &String, + params: &'a [&'a (dyn ToSql + Sync)], + ) -> Result; +} + +struct DefaultPostgresClient { + client: Client, +} + +impl DefaultPostgresClient { + fn new(client: Client) -> Self { + DefaultPostgresClient { client } + } +} + +impl DefaultPostgresClient {} +impl PostgresClient for DefaultPostgresClient { + fn execute(&mut self, query: &String, params: &[&(dyn ToSql + Sync)]) -> Result { + self.client.execute(query, params) + } +} + +fn start_postgres_writer(rx: Receiver, mut client: Box) { block_on(async move { println!("starting postgres writer async"); @@ -77,21 +104,71 @@ fn start_postgres_writer(rx: Receiver, config: Config) { pub fn spawn_postgres_writer( config: PostgresConfig, ) -> (SyncSender, JoinHandle<()>) { - let (tx, rx) = sync_channel(100); + let client = create_postgres_client(&config); + spawn_postgres_writer_internal(client) +} - let mut db_config = postgres::Config::new(); - let _ = db_config +fn create_postgres_client(config: &PostgresConfig) -> Box { + let client = postgres::Config::new() .host(&config.host) .port(config.port) .user(&config.username) - .password(config.password) - .dbname(&config.database); + .password(&config.password) + .dbname(&config.database) + .connect(NoTls) + .expect("failed to connect to Postgres database"); + Box::new(DefaultPostgresClient::new(client)) +} + +pub fn spawn_postgres_writer_internal( + client: Box, +) -> (SyncSender, JoinHandle<()>) { + let (tx, rx) = sync_channel(100); ( tx, thread::spawn(move || { println!("starting postgres writer"); - start_postgres_writer(rx, db_config); + start_postgres_writer(rx, client); }), ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_postgres_writer_internal() -> anyhow::Result<()> { + let sensor_reading = SensorReading { + measurement: "measurement".to_string(), + time: chrono::Utc::now(), + location: "location".to_string(), + sensor: "sensor".to_string(), + value: 123.4, + }; + + let sensor_reading_duplicate = sensor_reading.clone(); + + let mut mock_client = Box::new(MockPostgresClient::new()); + mock_client.expect_execute() + .times(1) + .withf(move |query, parameters| { + let expected_parameters: [&dyn ToSql; 4] = [&sensor_reading_duplicate.time, &sensor_reading_duplicate.location, &sensor_reading_duplicate.sensor, &sensor_reading_duplicate.value]; + query == "insert into \"measurement\" (time, location, sensor, value) values ($1, $2, $3, $4);" || + parameters.len() == expected_parameters.len() && + parameters.iter().zip(expected_parameters.iter()).all(|(a, b)| format!("{a:?}") == format!("{b:?}")) + }) + .returning(|_, _| Ok(123)); + + let (tx, join_handle) = spawn_postgres_writer_internal(mock_client); + + tx.send(sensor_reading).unwrap(); + + drop(tx); + + let _ = join_handle.join(); + + Ok(()) + } +}