From 687ac4da61957927ac78f289b689600db9633301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Wed, 27 Nov 2024 22:14:28 +0100 Subject: [PATCH] add debug and openmqttgateway data --- README.md | 3 +- src/config/mod.rs | 14 ++- src/data/debug/mod.rs | 48 ++++++++++ src/data/klimalogger/mod.rs | 2 +- src/data/mod.rs | 2 + src/data/opendtu/mod.rs | 4 +- src/data/openmqttgateway/mod.rs | 161 ++++++++++++++++++++++++++++++++ src/data/shelly/mod.rs | 2 +- src/main.rs | 37 ++++---- src/source/mod.rs | 2 +- src/source/mqtt/mod.rs | 4 +- src/target/debug/mod.rs | 54 +++++++++++ src/target/influx/mod.rs | 2 +- src/target/postgres/mod.rs | 2 +- 14 files changed, 307 insertions(+), 30 deletions(-) create mode 100644 src/data/debug/mod.rs create mode 100644 src/data/openmqttgateway/mod.rs create mode 100644 src/target/debug/mod.rs diff --git a/README.md b/README.md index af310a3..0ef0e89 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,7 @@ sources: prefix: "sensors" targets: - type: "influxdb" - host: "" - port: 8086 + url: "http://:8086" database: "sensors" - type: "postgresql" host: "" diff --git a/src/config/mod.rs b/src/config/mod.rs index 4da6e2d..b869689 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -8,6 +8,10 @@ pub enum SourceType { Sensor, #[serde(rename = "opendtu")] OpenDTU, + #[serde(rename = "openmqttgateway")] + OpenMqttGateway, + #[serde(rename = "debug")] + Debug, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -16,7 +20,7 @@ pub struct Source { #[serde(rename = "type")] pub(crate) source_type: SourceType, pub(crate) prefix: String, - pub(crate) targets: Vec, + pub(crate) targets: Option>, } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] @@ -37,6 +41,9 @@ pub enum Target { password: String, database: String, }, + // #[serde(rename = "debug")] + // Debug { + // }, } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] @@ -126,8 +133,9 @@ mod tests { assert_eq!(result.source_type, SourceType::Sensor); assert_eq!(result.prefix, "bar"); - assert_eq!(result.targets.len(), 1); - let target = &result.targets[0]; + let targets = result.targets.unwrap(); + assert_eq!(targets.len(), 1); + let target = &targets[0]; if let Target::InfluxDB { url, database, .. } = target { assert_eq!(url, "baz"); diff --git a/src/data/debug/mod.rs b/src/data/debug/mod.rs new file mode 100644 index 0000000..929e8d8 --- /dev/null +++ b/src/data/debug/mod.rs @@ -0,0 +1,48 @@ +use std::fmt; + +use crate::config::Target; +use crate::data::CheckMessage; +use log::{info, warn}; +use paho_mqtt::Message; +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; + +#[derive(Serialize, Deserialize, Clone)] +pub struct Data { + #[serde(rename = "time")] + pub(crate) timestamp: i32, + pub(crate) value: f32, + pub(crate) sensor: String, +} + +impl fmt::Debug for Data { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} (@{}, {})", self.value, self.timestamp, self.sensor) + } +} + +pub struct DebugLogger {} + +impl DebugLogger { + pub(crate) fn new() -> Self { + DebugLogger {} + } +} + +impl CheckMessage for DebugLogger { + fn check_message(&mut self, msg: &Message) { + let topic = msg.topic(); + let payload = msg.payload_str(); + + info!("'{}' with {}", topic, payload); + } +} + +pub fn create_logger(targets: Vec) -> (Arc>, Vec>) { + if targets.len() > 0 { + warn!("debug type has targets defined: {:?}", &targets); + } + + (Arc::new(Mutex::new(DebugLogger::new())), Vec::new()) +} diff --git a/src/data/klimalogger/mod.rs b/src/data/klimalogger/mod.rs index e049a00..36bb821 100644 --- a/src/data/klimalogger/mod.rs +++ b/src/data/klimalogger/mod.rs @@ -10,11 +10,11 @@ use crate::{target, SensorReading}; use anyhow::Result; use chrono::{DateTime, Utc}; use influxdb::{Timestamp, WriteQuery}; +use log::{debug, warn}; use paho_mqtt::Message; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; -use log::{debug, warn}; #[derive(Serialize, Deserialize, Clone)] pub struct Data { diff --git a/src/data/mod.rs b/src/data/mod.rs index 92ecee7..16a91d3 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,8 +1,10 @@ use paho_mqtt::Message; use serde::{Deserialize, Serialize}; +pub(crate) mod debug; pub(crate) mod klimalogger; pub(crate) mod opendtu; +pub(crate) mod openmqttgateway; pub(crate) mod shelly; #[derive(Debug, Deserialize, Serialize)] diff --git a/src/data/opendtu/mod.rs b/src/data/opendtu/mod.rs index 84e9931..40cf284 100644 --- a/src/data/opendtu/mod.rs +++ b/src/data/opendtu/mod.rs @@ -8,10 +8,10 @@ use anyhow::Result; use chrono::Datelike; use influxdb::Timestamp::Seconds; use influxdb::WriteQuery; +use log::{debug, trace}; use paho_mqtt::Message; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; -use log::{debug, trace}; struct Data { timestamp: i64, @@ -102,7 +102,7 @@ impl OpenDTUParser { } } "device" => { - // ignore device global data + // ignore device global data trace!(" device: {:}: {:?}", field, msg.payload_str()) } "status" => { diff --git a/src/data/openmqttgateway/mod.rs b/src/data/openmqttgateway/mod.rs new file mode 100644 index 0000000..8f5d1bd --- /dev/null +++ b/src/data/openmqttgateway/mod.rs @@ -0,0 +1,161 @@ +use std::collections::HashMap; +use std::sync::mpsc::SyncSender; + +use crate::config::Target; +use crate::data::CheckMessage; +use crate::target::influx; +use crate::target::influx::InfluxConfig; +use anyhow::Result; +use influxdb::Timestamp::Seconds; +use influxdb::WriteQuery; +use log::warn; +use paho_mqtt::Message; +use serde_json::{Map, Number, Value}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; + +struct Data { + fields: HashMap, + tags: HashMap, +} + +pub struct OpenMqttGatewayLogger { + txs: Vec>, + parser: OpenMqttGatewayParser, +} + +impl OpenMqttGatewayLogger { + pub(crate) fn new(txs: Vec>) -> Self { + OpenMqttGatewayLogger { + txs, + parser: OpenMqttGatewayParser::new(), + } + } +} + +impl CheckMessage for OpenMqttGatewayLogger { + fn check_message(&mut self, msg: &Message) { + let data = self.parser.parse(msg).unwrap(); + if let Some(data) = data { + let timestamp = chrono::offset::Utc::now(); + + let mut write_query = WriteQuery::new(Seconds(timestamp.timestamp() as u128), "btle"); + for (key, value) in data.fields { + write_query = write_query.add_field(key, value.as_f64()); + } + for (key, value) in data.tags { + write_query = write_query.add_tag(key, value); + } + for tx in &self.txs { + tx.send(write_query.clone()).expect("failed to send"); + } + } + } +} + +fn parse_json(payload: &str) -> Result> { + let parsed: Value = serde_json::from_str(payload)?; + let obj: Map = parsed.as_object().unwrap().clone(); + Ok(obj) +} + +struct OpenMqttGatewayParser {} + +impl OpenMqttGatewayParser { + pub fn new() -> Self { + OpenMqttGatewayParser {} + } + + fn parse(&mut self, msg: &Message) -> Result> { + let mut data: Option = None; + + let mut split = msg.topic().split("/"); + let _ = split.next(); + let gateway_id = split.next(); + let channel = split.next(); + let device_id = split.next(); + + if let (Some(gateway_id), Some(channel), Some(device_id)) = (gateway_id, channel, device_id) + { + if channel == "BTtoMQTT" { + let mut result = parse_json(&msg.payload_str())?; + + let _ = result.remove("id"); + + let mut fields = HashMap::new(); + let mut tags = HashMap::new(); + tags.insert(String::from("device"), String::from(device_id)); + tags.insert(String::from("gateway"), String::from(gateway_id)); + for (key, value) in result { + match value { + Value::Number(value) => { + fields.insert(key, value); + } + Value::String(value) => { + tags.insert(key, value); + } + _ => { + warn!("unhandled entry {}: {:?}", key, value); + } + } + } + data = Some(Data { fields, tags }); + } + } + Ok(data) + } +} + +#[cfg(test)] +mod tests { + use paho_mqtt::QOS_1; + + use super::*; + + #[test] + fn test_parse() -> Result<()> { + let mut parser = OpenMqttGatewayParser::new(); + let message = Message::new("blegateway/D12331654712/BTtoMQTT/283146C17616", "{\"id\":\"28:31:46:C1:76:16\",\"name\":\"DHS\",\"rssi\":-92,\"brand\":\"Oras\",\"model\":\"Hydractiva Digital\",\"model_id\":\"ADHS\",\"type\":\"ENRG\",\"session\":67,\"seconds\":115,\"litres\":9.1,\"tempc\":12,\"tempf\":53.6,\"energy\":0.03}", QOS_1); + let result = parser.parse(&message)?; + + assert!(result.is_some()); + let data = result.unwrap(); + let fields = data.fields; + assert_eq!(fields.get("rssi").unwrap().as_f64().unwrap(), -92f64); + assert_eq!(fields.get("seconds").unwrap().as_f64().unwrap(), 115f64); + let tags = data.tags; + assert_eq!(tags.get("device").unwrap(), "283146C17616"); + assert_eq!(tags.get("gateway").unwrap(), "D12331654712"); + assert_eq!(tags.get("name").unwrap(), "DHS"); + + Ok(()) + } +} + +pub fn create_logger(targets: Vec) -> (Arc>, Vec>) { + let mut txs: Vec> = Vec::new(); + let mut handles: Vec> = Vec::new(); + + for target in targets { + let (tx, handle) = match target { + Target::InfluxDB { + url, + database, + user, + password, + } => influx::spawn_influxdb_writer( + InfluxConfig::new(url, database, user, password), + std::convert::identity, + ), + Target::Postgresql { .. } => { + panic!("Postgresql not supported for open"); + } + }; + txs.push(tx); + handles.push(handle); + } + + let logger = OpenMqttGatewayLogger::new(txs); + + (Arc::new(Mutex::new(logger)), handles) +} diff --git a/src/data/shelly/mod.rs b/src/data/shelly/mod.rs index a1902ce..1269af8 100644 --- a/src/data/shelly/mod.rs +++ b/src/data/shelly/mod.rs @@ -12,11 +12,11 @@ use crate::WriteType; use anyhow::Result; use data::{CoverData, SwitchData}; use influxdb::{Timestamp, WriteQuery}; +use log::{debug, warn}; use paho_mqtt::Message; use regex::Regex; use serde::Deserialize; use std::thread::JoinHandle; -use log::{debug, warn}; pub trait Timestamped { fn timestamp(&self) -> Option; diff --git a/src/main.rs b/src/main.rs index 0f23da5..57023ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,23 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::sync::{Arc, Mutex}; -use std::thread::JoinHandle; -use std::{fs, time::Duration}; -use std::path::Path; -use std::process::exit; use crate::config::SourceType; -use crate::data::CheckMessage; +use crate::data::{debug, openmqttgateway, CheckMessage}; use chrono::{DateTime, Utc}; use data::{klimalogger, opendtu, shelly}; use futures::{executor::block_on, stream::StreamExt}; +use log::{debug, error, info, warn}; use paho_mqtt as mqtt; use paho_mqtt::QOS_1; -use log::{debug, error, info, warn}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::path::Path; +use std::process::exit; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use std::{env, fs, time::Duration}; mod config; mod data; -mod target; mod source; +mod target; #[derive(Debug, Clone)] pub struct SensorReading { @@ -34,11 +34,14 @@ pub enum WriteType { } fn main() { + if env::var("RUST_LOG").is_err() { + env::set_var("RUST_LOG", "info") + } // Initialize the logger from the environment env_logger::init(); let config_file_path = determine_config_file_path(); - + let config_string = fs::read_to_string(config_file_path).expect("failed to read config file"); let config: config::Config = serde_yml::from_str(&config_string).expect("failed to parse config file"); @@ -51,10 +54,13 @@ fn main() { let mut qoss: Vec = Vec::new(); for source in config.sources { + let targets = source.targets.unwrap_or_default(); let (logger, mut source_handles) = match source.source_type { - SourceType::Shelly => shelly::create_logger(source.targets), - SourceType::Sensor => klimalogger::create_logger(source.targets), - SourceType::OpenDTU => opendtu::create_logger(source.targets), + SourceType::Shelly => shelly::create_logger(targets), + SourceType::Sensor => klimalogger::create_logger(targets), + SourceType::OpenDTU => opendtu::create_logger(targets), + SourceType::OpenMqttGateway => openmqttgateway::create_logger(targets), + SourceType::Debug => debug::create_logger(targets), }; handler_map.insert(source.prefix.clone(), logger); handles.append(&mut source_handles); @@ -136,7 +142,6 @@ fn determine_config_file_path() -> String { error!("ERROR: no configuration file found"); exit(10); } - + config_file_path.unwrap() } - diff --git a/src/source/mod.rs b/src/source/mod.rs index 4b85404..3ddf1cc 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -1 +1 @@ -pub(crate) mod mqtt; \ No newline at end of file +pub(crate) mod mqtt; diff --git a/src/source/mqtt/mod.rs b/src/source/mqtt/mod.rs index d0bc344..023e471 100644 --- a/src/source/mqtt/mod.rs +++ b/src/source/mqtt/mod.rs @@ -1,6 +1,6 @@ +use log::{error, info}; use paho_mqtt as mqtt; use std::process; -use log::{error, info}; pub fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> mqtt::AsyncClient { info!("Connecting to the MQTT server at '{}'...", mqtt_url); @@ -14,4 +14,4 @@ pub fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> mqtt::Asy error!("Error creating the client: {:?}", e); process::exit(1); }) -} \ No newline at end of file +} diff --git a/src/target/debug/mod.rs b/src/target/debug/mod.rs new file mode 100644 index 0000000..dace936 --- /dev/null +++ b/src/target/debug/mod.rs @@ -0,0 +1,54 @@ +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::thread; +use std::thread::JoinHandle; + +pub fn spawn_debug_logger() -> (SyncSender, JoinHandle<()>) { + spawn_debug_logger_internal() +} + +fn spawn_debug_logger_internal() -> (SyncSender, JoinHandle<()>) { + let (tx, rx) = sync_channel(100); + + ( + tx, + thread::spawn(move || { + info!( + "starting influx writer {} {}", + &influx_config.url, &influx_config.database + ); + + debug_writer(rx, influx_client, influx_config, query_mapper) + }), + ) +} + +fn debug_writer(rx: Receiver) { + block_on(async move { + info!("starting debug writer async"); + + loop { + let result = rx.recv(); + let data = match result { + Ok(query) => query, + Err(error) => { + warn!("error receiving query: {:?}", error); + break; + } + }; + let query = query_mapper(data); + let result = influx_client.query(query).await; + match result { + Ok(_) => {} + Err(error) => { + panic!( + "#### Error writing to influx: {} {}: {:?}", + &influx_config.url, &influx_config.database, error + ); + } + } + } + info!("exiting influx writer async"); + }); + + info!("exiting influx writer"); +} diff --git a/src/target/influx/mod.rs b/src/target/influx/mod.rs index 7f63f1e..21021a6 100644 --- a/src/target/influx/mod.rs +++ b/src/target/influx/mod.rs @@ -2,12 +2,12 @@ use async_trait::async_trait; //use anyhow::Result; use futures::executor::block_on; use influxdb::{Client, WriteQuery}; +use log::{info, warn}; #[cfg(test)] use mockall::automock; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; -use log::{info, warn}; pub struct InfluxConfig { url: String, diff --git a/src/target/postgres/mod.rs b/src/target/postgres/mod.rs index 43376a6..314dbea 100644 --- a/src/target/postgres/mod.rs +++ b/src/target/postgres/mod.rs @@ -1,5 +1,6 @@ use crate::SensorReading; use futures::executor::block_on; +use log::{error, info, warn}; #[cfg(test)] use mockall::automock; use postgres::types::ToSql; @@ -8,7 +9,6 @@ use postgres::{Error, NoTls}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; -use log::{error, info, warn}; pub struct PostgresConfig { host: String,