Skip to content

Commit

Permalink
fixed configurability
Browse files Browse the repository at this point in the history
  • Loading branch information
wuan committed Jan 3, 2024
1 parent 2b8bfb9 commit a231507
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 61 deletions.
7 changes: 4 additions & 3 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ pub struct Source {
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "type")]
pub enum Target {
#[serde(rename = "influxdb")]
#[serde(rename = "influxdb")]
InfluxDB {
host: String,
port: u16,
url: String,
database: String,
user: Option<String>,
password: Option<String>,
},
#[serde(rename = "postgresql")]
Postgresql {
Expand Down
6 changes: 3 additions & 3 deletions src/data/klimalogger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ impl fmt::Debug for Data {
}

pub struct SensorLogger {
tx: Vec::<SyncSender<SensorReading>>,
txs: Vec::<SyncSender<SensorReading>>,
}

impl SensorLogger {
pub(crate) fn new(tx: Vec::<SyncSender<SensorReading>>) -> Self {
SensorLogger { tx }
SensorLogger { txs: tx }
}
}

Expand Down Expand Up @@ -59,7 +59,7 @@ impl CheckMessage for SensorLogger {
calculated: result.calculated,
};

for tx in &self.tx {
for tx in &self.txs {
tx.send(sensor_reading.clone()).expect("failed to send");
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/data/opendtu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ struct Data {
}

pub struct OpenDTULogger {
tx: SyncSender<WriteQuery>,
txs: Vec::<SyncSender<WriteQuery>>,
parser: OpenDTUParser,
}

impl OpenDTULogger {
pub(crate) fn new(tx: SyncSender<WriteQuery>) -> Self {
OpenDTULogger { tx, parser: OpenDTUParser::new() }
pub(crate) fn new(txs: Vec::<SyncSender<WriteQuery>>) -> Self {
OpenDTULogger { txs, parser: OpenDTUParser::new() }
}
}

Expand All @@ -39,7 +39,9 @@ impl CheckMessage for OpenDTULogger {
} else {
write_query
};
let _ = self.tx.send(write_query);
for tx in &self.txs {
tx.send(write_query.clone()).expect("failed to send");
}
}
}
}
Expand Down
17 changes: 10 additions & 7 deletions src/data/shelly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl fmt::Debug for TemperatureData {
}

pub struct ShellyLogger {
tx: SyncSender<WriteQuery>,
txs: Vec::<SyncSender<WriteQuery>>,
}

impl ShellyLogger {
pub(crate) fn new(tx: SyncSender<WriteQuery>) -> Self {
ShellyLogger { tx }
pub(crate) fn new(txs: Vec::<SyncSender<WriteQuery>>) -> Self {
ShellyLogger { txs }
}
}

Expand Down Expand Up @@ -114,14 +114,14 @@ impl CheckMessage for ShellyLogger {
fn check_message(&mut self, msg: &Message) {
let topic = msg.topic();
if topic.ends_with("/status/switch:0") {
handle_message(msg, &self.tx, SWITCH_FIELDS);
handle_message(msg, &self.txs, SWITCH_FIELDS);
} else if topic.ends_with("/status/cover:0") {
handle_message(msg, &self.tx, COVER_FIELDS);
handle_message(msg, &self.txs, COVER_FIELDS);
}
}
}

fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a Message, tx: &SyncSender<WriteQuery>, fields: &[(&str, fn(&T) -> Option<WriteType>, &str)]) {
fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a Message, txs: &Vec::<SyncSender<WriteQuery>>, fields: &[(&str, fn(&T) -> Option<WriteType>, &str)]) {
let location = msg.topic().split("/").nth(1).unwrap();
let result: Option<T> = shelly::parse(&msg).unwrap();
if let Some(data) = result {
Expand All @@ -145,7 +145,10 @@ fn handle_message<'a, T: Deserialize<'a> + Clone + Debug + Timestamped>(msg: &'a
.add_tag("sensor", "shelly")
.add_tag("type", "switch")
.add_tag("unit", unit);
tx.send(query).expect("failed to send");

for tx in txs {
tx.send(query.clone()).expect("failed to send");
}
}
}
} else {
Expand Down
139 changes: 95 additions & 44 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,43 @@ fn main() {
}
}

fn create_shelly_logger(_vec: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage>>, Vec::<JoinHandle<()>>) {
let (tx, rx) = sync_channel(100);
fn create_shelly_logger(targets: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage>>, Vec::<JoinHandle<()>>) {
let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();

let influx_writer_handle = thread::spawn(move || {
println!("starting influx writer");
let database = "iot";
for target in targets {
let (tx, handle) = match target {
Target::InfluxDB { url, database, user, password } => {
spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), std::convert::identity)
}
Target::Postgresql { .. } => {
panic!("Postgresql not supported for shelly");
}
};
txs.push(tx);
handles.push(handle);
}

start_influx_writer(rx, database, std::convert::identity);
});

let logger = ShellyLogger::new(tx);
(Arc::new(Mutex::new(ShellyLogger::new(txs))), handles)
}

struct InfluxConfig {
url: String,
database: String,
user: Option<String>,
password: Option<String>,
}

(Arc::new(Mutex::new(logger)), vec![influx_writer_handle])
impl InfluxConfig {
fn new(url: String, database: String, user: Option<String>, password: Option<String>) -> Self {
Self {
url,
database,
user,
password,
}
}
}

fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage>>, Vec::<JoinHandle<()>>) {
Expand All @@ -155,30 +179,23 @@ fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage

for target in targets {
let (tx, handle) = match target {
Target::InfluxDB { .. } => {
let (tx, rx) = sync_channel(100);

(tx, thread::spawn(move || {
println!("starting influx writer");
let database = "klima";

fn mapper(result: SensorReading) -> WriteQuery {
let timestamp = Timestamp::Seconds(result.time.timestamp() as u128);
let write_query = WriteQuery::new(timestamp, "data")
.add_tag("type", result.measurement.to_string())
.add_tag("location", result.location.to_string())
.add_tag("sensor", result.sensor.to_string())
.add_tag("calculated", result.calculated)
.add_field("value", result.value);
if result.unit != "" {
write_query.add_tag("unit", result.unit.to_string())
} else {
write_query
}
Target::InfluxDB { url, database, user, password } => {
fn mapper(result: SensorReading) -> WriteQuery {
let timestamp = Timestamp::Seconds(result.time.timestamp() as u128);
let write_query = WriteQuery::new(timestamp, "data")
.add_tag("type", result.measurement.to_string())
.add_tag("location", result.location.to_string())
.add_tag("sensor", result.sensor.to_string())
.add_tag("calculated", result.calculated)
.add_field("value", result.value);
if result.unit != "" {
write_query.add_tag("unit", result.unit.to_string())
} else {
write_query
}
}

start_influx_writer(rx, database, mapper);
}))
spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), mapper)
}
Target::Postgresql { host, port, user, password, database } => {
let (tx, rx) = sync_channel(100);
Expand All @@ -204,26 +221,54 @@ fn create_sensor_logger(targets: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage
(Arc::new(Mutex::new(SensorLogger::new(txs))), handles)
}

fn create_opendtu_logger(_vec: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage>>, Vec::<JoinHandle<()>>) {
let (tx, rx) = sync_channel(100);
fn create_opendtu_logger(targets: Vec<Target>) -> (Arc::<Mutex::<dyn CheckMessage>>, Vec::<JoinHandle<()>>) {
let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();

let influx_writer_handle = thread::spawn(move || {
println!("starting OpenDTU influx writer");
start_influx_writer(rx, "solar", std::convert::identity);
});
for target in targets {
let (tx, handle) = match target {
Target::InfluxDB { url, database, user, password } => {
spawn_influxdb_writer(InfluxConfig::new(url, database, user, password), std::convert::identity)
}
Target::Postgresql { .. } => {
panic!("Postgresql not supported for opendtu");
}
};
txs.push(tx);
handles.push(handle);
}

let logger = OpenDTULogger::new(txs);

let logger = OpenDTULogger::new(tx);
(Arc::new(Mutex::new(logger)), handles)
}

fn spawn_influxdb_writer<T: Send + 'static>(config: InfluxConfig, mapper: fn(T) -> WriteQuery) -> (SyncSender<T>, JoinHandle<()>) {
let (tx, rx) = sync_channel(100);

(Arc::new(Mutex::new(logger)), vec![influx_writer_handle])
(tx, thread::spawn(move || {
println!("starting influx writer {} {}", &config.url, &config.database);

influxdb_writer(rx, config, mapper);
}))
}

fn start_influx_writer<T>(iot_rx: Receiver<T>, database: &str, query_mapper: fn(T) -> WriteQuery) {
let influx_client = Client::new("http://influx:8086", database);
fn influxdb_writer<T>(rx: Receiver<T>, influx_config: InfluxConfig, query_mapper: fn(T) -> WriteQuery) {
let influx_url = influx_config.url.clone();
let influx_database = influx_config.database.clone();

let mut influx_client = Client::new(influx_config.url, influx_config.database);
influx_client = if let (Some(user), Some(password)) = (influx_config.user, influx_config.password) {
influx_client.with_auth(user, password)
} else {
influx_client
};

block_on(async move {
println!("starting influx writer async");
println!("starting influx writer async {} {}", &influx_url, &influx_database);

loop {
let result = iot_rx.recv();
let result = rx.recv();
let data = match result {
Ok(query) => { query }
Err(error) => {
Expand All @@ -232,7 +277,13 @@ fn start_influx_writer<T>(iot_rx: Receiver<T>, database: &str, query_mapper: fn(
}
};
let query = query_mapper(data);
let _ = influx_client.query(query).await.expect("failed to write to influx");
let result = influx_client.query(query).await;
match result {
Ok(_) => {}
Err(error) => {
panic!("#### Error writing to influx: {} {}: {:?}", &influx_url, &influx_database, error);
}
}
}
println!("exiting influx writer async");
});
Expand Down

0 comments on commit a231507

Please sign in to comment.