diff --git a/src/config/mod.rs b/src/config/mod.rs
index edd6e48..cd36710 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -22,11 +22,12 @@ pub struct Source {
 #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
 #[serde(tag = "type")]
 pub enum Target {
-    #[serde(rename = "influxdb")]
+    #[serde(rename = "influxdb")]
     InfluxDB {
-        host: String,
-        port: u16,
+        url: String,
         database: String,
+        user: Option<String>,
+        password: Option<String>,
     },
     #[serde(rename = "postgresql")]
     Postgresql {
diff --git a/src/data/klimalogger/mod.rs b/src/data/klimalogger/mod.rs
index af112e5..7f5641d 100644
--- a/src/data/klimalogger/mod.rs
+++ b/src/data/klimalogger/mod.rs
@@ -25,12 +25,12 @@ impl fmt::Debug for Data {
 }
 
 pub struct SensorLogger {
-    tx: Vec::<SyncSender<SensorReading>>,
+    txs: Vec::<SyncSender<SensorReading>>,
 }
 
 impl SensorLogger {
     pub(crate) fn new(tx: Vec::<SyncSender<SensorReading>>) -> Self {
-        SensorLogger { tx }
+        SensorLogger { txs: tx }
     }
 }
@@ -59,7 +59,7 @@ impl CheckMessage for SensorLogger {
             calculated: result.calculated,
         };
 
-        for tx in &self.tx {
+        for tx in &self.txs {
             tx.send(sensor_reading.clone()).expect("failed to send");
         }
     }
diff --git a/src/data/opendtu/mod.rs b/src/data/opendtu/mod.rs
index ae648b1..cd832d0 100644
--- a/src/data/opendtu/mod.rs
+++ b/src/data/opendtu/mod.rs
@@ -16,13 +16,13 @@ struct Data {
 }
 
 pub struct OpenDTULogger {
-    tx: SyncSender<WriteQuery>,
+    txs: Vec::<SyncSender<WriteQuery>>,
     parser: OpenDTUParser,
 }
 
 impl OpenDTULogger {
-    pub(crate) fn new(tx: SyncSender<WriteQuery>) -> Self {
-        OpenDTULogger { tx, parser: OpenDTUParser::new() }
+    pub(crate) fn new(txs: Vec::<SyncSender<WriteQuery>>) -> Self {
+        OpenDTULogger { txs, parser: OpenDTUParser::new() }
     }
 }
@@ -39,7 +39,9 @@ impl CheckMessage for OpenDTULogger {
             } else {
                 write_query
             };
-            let _ = self.tx.send(write_query);
+            for tx in &self.txs {
+                tx.send(write_query.clone()).expect("failed to send");
+            }
         }
     }
 }
diff --git a/src/data/shelly/mod.rs b/src/data/shelly/mod.rs
index 08b895d..63f8fa4 100644
--- a/src/data/shelly/mod.rs
+++ b/src/data/shelly/mod.rs
@@ -75,12 +75,12 @@ impl fmt::Debug for TemperatureData {
 }
 
 pub struct ShellyLogger {
-    tx: SyncSender<WriteQuery>,
+    txs: Vec::<SyncSender<WriteQuery>>,
 }
 
 impl ShellyLogger {
-    pub(crate) fn new(tx: SyncSender<WriteQuery>) -> Self {
-        ShellyLogger { tx }
+    pub(crate) fn new(txs: Vec::<SyncSender<WriteQuery>>) -> Self {
+        ShellyLogger { txs }
     }
 }
@@ -114,14 +114,14 @@ impl CheckMessage for ShellyLogger {
     fn check_message(&mut self, msg: &Message) {
         let topic = msg.topic();
         if topic.ends_with("/status/switch:0") {
-            handle_message(msg, &self.tx, SWITCH_FIELDS);
+            handle_message(msg, &self.txs, SWITCH_FIELDS);
         } else if topic.ends_with("/status/cover:0") {
-            handle_message(msg, &self.tx, COVER_FIELDS);
+            handle_message(msg, &self.txs, COVER_FIELDS);
         }
     }
 }
 
-fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a Message, tx: &SyncSender<WriteQuery>, fields: &[(&str, fn(&T) -> Option<f64>, &str)]) {
+fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a Message, txs: &Vec::<SyncSender<WriteQuery>>, fields: &[(&str, fn(&T) -> Option<f64>, &str)]) {
     let location = msg.topic().split("/").nth(1).unwrap();
     let result: Option<T> = shelly::parse(&msg).unwrap();
     if let Some(data) = result {
@@ -145,7 +145,10 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a
                     .add_tag("sensor", "shelly")
                     .add_tag("type", "switch")
                     .add_tag("unit", unit);
-                tx.send(query).expect("failed to send");
+
+                for tx in txs {
+                    tx.send(query.clone()).expect("failed to send");
+                }
             }
         }
     } else {
diff --git a/src/main.rs b/src/main.rs
index 6730916..6914a3d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -134,19 +134,43 @@ fn main() {
     }
 }
 
-fn create_shelly_logger(_vec: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
-    let (tx, rx) = sync_channel(100);
+fn create_shelly_logger(targets: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
+    let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
+    let mut handles: Vec<JoinHandle<()>> = Vec::new();
 
-    let influx_writer_handle = thread::spawn(move || {
-        println!("starting influx writer");
-        let database = "iot";
+    for target in targets {
+        let (tx, handle) = match target {
+            Target::InfluxDB { url, database, user, password } => {
+                spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), std::convert::identity)
+            }
+            Target::Postgresql { .. } => {
+                panic!("Postgresql not supported for shelly");
+            }
+        };
+        txs.push(tx);
+        handles.push(handle);
+    }
 
-        start_influx_writer(rx, database, std::convert::identity);
-    });
-    let logger = ShellyLogger::new(tx);
+    (Arc::new(Mutex::new(ShellyLogger::new(txs))), handles)
+}
+
+struct InfluxConfig {
+    url: String,
+    database: String,
+    user: Option<String>,
+    password: Option<String>,
+}
 
-    (Arc::new(Mutex::new(logger)), vec![influx_writer_handle])
+impl InfluxConfig {
+    fn new(url: String, database: String, user: Option<String>, password: Option<String>) -> Self {
+        Self {
+            url,
+            database,
+            user,
+            password,
+        }
+    }
 }
 
 fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
@@ -155,30 +179,23 @@ fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
-            let (tx, rx) = sync_channel(100);
-
-            (tx, thread::spawn(move || {
-                println!("starting influx writer");
-                let database = "klima";
-
-                fn mapper(result: SensorReading) -> WriteQuery {
-                    let timestamp = Timestamp::Seconds(result.time.timestamp() as u128);
-                    let write_query = WriteQuery::new(timestamp, "data")
-                        .add_tag("type", result.measurement.to_string())
-                        .add_tag("location", result.location.to_string())
-                        .add_tag("sensor", result.sensor.to_string())
-                        .add_tag("calculated", result.calculated)
-                        .add_field("value", result.value);
-                    if result.unit != "" {
-                        write_query.add_tag("unit", result.unit.to_string())
-                    } else {
-                        write_query
-                    }
+        Target::InfluxDB { url, database, user, password } => {
+            fn mapper(result: SensorReading) -> WriteQuery {
+                let timestamp = Timestamp::Seconds(result.time.timestamp() as u128);
+                let write_query = WriteQuery::new(timestamp, "data")
+                    .add_tag("type", result.measurement.to_string())
+                    .add_tag("location", result.location.to_string())
+                    .add_tag("sensor", result.sensor.to_string())
+                    .add_tag("calculated", result.calculated)
+                    .add_field("value", result.value);
+                if result.unit != "" {
+                    write_query.add_tag("unit", result.unit.to_string())
+                } else {
+                    write_query
                 }
+            }
 
-            start_influx_writer(rx, database, mapper);
-        }))
+            spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), mapper)
         }
         Target::Postgresql { host, port, user, password, database } => {
             let (tx, rx) = sync_channel(100);
@@ -204,26 +221,54 @@ fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
-fn create_opendtu_logger(_vec: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
-    let (tx, rx) = sync_channel(100);
+fn create_opendtu_logger(targets: Vec<Target>) -> (Arc::<Mutex<dyn CheckMessage + Send>>, Vec::<JoinHandle<()>>) {
+    let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
+    let mut handles: Vec<JoinHandle<()>> = Vec::new();
 
-    let influx_writer_handle = thread::spawn(move || {
-        println!("starting OpenDTU influx writer");
-        start_influx_writer(rx, "solar", std::convert::identity);
-    });
+    for target in targets {
+        let (tx, handle) = match target {
+            Target::InfluxDB { url, database, user, password } => {
+                spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), std::convert::identity)
+            }
+            Target::Postgresql { .. } => {
+                panic!("Postgresql not supported for opendtu");
+            }
+        };
+        txs.push(tx);
+        handles.push(handle);
+    }
+
+    let logger = OpenDTULogger::new(txs);
 
-    let logger = OpenDTULogger::new(tx);
+    (Arc::new(Mutex::new(logger)), handles)
+}
+
+fn spawn_influxdb_writer<T: Send + 'static>(config: InfluxConfig, mapper: fn(T) -> WriteQuery) -> (SyncSender<T>, JoinHandle<()>) {
+    let (tx, rx) = sync_channel(100);
 
-    (Arc::new(Mutex::new(logger)), vec![influx_writer_handle])
+    (tx, thread::spawn(move || {
+        println!("starting influx writer {} {}", &config.url, &config.database);
+
+        influxdb_writer(rx, config, mapper);
+    }))
 }
 
-fn start_influx_writer<T>(iot_rx: Receiver<T>, database: &str, query_mapper: fn(T) -> WriteQuery) {
-    let influx_client = Client::new("http://influx:8086", database);
+fn influxdb_writer<T>(rx: Receiver<T>, influx_config: InfluxConfig, query_mapper: fn(T) -> WriteQuery) {
+    let influx_url = influx_config.url.clone();
+    let influx_database = influx_config.database.clone();
+
+    let mut influx_client = Client::new(influx_config.url, influx_config.database);
+    influx_client = if let (Some(user), Some(password)) = (influx_config.user, influx_config.password) {
+        influx_client.with_auth(user, password)
+    } else {
+        influx_client
+    };
+
     block_on(async move {
-        println!("starting influx writer async");
+        println!("starting influx writer async {} {}", &influx_url, &influx_database);
         loop {
-            let result = iot_rx.recv();
+            let result = rx.recv();
             let data = match result {
                 Ok(query) => { query }
                 Err(error) => {
@@ -232,7 +277,13 @@ fn start_influx_writer<T>(iot_rx: Receiver<T>, database: &str, query_mapper: fn(
                 }
             };
             let query = query_mapper(data);
-            let _ = influx_client.query(query).await.expect("failed to write to influx");
+            let result = influx_client.query(query).await;
+            match result {
+                Ok(_) => {}
+                Err(error) => {
+                    panic!("#### Error writing to influx: {} {}: {:?}", &influx_url, &influx_database, error);
+                }
+            }
         }
         println!("exiting influx writer async");
     });