Skip to content

Commit

Permalink
add debug and openmqttgateway data
Browse files Browse the repository at this point in the history
  • Loading branch information
wuan committed Nov 27, 2024
1 parent 98e4d72 commit 687ac4d
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 30 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ sources:
prefix: "sensors"
targets:
- type: "influxdb"
host: "<influx host>"
port: 8086
url: "http://<host>:8086"
database: "sensors"
- type: "postgresql"
host: "<postgres host>"
Expand Down
14 changes: 11 additions & 3 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ pub enum SourceType {
Sensor,
#[serde(rename = "opendtu")]
OpenDTU,
#[serde(rename = "openmqttgateway")]
OpenMqttGateway,
#[serde(rename = "debug")]
Debug,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
Expand All @@ -16,7 +20,7 @@ pub struct Source {
#[serde(rename = "type")]
pub(crate) source_type: SourceType,
pub(crate) prefix: String,
pub(crate) targets: Vec<Target>,
pub(crate) targets: Option<Vec<Target>>,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
Expand All @@ -37,6 +41,9 @@ pub enum Target {
password: String,
database: String,
},
// #[serde(rename = "debug")]
// Debug {
// },
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -126,8 +133,9 @@ mod tests {
assert_eq!(result.source_type, SourceType::Sensor);
assert_eq!(result.prefix, "bar");

assert_eq!(result.targets.len(), 1);
let target = &result.targets[0];
let targets = result.targets.unwrap();
assert_eq!(targets.len(), 1);
let target = &targets[0];

if let Target::InfluxDB { url, database, .. } = target {
assert_eq!(url, "baz");
Expand Down
48 changes: 48 additions & 0 deletions src/data/debug/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::fmt;

use crate::config::Target;
use crate::data::CheckMessage;
use log::{info, warn};
use paho_mqtt::Message;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

#[derive(Serialize, Deserialize, Clone)]
pub struct Data {
#[serde(rename = "time")]
pub(crate) timestamp: i32,
pub(crate) value: f32,
pub(crate) sensor: String,
}

impl fmt::Debug for Data {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} (@{}, {})", self.value, self.timestamp, self.sensor)
}
}

pub struct DebugLogger {}

impl DebugLogger {
pub(crate) fn new() -> Self {
DebugLogger {}
}
}

impl CheckMessage for DebugLogger {
fn check_message(&mut self, msg: &Message) {
let topic = msg.topic();
let payload = msg.payload_str();

info!("'{}' with {}", topic, payload);
}
}

pub fn create_logger(targets: Vec<Target>) -> (Arc<Mutex<dyn CheckMessage>>, Vec<JoinHandle<()>>) {
if targets.len() > 0 {
warn!("debug type has targets defined: {:?}", &targets);
}

(Arc::new(Mutex::new(DebugLogger::new())), Vec::new())
}
2 changes: 1 addition & 1 deletion src/data/klimalogger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use crate::{target, SensorReading};
use anyhow::Result;
use chrono::{DateTime, Utc};
use influxdb::{Timestamp, WriteQuery};
use log::{debug, warn};
use paho_mqtt::Message;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use log::{debug, warn};

#[derive(Serialize, Deserialize, Clone)]
pub struct Data {
Expand Down
2 changes: 2 additions & 0 deletions src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use paho_mqtt::Message;
use serde::{Deserialize, Serialize};

pub(crate) mod debug;
pub(crate) mod klimalogger;
pub(crate) mod opendtu;
pub(crate) mod openmqttgateway;
pub(crate) mod shelly;

#[derive(Debug, Deserialize, Serialize)]
Expand Down
4 changes: 2 additions & 2 deletions src/data/opendtu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use anyhow::Result;
use chrono::Datelike;
use influxdb::Timestamp::Seconds;
use influxdb::WriteQuery;
use log::{debug, trace};
use paho_mqtt::Message;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use log::{debug, trace};

struct Data {
timestamp: i64,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl OpenDTUParser {
}
}
"device" => {
// ignore device global data
// ignore device global data
trace!(" device: {:}: {:?}", field, msg.payload_str())
}
"status" => {
Expand Down
161 changes: 161 additions & 0 deletions src/data/openmqttgateway/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::collections::HashMap;
use std::sync::mpsc::SyncSender;

use crate::config::Target;
use crate::data::CheckMessage;
use crate::target::influx;
use crate::target::influx::InfluxConfig;
use anyhow::Result;
use influxdb::Timestamp::Seconds;
use influxdb::WriteQuery;
use log::warn;
use paho_mqtt::Message;
use serde_json::{Map, Number, Value};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

struct Data {
fields: HashMap<String, Number>,
tags: HashMap<String, String>,
}

pub struct OpenMqttGatewayLogger {
txs: Vec<SyncSender<WriteQuery>>,
parser: OpenMqttGatewayParser,
}

impl OpenMqttGatewayLogger {
pub(crate) fn new(txs: Vec<SyncSender<WriteQuery>>) -> Self {
OpenMqttGatewayLogger {
txs,
parser: OpenMqttGatewayParser::new(),
}
}
}

impl CheckMessage for OpenMqttGatewayLogger {
fn check_message(&mut self, msg: &Message) {
let data = self.parser.parse(msg).unwrap();
if let Some(data) = data {
let timestamp = chrono::offset::Utc::now();

let mut write_query = WriteQuery::new(Seconds(timestamp.timestamp() as u128), "btle");
for (key, value) in data.fields {
write_query = write_query.add_field(key, value.as_f64());
}
for (key, value) in data.tags {
write_query = write_query.add_tag(key, value);
}
for tx in &self.txs {
tx.send(write_query.clone()).expect("failed to send");
}
}
}
}

fn parse_json(payload: &str) -> Result<Map<String, Value>> {
let parsed: Value = serde_json::from_str(payload)?;
let obj: Map<String, Value> = parsed.as_object().unwrap().clone();
Ok(obj)
}

struct OpenMqttGatewayParser {}

impl OpenMqttGatewayParser {
pub fn new() -> Self {
OpenMqttGatewayParser {}
}

fn parse(&mut self, msg: &Message) -> Result<Option<Data>> {
let mut data: Option<Data> = None;

let mut split = msg.topic().split("/");
let _ = split.next();
let gateway_id = split.next();
let channel = split.next();
let device_id = split.next();

if let (Some(gateway_id), Some(channel), Some(device_id)) = (gateway_id, channel, device_id)
{
if channel == "BTtoMQTT" {
let mut result = parse_json(&msg.payload_str())?;

let _ = result.remove("id");

let mut fields = HashMap::new();
let mut tags = HashMap::new();
tags.insert(String::from("device"), String::from(device_id));
tags.insert(String::from("gateway"), String::from(gateway_id));
for (key, value) in result {
match value {
Value::Number(value) => {
fields.insert(key, value);
}
Value::String(value) => {
tags.insert(key, value);
}
_ => {
warn!("unhandled entry {}: {:?}", key, value);
}
}
}
data = Some(Data { fields, tags });
}
}
Ok(data)
}
}

#[cfg(test)]
mod tests {
use paho_mqtt::QOS_1;

use super::*;

#[test]
fn test_parse() -> Result<()> {
let mut parser = OpenMqttGatewayParser::new();
let message = Message::new("blegateway/D12331654712/BTtoMQTT/283146C17616", "{\"id\":\"28:31:46:C1:76:16\",\"name\":\"DHS\",\"rssi\":-92,\"brand\":\"Oras\",\"model\":\"Hydractiva Digital\",\"model_id\":\"ADHS\",\"type\":\"ENRG\",\"session\":67,\"seconds\":115,\"litres\":9.1,\"tempc\":12,\"tempf\":53.6,\"energy\":0.03}", QOS_1);
let result = parser.parse(&message)?;

assert!(result.is_some());
let data = result.unwrap();
let fields = data.fields;
assert_eq!(fields.get("rssi").unwrap().as_f64().unwrap(), -92f64);
assert_eq!(fields.get("seconds").unwrap().as_f64().unwrap(), 115f64);
let tags = data.tags;
assert_eq!(tags.get("device").unwrap(), "283146C17616");
assert_eq!(tags.get("gateway").unwrap(), "D12331654712");
assert_eq!(tags.get("name").unwrap(), "DHS");

Ok(())
}
}

pub fn create_logger(targets: Vec<Target>) -> (Arc<Mutex<dyn CheckMessage>>, Vec<JoinHandle<()>>) {
let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();

for target in targets {
let (tx, handle) = match target {
Target::InfluxDB {
url,
database,
user,
password,
} => influx::spawn_influxdb_writer(
InfluxConfig::new(url, database, user, password),
std::convert::identity,
),
Target::Postgresql { .. } => {
panic!("Postgresql not supported for open");
}
};
txs.push(tx);
handles.push(handle);
}

let logger = OpenMqttGatewayLogger::new(txs);

(Arc::new(Mutex::new(logger)), handles)
}
2 changes: 1 addition & 1 deletion src/data/shelly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::WriteType;
use anyhow::Result;
use data::{CoverData, SwitchData};
use influxdb::{Timestamp, WriteQuery};
use log::{debug, warn};
use paho_mqtt::Message;
use regex::Regex;
use serde::Deserialize;
use std::thread::JoinHandle;
use log::{debug, warn};

pub trait Timestamped {
fn timestamp(&self) -> Option<i64>;
Expand Down
37 changes: 21 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::{fs, time::Duration};
use std::path::Path;
use std::process::exit;
use crate::config::SourceType;
use crate::data::CheckMessage;
use crate::data::{debug, openmqttgateway, CheckMessage};
use chrono::{DateTime, Utc};
use data::{klimalogger, opendtu, shelly};
use futures::{executor::block_on, stream::StreamExt};
use log::{debug, error, info, warn};
use paho_mqtt as mqtt;
use paho_mqtt::QOS_1;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::{env, fs, time::Duration};

mod config;
mod data;
mod target;
mod source;
mod target;

#[derive(Debug, Clone)]
pub struct SensorReading {
Expand All @@ -34,11 +34,14 @@ pub enum WriteType {
}

fn main() {
if env::var("RUST_LOG").is_err() {
env::set_var("RUST_LOG", "info")
}
// Initialize the logger from the environment
env_logger::init();

let config_file_path = determine_config_file_path();

let config_string = fs::read_to_string(config_file_path).expect("failed to read config file");
let config: config::Config =
serde_yml::from_str(&config_string).expect("failed to parse config file");
Expand All @@ -51,10 +54,13 @@ fn main() {
let mut qoss: Vec<i32> = Vec::new();

for source in config.sources {
let targets = source.targets.unwrap_or_default();
let (logger, mut source_handles) = match source.source_type {
SourceType::Shelly => shelly::create_logger(source.targets),
SourceType::Sensor => klimalogger::create_logger(source.targets),
SourceType::OpenDTU => opendtu::create_logger(source.targets),
SourceType::Shelly => shelly::create_logger(targets),
SourceType::Sensor => klimalogger::create_logger(targets),
SourceType::OpenDTU => opendtu::create_logger(targets),
SourceType::OpenMqttGateway => openmqttgateway::create_logger(targets),
SourceType::Debug => debug::create_logger(targets),
};
handler_map.insert(source.prefix.clone(), logger);
handles.append(&mut source_handles);
Expand Down Expand Up @@ -136,7 +142,6 @@ fn determine_config_file_path() -> String {
error!("ERROR: no configuration file found");
exit(10);
}

config_file_path.unwrap()
}

Loading

0 comments on commit 687ac4d

Please sign in to comment.