Skip to content

Commit

Permalink
switch print statements to logging with different levels and set the …
Browse files Browse the repository at this point in the history
…default to info log level
  • Loading branch information
wuan committed Nov 9, 2024
1 parent 178898d commit 1d27cca
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde_yml = { version = "0.0.12", features = [] }
anyhow = "^1.0"
regex = "^1.11"
async-trait = "0.1.83"
log = "0.4.22"

[dev-dependencies]
mockall = "0.13.0"
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ RUN chmod a+x mqtt-gateway

VOLUME /config

ENV RUST_LOG=info
CMD ["./mqtt-gateway"]
3 changes: 2 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct Config {
mod tests {
use super::*;
use anyhow::Result;
use log::debug;

#[test]
fn test_deserialize_influxdb() -> Result<()> {
Expand Down Expand Up @@ -85,7 +86,7 @@ mod tests {
"#;

let result: Target = serde_yml::from_str(&yaml).unwrap();
println!("{:?}", result);
debug!("{:?}", result);

if let Target::Postgresql {
host,
Expand Down
5 changes: 3 additions & 2 deletions src/data/klimalogger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use paho_mqtt::Message;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use log::{debug, warn};

#[derive(Serialize, Deserialize, Clone)]
pub struct Data {
Expand Down Expand Up @@ -57,7 +58,7 @@ impl CheckMessage for SensorLogger {
let difference = now - date_time;

let has_high_time_offset = difference.num_seconds() > 10;
println!(
debug!(
"Sensor {} \"{}\": {:?} {:.2}s{}",
location,
measurement,
Expand Down Expand Up @@ -86,7 +87,7 @@ impl CheckMessage for SensorLogger {
tx.send(sensor_reading.clone()).expect("failed to send");
}
} else {
println!("FAILED: {:?}, {:?}, {:?}", location, measurement, &result);
warn!("FAILED: {:?}, {:?}, {:?}", location, measurement, &result);
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/data/opendtu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use influxdb::WriteQuery;
use paho_mqtt::Message;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use log::{debug, trace};

struct Data {
timestamp: i64,
Expand Down Expand Up @@ -84,7 +85,7 @@ impl OpenDTUParser {
match element {
"0" => {
if let Some(timestamp) = self.timestamp {
println!(
debug!(
"OpenDTU {} inverter: {:}: {:?}",
section,
field,
Expand All @@ -101,22 +102,22 @@ impl OpenDTUParser {
}
}
"device" => {
// ignore device global data
// println!(" device: {:}: {:?}", field, msg.payload_str())
// ignore device global data
trace!(" device: {:}: {:?}", field, msg.payload_str())
}
"status" => {
if field == "last_update" {
self.timestamp = Some(msg.payload_str().parse::<i64>()?);
} else {
// ignore other status data
// println!(" status: {:}: {:?}", field, msg.payload_str());
trace!(" status: {:}: {:?}", field, msg.payload_str());
}
}
_ => {
let payload = msg.payload_str();
if payload.len() > 0 {
if let Some(timestamp) = self.timestamp {
println!(
debug!(
"OpenDTU {} string {:}: {:}: {:?}",
section, element, field, payload
);
Expand All @@ -134,7 +135,7 @@ impl OpenDTUParser {
}
} else {
// global options -> ignore for now
// println!(" global {:}.{:}: {:?}", section, element, msg.payload_str())
trace!(" global {:}.{:}: {:?}", section, element, msg.payload_str())
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/data/shelly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use paho_mqtt::Message;
use regex::Regex;
use serde::Deserialize;
use std::thread::JoinHandle;
use log::{debug, warn};

pub trait Timestamped {
fn timestamp(&self) -> Option<i64>;
Expand Down Expand Up @@ -132,7 +133,7 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped + Typenam
let channel = msg.topic().split(":").last().unwrap();
let result: Option<T> = shelly::parse(msg).unwrap();
if let Some(data) = result {
println!("Shelly {}:{}: {:?}", location, channel, data);
debug!("Shelly {}:{}: {:?}", location, channel, data);

if let Some(minute_ts) = data.timestamp() {
let timestamp = Timestamp::Seconds(minute_ts as u128);
Expand All @@ -157,7 +158,7 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped + Typenam
}
}
} else {
println!("{} no timestamp {:?}", msg.topic(), msg.payload_str());
warn!("{} no timestamp {:?}", msg.topic(), msg.payload_str());
}
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use data::{klimalogger, opendtu, shelly};
use futures::{executor::block_on, stream::StreamExt};
use paho_mqtt as mqtt;
use paho_mqtt::QOS_1;
use log::{debug, error, info, warn};

mod config;
mod data;
mod target;
Expand Down Expand Up @@ -41,7 +43,7 @@ fn main() {
let config: config::Config =
serde_yml::from_str(&config_string).expect("failed to parse config file");

println!("config: {:?}", config);
debug!("config: {:?}", config);

let mut handler_map: HashMap<String, Arc<Mutex<dyn CheckMessage>>> = HashMap::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();
Expand All @@ -65,7 +67,7 @@ fn main() {

if let Err(err) = block_on(async {
// Get message stream before connecting.
let mut strm = mqtt_client.get_stream(25);
let mut strm = mqtt_client.get_stream(200);

let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
.keep_alive_interval(Duration::from_secs(30))
Expand All @@ -75,10 +77,10 @@ fn main() {

mqtt_client.connect(conn_opts).await?;

println!("Subscribing to topics: {:?}", &topics);
info!("Subscribing to topics: {:?}", &topics);
mqtt_client.subscribe_many(&topics, &qoss).await?;

println!("Waiting for messages...");
info!("Waiting for messages...");

while let Some(msg_opt) = strm.next().await {
if let Some(msg) = msg_opt {
Expand All @@ -88,16 +90,16 @@ fn main() {
if let Some(handler) = handler {
handler.lock().unwrap().check_message(&msg);
} else {
println!("unhandled prefix {} from topic {}", prefix, msg.topic());
warn!("unhandled prefix {} from topic {}", prefix, msg.topic());
}
} else {
// A "None" means we were disconnected. Try to reconnect...
println!(
warn!(
"Lost connection. Attempting reconnect. {:?}",
mqtt_client.is_connected()
);
while let Err(err) = mqtt_client.reconnect().await {
println!("Error reconnecting: {}", err);
warn!("Error reconnecting: {}", err);
// For tokio use: tokio::time::delay_for()
async_std::task::sleep(Duration::from_millis(1000)).await;
}
Expand All @@ -111,7 +113,7 @@ fn main() {
// Explicit return type for the async block
Ok::<(), mqtt::Error>(())
}) {
eprintln!("{}", err);
error!("{}", err);
}
}

Expand All @@ -131,7 +133,7 @@ fn determine_config_file_path() -> String {
}

if config_file_path.is_none() {
println!("ERROR: no configuration file found");
error!("ERROR: no configuration file found");
exit(10);
}

Expand Down
5 changes: 3 additions & 2 deletions src/source/mqtt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use paho_mqtt as mqtt;
use std::process;
use log::{error, info};

pub fn create_mqtt_client(mqtt_url: String, mqtt_client_id: String) -> mqtt::AsyncClient {
println!("Connecting to the MQTT server at '{}'...", mqtt_url);
info!("Connecting to the MQTT server at '{}'...", mqtt_url);

let create_opts = mqtt::CreateOptionsBuilder::new_v3()
.server_uri(mqtt_url)
.client_id(mqtt_client_id)
.finalize();

mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
error!("Error creating the client: {:?}", e);
process::exit(1);
})
}
13 changes: 7 additions & 6 deletions src/target/influx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use mockall::automock;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::thread::JoinHandle;
use log::{info, warn};

pub struct InfluxConfig {
url: String,
Expand Down Expand Up @@ -75,7 +76,7 @@ fn influxdb_writer<T>(
query_mapper: fn(T) -> WriteQuery,
) {
block_on(async move {
println!(
info!(
"starting influx writer async {} {}",
&influx_config.url, &influx_config.database
);
Expand All @@ -85,7 +86,7 @@ fn influxdb_writer<T>(
let data = match result {
Ok(query) => query,
Err(error) => {
println!("error receiving query: {:?}", error);
warn!("error receiving query: {:?}", error);
break;
}
};
Expand All @@ -101,10 +102,10 @@ fn influxdb_writer<T>(
}
}
}
println!("exiting influx writer async");
info!("exiting influx writer async");
});

println!("exiting influx writer");
info!("exiting influx writer");
}

pub fn spawn_influxdb_writer<T: Send + 'static>(
Expand All @@ -126,7 +127,7 @@ fn spawn_influxdb_writer_internal<T: Send + 'static>(
(
tx,
thread::spawn(move || {
println!(
info!(
"starting influx writer {} {}",
&influx_config.url, &influx_config.database
);
Expand All @@ -143,7 +144,7 @@ mod tests {

// A mock `WriteQuery` for testing purposes
fn mock_write_query(data: String) -> WriteQuery {
println!("mock write query {}", data);
info!("mock write query {}", data);

assert_eq!(data, "test_data");

Expand Down
13 changes: 7 additions & 6 deletions src/target/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use postgres::{Error, NoTls};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::thread::JoinHandle;
use log::{error, info, warn};

pub struct PostgresConfig {
host: String,
Expand Down Expand Up @@ -64,14 +65,14 @@ impl PostgresClient for DefaultPostgresClient {

fn start_postgres_writer(rx: Receiver<SensorReading>, mut client: Box<dyn PostgresClient>) {
block_on(async move {
println!("starting postgres writer async");
info!("starting postgres writer async");

loop {
let result = rx.recv();
let query = match result {
Ok(query) => query,
Err(error) => {
println!("error receiving query: {:?}", error);
warn!("error receiving query: {:?}", error);
break;
}
};
Expand All @@ -88,17 +89,17 @@ fn start_postgres_writer(rx: Receiver<SensorReading>, mut client: Box<dyn Postgr
match x {
Ok(_) => {}
Err(error) => {
eprintln!(
error!(
"#### Error writing to postgres: {} {:?}",
query.measurement, error
);
}
}
}
println!("exiting influx writer async");
info!("exiting influx writer async");
});

println!("exiting influx writer");
info!("exiting influx writer");
}

pub fn spawn_postgres_writer(
Expand Down Expand Up @@ -128,7 +129,7 @@ pub fn spawn_postgres_writer_internal(
(
tx,
thread::spawn(move || {
println!("starting postgres writer");
info!("starting postgres writer");
start_postgres_writer(rx, client);
}),
)
Expand Down

0 comments on commit 1d27cca

Please sign in to comment.