Commit

add tests for influx and postgres writing
wuan committed Oct 13, 2024
1 parent 251c32a commit 7cb30f8
Showing 12 changed files with 671 additions and 269 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
@@ -52,6 +52,9 @@ jobs:
- name: Run Tests with Coverage
run: cargo tarpaulin --out Xml

- name: Run Clippy
run: cargo clippy --message-format=json &> clippy.json

- name: Update Sonar
uses: sonarsource/sonarqube-scan-action@v3
if: github.ref == 'refs/heads/main'
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,3 +2,5 @@
/.idea
/config.yml
/.envrc
/cobertura.xml
/clippy.json
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -19,3 +19,7 @@ postgres = { version = "0.19.9" , features = ["with-chrono-0_4"] }
serde_yaml = { version = "0.9.34+deprecated", features = [] }
anyhow = "1.0.89"
regex = "1.11.0"
async-trait = "0.1.83"

[dev-dependencies]
mockall = "0.13.0"
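
The new mockall dev-dependency is what enables trait-based mocks in the tests this commit adds. Below is a minimal, self-contained sketch of that pattern, not the repository's actual test code: the RecordWriter trait, write_line method, and forward function are hypothetical names chosen for illustration.

use mockall::automock;
use mockall::predicate::eq;

#[automock]
trait RecordWriter {
    // Hypothetical stand-in for an InfluxDB/PostgreSQL writer target.
    fn write_line(&mut self, line: String) -> bool;
}

fn forward(writer: &mut dyn RecordWriter, line: &str) -> bool {
    // The real code would parse an MQTT payload first; this only forwards the line.
    writer.write_line(line.to_string())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn forwards_line_to_writer_exactly_once() {
        let mut mock = MockRecordWriter::new();
        mock.expect_write_line()
            .with(eq("temperature value=21.5".to_string()))
            .times(1)
            .returning(|_| true);

        assert!(forward(&mut mock, "temperature value=21.5"));
    }
}

Whether the commit's actual tests mock a writer trait or drive the channel end directly is not visible in this excerpt.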
2 changes: 2 additions & 0 deletions sonar-project.properties
@@ -1,2 +1,4 @@
sonar.projectKey=mqtt-gateway
sonar.projectName=mqtt-gateway
community.rust.cpd.ignoretests=true
community.rust.clippy.reportPaths=./clippy.json
56 changes: 52 additions & 4 deletions src/data/klimalogger/mod.rs
@@ -1,13 +1,19 @@
use std::fmt;
use std::sync::mpsc::SyncSender;

use crate::config::Target;
use crate::data::CheckMessage;
use crate::target::influx;
use crate::target::influx::InfluxConfig;
use crate::target::postgres::PostgresConfig;
use crate::{target, SensorReading};
use anyhow::Result;
use chrono::{DateTime, Utc};
use influxdb::{Timestamp, WriteQuery};
use paho_mqtt::Message;
use serde::{Deserialize, Serialize};

use crate::data::CheckMessage;
use crate::SensorReading;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

#[derive(Serialize, Deserialize, Clone)]
pub struct Data {
@@ -43,7 +49,7 @@ impl CheckMessage for SensorLogger {

let location = split.nth(1);
let measurement = split.next();
let result = parse(&msg);
let result = parse(msg);
if let (Some(location), Some(measurement), Ok(result)) = (location, measurement, &result) {
let date_time = Self::convert_timestamp(result.timestamp as i64);

@@ -174,3 +180,45 @@ mod tests {
Ok(())
}
}

pub fn create_logger(targets: Vec<Target>) -> (Arc<Mutex<dyn CheckMessage>>, Vec<JoinHandle<()>>) {
let mut txs: Vec<SyncSender<SensorReading>> = Vec::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();

for target in targets {
let (tx, handle) = match target {
Target::InfluxDB {
url,
database,
user,
password,
} => {
fn mapper(result: SensorReading) -> WriteQuery {
let timestamp = Timestamp::Seconds(result.time.timestamp() as u128);
WriteQuery::new(timestamp, result.measurement.to_string())
.add_tag("location", result.location.to_string())
.add_tag("sensor", result.sensor.to_string())
.add_field("value", result.value)
}

influx::spawn_influxdb_writer(
InfluxConfig::new(url, database, user, password),
mapper,
)
}
Target::Postgresql {
host,
port,
user,
password,
database,
} => target::postgres::spawn_postgres_writer(PostgresConfig::new(
host, port, user, password, database,
)),
};
txs.push(tx);
handles.push(handle);
}

(Arc::new(Mutex::new(SensorLogger::new(txs))), handles)
}
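
The mapper above is a function local to create_logger, so it cannot be called from a unit test directly. The following sketch shows how the same mapping could be exercised in isolation; it is not the commit's actual test code. The Reading struct and to_write_query function are illustrative stand-ins (the real SensorReading fields are only partially visible in this diff), and build() comes from the influxdb crate's Query trait.

use chrono::{DateTime, Utc};
use influxdb::{Timestamp, WriteQuery};

// Illustrative stand-in for SensorReading, with just the fields the mapper uses.
struct Reading {
    time: DateTime<Utc>,
    measurement: String,
    location: String,
    sensor: String,
    value: f64,
}

fn to_write_query(reading: &Reading) -> WriteQuery {
    WriteQuery::new(
        Timestamp::Seconds(reading.time.timestamp() as u128),
        reading.measurement.clone(),
    )
    .add_tag("location", reading.location.clone())
    .add_tag("sensor", reading.sensor.clone())
    .add_field("value", reading.value)
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use influxdb::Query;

    #[test]
    fn maps_reading_to_valid_write_query() {
        let reading = Reading {
            time: Utc.timestamp_opt(1_700_000_000, 0).unwrap(),
            measurement: "temperature".to_string(),
            location: "kitchen".to_string(),
            sensor: "sht31".to_string(),
            value: 21.5,
        };

        // build() validates the query; a malformed line protocol would return Err.
        assert!(to_write_query(&reading).build().is_ok());
    }
}

If the actual tests assert on the built line protocol string instead, that is not visible in this excerpt.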
38 changes: 35 additions & 3 deletions src/data/opendtu/mod.rs
@@ -1,12 +1,16 @@
use std::sync::mpsc::SyncSender;

use crate::config::Target;
use crate::data::CheckMessage;
use crate::target::influx;
use crate::target::influx::InfluxConfig;
use anyhow::Result;
use chrono::Datelike;
use influxdb::Timestamp::Seconds;
use influxdb::WriteQuery;
use paho_mqtt::Message;

use crate::data::CheckMessage;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

struct Data {
timestamp: i64,
@@ -134,7 +138,7 @@ impl OpenDTUParser {
}
}

return Ok(data);
Ok(data)
}
}

@@ -192,3 +196,31 @@ mod tests {
Ok(())
}
}

pub fn create_logger(targets: Vec<Target>) -> (Arc<Mutex<dyn CheckMessage>>, Vec<JoinHandle<()>>) {
let mut txs: Vec<SyncSender<WriteQuery>> = Vec::new();
let mut handles: Vec<JoinHandle<()>> = Vec::new();

for target in targets {
let (tx, handle) = match target {
Target::InfluxDB {
url,
database,
user,
password,
} => influx::spawn_influxdb_writer(
InfluxConfig::new(url, database, user, password),
std::convert::identity,
),
Target::Postgresql { .. } => {
panic!("Postgresql not supported for opendtu");
}
};
txs.push(tx);
handles.push(handle);
}

let logger = OpenDTULogger::new(txs);

(Arc::new(Mutex::new(logger)), handles)
}
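
Both spawn_influxdb_writer and spawn_postgres_writer hand back a SyncSender together with a JoinHandle, which is what lets the writing path be tested without a live InfluxDB or PostgreSQL instance. Below is a minimal, self-contained sketch of that channel-and-thread pattern; spawn_collecting_writer is a hypothetical stand-in, not the repository's API.

use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

// Hypothetical writer that collects everything sent to it instead of writing to a
// database, mirroring the (tx, handle) shape returned by the real spawn functions.
fn spawn_collecting_writer() -> (SyncSender<String>, JoinHandle<Vec<String>>) {
    let (tx, rx) = sync_channel::<String>(100);
    let handle = thread::spawn(move || rx.into_iter().collect());
    (tx, handle)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn writer_thread_receives_queued_records() {
        let (tx, handle) = spawn_collecting_writer();
        tx.send("record 1".to_string()).unwrap();
        tx.send("record 2".to_string()).unwrap();
        drop(tx); // closing the sender lets the collector thread finish

        let written = handle.join().unwrap();
        assert_eq!(written, vec!["record 1".to_string(), "record 2".to_string()]);
    }
}

The real writer threads presumably perform the database I/O inside their receive loop; this sketch only demonstrates the channel lifecycle such tests rely on.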
(remaining file diffs not loaded)
