diff --git a/Cargo.lock b/Cargo.lock index 90c35c5..e94825f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1396,6 +1396,7 @@ dependencies = [ "env_logger", "futures", "influxdb", + "log", "mockall", "paho-mqtt", "postgres", diff --git a/Cargo.toml b/Cargo.toml index 3adc234..41ea9dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ serde_yml = { version = "0.0.12", features = [] } anyhow = "^1.0" regex = "^1.11" async-trait = "0.1.83" +log = "0.4.22" [dev-dependencies] mockall = "0.13.0" diff --git a/Dockerfile b/Dockerfile index a34cfa4..501ff22 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,4 +8,5 @@ RUN chmod a+x mqtt-gateway VOLUME /config +ENV RUST_LOG=info CMD ["./mqtt-gateway"] \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs index df566ba..4da6e2d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -52,6 +52,7 @@ pub struct Config { mod tests { use super::*; use anyhow::Result; + use log::debug; #[test] fn test_deserialize_influxdb() -> Result<()> { @@ -85,7 +86,7 @@ mod tests { "#; let result: Target = serde_yml::from_str(&yaml).unwrap(); - println!("{:?}", result); + debug!("{:?}", result); if let Target::Postgresql { host, diff --git a/src/data/klimalogger/mod.rs b/src/data/klimalogger/mod.rs index 1ee54a5..e049a00 100644 --- a/src/data/klimalogger/mod.rs +++ b/src/data/klimalogger/mod.rs @@ -14,6 +14,7 @@ use paho_mqtt::Message; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; +use log::{debug, warn}; #[derive(Serialize, Deserialize, Clone)] pub struct Data { @@ -57,7 +58,7 @@ impl CheckMessage for SensorLogger { let difference = now - date_time; let has_high_time_offset = difference.num_seconds() > 10; - println!( + debug!( "Sensor {} \"{}\": {:?} {:.2}s{}", location, measurement, @@ -86,7 +87,7 @@ impl CheckMessage for SensorLogger { tx.send(sensor_reading.clone()).expect("failed to send"); } } else { - println!("FAILED: {:?}, {:?}, {:?}", location, measurement, &result); + warn!("FAILED: {:?}, {:?}, {:?}", location, measurement, &result); } } } diff --git a/src/data/opendtu/mod.rs b/src/data/opendtu/mod.rs index 2b2c7fe..84e9931 100644 --- a/src/data/opendtu/mod.rs +++ b/src/data/opendtu/mod.rs @@ -11,6 +11,7 @@ use influxdb::WriteQuery; use paho_mqtt::Message; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; +use log::{debug, trace}; struct Data { timestamp: i64, @@ -84,7 +85,7 @@ impl OpenDTUParser { match element { "0" => { if let Some(timestamp) = self.timestamp { - println!( + debug!( "OpenDTU {} inverter: {:}: {:?}", section, field, @@ -101,22 +102,22 @@ impl OpenDTUParser { } } "device" => { - // ignore device global data - // println!(" device: {:}: {:?}", field, msg.payload_str()) + // ignore device global data + trace!(" device: {:}: {:?}", field, msg.payload_str()) } "status" => { if field == "last_update" { self.timestamp = Some(msg.payload_str().parse::()?); } else { // ignore other status data - // println!(" status: {:}: {:?}", field, msg.payload_str()); + trace!(" status: {:}: {:?}", field, msg.payload_str()); } } _ => { let payload = msg.payload_str(); if payload.len() > 0 { if let Some(timestamp) = self.timestamp { - println!( + debug!( "OpenDTU {} string {:}: {:}: {:?}", section, element, field, payload ); @@ -134,7 +135,7 @@ impl OpenDTUParser { } } else { // global options -> ignore for now - // println!(" global {:}.{:}: {:?}", section, element, msg.payload_str()) + trace!(" global {:}.{:}: {:?}", section, element, msg.payload_str()) } } diff --git a/src/data/shelly/mod.rs b/src/data/shelly/mod.rs index 76543e7..a1902ce 100644 --- a/src/data/shelly/mod.rs +++ b/src/data/shelly/mod.rs @@ -16,6 +16,7 @@ use paho_mqtt::Message; use regex::Regex; use serde::Deserialize; use std::thread::JoinHandle; +use log::{debug, warn}; pub trait Timestamped { fn timestamp(&self) -> Option; @@ -132,7 +133,7 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped + Typenam let channel = msg.topic().split(":").last().unwrap(); let result: Option = shelly::parse(msg).unwrap(); if let Some(data) = result { - println!("Shelly {}:{}: {:?}", location, channel, data); + debug!("Shelly {}:{}: {:?}", location, channel, data); if let Some(minute_ts) = data.timestamp() { let timestamp = Timestamp::Seconds(minute_ts as u128); @@ -157,7 +158,7 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped + Typenam } } } else { - println!("{} no timestamp {:?}", msg.topic(), msg.payload_str()); + warn!("{} no timestamp {:?}", msg.topic(), msg.payload_str()); } } } diff --git a/src/main.rs b/src/main.rs index 34cbde3..0f23da5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ use data::{klimalogger, opendtu, shelly}; use futures::{executor::block_on, stream::StreamExt}; use paho_mqtt as mqtt; use paho_mqtt::QOS_1; +use log::{debug, error, info, warn}; + mod config; mod data; mod target; @@ -41,7 +43,7 @@ fn main() { let config: config::Config = serde_yml::from_str(&config_string).expect("failed to parse config file"); - println!("config: {:?}", config); + debug!("config: {:?}", config); let mut handler_map: HashMap>> = HashMap::new(); let mut handles: Vec> = Vec::new(); @@ -65,7 +67,7 @@ fn main() { if let Err(err) = block_on(async { // Get message stream before connecting. - let mut strm = mqtt_client.get_stream(25); + let mut strm = mqtt_client.get_stream(200); let conn_opts = mqtt::ConnectOptionsBuilder::new_v5() .keep_alive_interval(Duration::from_secs(30)) @@ -75,10 +77,10 @@ fn main() { mqtt_client.connect(conn_opts).await?; - println!("Subscribing to topics: {:?}", &topics); + info!("Subscribing to topics: {:?}", &topics); mqtt_client.subscribe_many(&topics, &qoss).await?; - println!("Waiting for messages..."); + info!("Waiting for messages..."); while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { @@ -88,16 +90,16 @@ fn main() { if let Some(handler) = handler { handler.lock().unwrap().check_message(&msg); } else { - println!("unhandled prefix {} from topic {}", prefix, msg.topic()); + warn!("unhandled prefix {} from topic {}", prefix, msg.topic()); } } else { // A "None" means we were disconnected. Try to reconnect... - println!( + warn!( "Lost connection. Attempting reconnect. {:?}", mqtt_client.is_connected() ); while let Err(err) = mqtt_client.reconnect().await { - println!("Error reconnecting: {}", err); + warn!("Error reconnecting: {}", err); // For tokio use: tokio::time::delay_for() async_std::task::sleep(Duration::from_millis(1000)).await; } @@ -111,7 +113,7 @@ fn main() { // Explicit return type for the async block Ok::<(), mqtt::Error>(()) }) { - eprintln!("{}", err); + error!("{}", err); } } @@ -131,7 +133,7 @@ fn determine_config_file_path() -> String { } if config_file_path.is_none() { - println!("ERROR: no configuration file found"); + error!("ERROR: no configuration file found"); exit(10); } diff --git a/src/source/mqtt/mod.rs b/src/source/mqtt/mod.rs index a7238c1..d0bc344 100644 --- a/src/source/mqtt/mod.rs +++ b/src/source/mqtt/mod.rs @@ -1,8 +1,9 @@ use paho_mqtt as mqtt; use std::process; +use log::{error, info}; pub fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> mqtt::AsyncClient { - println!("Connecting to the MQTT server at '{}'...", mqtt_url); + info!("Connecting to the MQTT server at '{}'...", mqtt_url); let create_opts = mqtt::CreateOptionsBuilder::new_v3() .server_uri(mqtt_url) @@ -10,7 +11,7 @@ pub fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> mqtt::Asy .finalize(); mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); + error!("Error creating the client: {:?}", e); process::exit(1); }) } \ No newline at end of file diff --git a/src/target/influx/mod.rs b/src/target/influx/mod.rs index 8381879..7f63f1e 100644 --- a/src/target/influx/mod.rs +++ b/src/target/influx/mod.rs @@ -7,6 +7,7 @@ use mockall::automock; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; +use log::{info, warn}; pub struct InfluxConfig { url: String, @@ -75,7 +76,7 @@ fn influxdb_writer( query_mapper: fn(T) -> WriteQuery, ) { block_on(async move { - println!( + info!( "starting influx writer async {} {}", &influx_config.url, &influx_config.database ); @@ -85,7 +86,7 @@ fn influxdb_writer( let data = match result { Ok(query) => query, Err(error) => { - println!("error receiving query: {:?}", error); + warn!("error receiving query: {:?}", error); break; } }; @@ -101,10 +102,10 @@ fn influxdb_writer( } } } - println!("exiting influx writer async"); + info!("exiting influx writer async"); }); - println!("exiting influx writer"); + info!("exiting influx writer"); } pub fn spawn_influxdb_writer( @@ -126,7 +127,7 @@ fn spawn_influxdb_writer_internal( ( tx, thread::spawn(move || { - println!( + info!( "starting influx writer {} {}", &influx_config.url, &influx_config.database ); @@ -143,7 +144,7 @@ mod tests { // A mock `WriteQuery` for testing purposes fn mock_write_query(data: String) -> WriteQuery { - println!("mock write query {}", data); + info!("mock write query {}", data); assert_eq!(data, "test_data"); diff --git a/src/target/postgres/mod.rs b/src/target/postgres/mod.rs index 1ec2c36..43376a6 100644 --- a/src/target/postgres/mod.rs +++ b/src/target/postgres/mod.rs @@ -8,6 +8,7 @@ use postgres::{Error, NoTls}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; +use log::{error, info, warn}; pub struct PostgresConfig { host: String, @@ -64,14 +65,14 @@ impl PostgresClient for DefaultPostgresClient { fn start_postgres_writer(rx: Receiver, mut client: Box) { block_on(async move { - println!("starting postgres writer async"); + info!("starting postgres writer async"); loop { let result = rx.recv(); let query = match result { Ok(query) => query, Err(error) => { - println!("error receiving query: {:?}", error); + warn!("error receiving query: {:?}", error); break; } }; @@ -88,17 +89,17 @@ fn start_postgres_writer(rx: Receiver, mut client: Box {} Err(error) => { - eprintln!( + error!( "#### Error writing to postgres: {} {:?}", query.measurement, error ); } } } - println!("exiting influx writer async"); + info!("exiting influx writer async"); }); - println!("exiting influx writer"); + info!("exiting influx writer"); } pub fn spawn_postgres_writer( @@ -128,7 +129,7 @@ pub fn spawn_postgres_writer_internal( ( tx, thread::spawn(move || { - println!("starting postgres writer"); + info!("starting postgres writer"); start_postgres_writer(rx, client); }), )