diff --git a/src/data/shelly/mod.rs b/src/data/shelly/mod.rs index b7a06bc..2ed4475 100644 --- a/src/data/shelly/mod.rs +++ b/src/data/shelly/mod.rs @@ -173,8 +173,10 @@ const COVER_FIELDS: &[(&str, fn(data: &CoverData) -> Option, &str)] = ), ]; -static SWITCH_REGEX: LazyLock Regex> = LazyLock::new(|| Regex::new("/status/switch:.").unwrap()); -static COVER_REGEX: LazyLock Regex> = LazyLock::new(|| Regex::new("/status/cover:.").unwrap()); +static SWITCH_REGEX: LazyLock Regex> = + LazyLock::new(|| Regex::new("/status/switch:.").unwrap()); +static COVER_REGEX: LazyLock Regex> = + LazyLock::new(|| Regex::new("/status/cover:.").unwrap()); impl CheckMessage for ShellyLogger { fn check_message(&mut self, msg: &Message) { @@ -245,22 +247,32 @@ mod tests { logger.check_message(&message); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i ")); + assert!(result.starts_with( + "output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 ")); + assert!(result.starts_with( + "power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999")); + assert!(result.starts_with( + "current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999" + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 ")); + assert!(result.starts_with( + "voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 " + )); let result = rx.recv()?.build()?.get(); assert!(result.starts_with("total_energy,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=Wh value=1094.86")); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40")); + assert!(result.starts_with( + "temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40" + )); let result = rx.recv_timeout(Duration::from_micros(100)); assert!(result.is_err()); @@ -281,10 +293,14 @@ mod tests { assert!(result.starts_with("position,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=% value=100i ")); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 ")); + assert!(result.starts_with( + "power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 " + )); let result = rx.recv()?.build()?.get(); - assert!(result.starts_with("current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 ")); + assert!(result.starts_with( + "current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 " + )); let result = rx.recv()?.build()?.get(); assert!(result.starts_with("voltage,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=V value=231.6999")); diff --git a/src/main.rs b/src/main.rs index 26c5412..faa037f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,12 +24,12 @@ mod data; mod target; #[derive(Debug, Clone)] -struct SensorReading { - measurement: String, - time: DateTime, - location: String, - sensor: String, - value: f32, +pub struct SensorReading { + pub measurement: String, + pub time: DateTime, + pub location: String, + pub sensor: String, + pub value: f32, } pub enum WriteType { diff --git a/src/target/influx/mod.rs b/src/target/influx/mod.rs index 970e768..fd522dc 100644 --- a/src/target/influx/mod.rs +++ b/src/target/influx/mod.rs @@ -57,12 +57,13 @@ impl InfluxClient for DefaultInfluxClient { fn create_influxdb_client(influx_config: &InfluxConfig) -> anyhow::Result> { let mut influx_client = Client::new(influx_config.url.clone(), influx_config.database.clone()); - influx_client = - if let (Some(user), Some(password)) = (influx_config.user.clone(), influx_config.password.clone()) { - influx_client.with_auth(user, password) - } else { - influx_client - }; + influx_client = if let (Some(user), Some(password)) = + (influx_config.user.clone(), influx_config.password.clone()) + { + influx_client.with_auth(user, password) + } else { + influx_client + }; Ok(DefaultInfluxClient::new(influx_client)) } @@ -110,7 +111,8 @@ pub fn spawn_influxdb_writer( influx_config: InfluxConfig, query_mapper: fn(T) -> WriteQuery, ) -> (SyncSender, JoinHandle<()>) { - let influx_client = create_influxdb_client(&influx_config).expect("could not create influxdb client"); + let influx_client = + create_influxdb_client(&influx_config).expect("could not create influxdb client"); spawn_influxdb_writer_internal(influx_client, influx_config, query_mapper) } @@ -150,7 +152,7 @@ mod tests { .add_field("field", influxdb::Type::Float(1.23)) } - // + // #[test] fn test_influxdb_writer_internal() -> anyhow::Result<()> { let influx_config = InfluxConfig::new( @@ -160,18 +162,15 @@ mod tests { Some("password".to_string()), ); - let mut mock_client = Box::new(MockInfluxClient::new()); - mock_client.expect_query() + mock_client + .expect_query() .times(1) .returning(|_| Ok("Success".to_string())); // Run the `influxdb_writer` function - let (tx, join_handle) = spawn_influxdb_writer_internal( - mock_client, - influx_config, - mock_write_query, - ); + let (tx, join_handle) = + spawn_influxdb_writer_internal(mock_client, influx_config, mock_write_query); // Send a test query tx.send("test_data".to_string()).unwrap(); @@ -183,4 +182,4 @@ mod tests { Ok(()) } -} \ No newline at end of file +} diff --git a/src/target/postgres/mod.rs b/src/target/postgres/mod.rs index dc3857c..4779d75 100644 --- a/src/target/postgres/mod.rs +++ b/src/target/postgres/mod.rs @@ -1,6 +1,10 @@ use crate::SensorReading; use futures::executor::block_on; -use postgres::{Config, NoTls}; +#[cfg(test)] +use mockall::automock; +use postgres::types::ToSql; +use postgres::Client; +use postgres::{Error, NoTls}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread; use std::thread::JoinHandle; @@ -31,11 +35,34 @@ impl PostgresConfig { } } -fn start_postgres_writer(rx: Receiver, config: Config) { - let mut client = config - .connect(NoTls) - .expect("failed to connect to postgres"); +#[cfg_attr(test, automock)] +pub trait PostgresClient: Send { + fn execute<'a>( + &mut self, + query: &String, + params: &'a [&'a (dyn ToSql + Sync)], + ) -> Result; +} + +struct DefaultPostgresClient { + client: Client, +} + +impl DefaultPostgresClient { + fn new(client: Client) -> Self { + DefaultPostgresClient { client } + } +} + +impl DefaultPostgresClient {} +impl PostgresClient for DefaultPostgresClient { + fn execute(&mut self, query: &String, params: &[&(dyn ToSql + Sync)]) -> Result { + self.client.execute(query, params) + } +} + +fn start_postgres_writer(rx: Receiver, mut client: Box) { block_on(async move { println!("starting postgres writer async"); @@ -77,21 +104,71 @@ fn start_postgres_writer(rx: Receiver, config: Config) { pub fn spawn_postgres_writer( config: PostgresConfig, ) -> (SyncSender, JoinHandle<()>) { - let (tx, rx) = sync_channel(100); + let client = create_postgres_client(&config); + spawn_postgres_writer_internal(client) +} - let mut db_config = postgres::Config::new(); - let _ = db_config +fn create_postgres_client(config: &PostgresConfig) -> Box { + let client = postgres::Config::new() .host(&config.host) .port(config.port) .user(&config.username) - .password(config.password) - .dbname(&config.database); + .password(&config.password) + .dbname(&config.database) + .connect(NoTls) + .expect("failed to connect to Postgres database"); + Box::new(DefaultPostgresClient::new(client)) +} + +pub fn spawn_postgres_writer_internal( + client: Box, +) -> (SyncSender, JoinHandle<()>) { + let (tx, rx) = sync_channel(100); ( tx, thread::spawn(move || { println!("starting postgres writer"); - start_postgres_writer(rx, db_config); + start_postgres_writer(rx, client); }), ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_postgres_writer_internal() -> anyhow::Result<()> { + let sensor_reading = SensorReading { + measurement: "measurement".to_string(), + time: chrono::Utc::now(), + location: "location".to_string(), + sensor: "sensor".to_string(), + value: 123.4, + }; + + let sensor_reading_duplicate = sensor_reading.clone(); + + let mut mock_client = Box::new(MockPostgresClient::new()); + mock_client.expect_execute() + .times(1) + .withf(move |query, parameters| { + let expected_parameters: [&dyn ToSql; 4] = [&sensor_reading_duplicate.time, &sensor_reading_duplicate.location, &sensor_reading_duplicate.sensor, &sensor_reading_duplicate.value]; + query == "insert into \"measurement\" (time, location, sensor, value) values ($1, $2, $3, $4);" || + parameters.len() == expected_parameters.len() && + parameters.iter().zip(expected_parameters.iter()).all(|(a, b)| format!("{a:?}") == format!("{b:?}")) + }) + .returning(|_, _| Ok(123)); + + let (tx, join_handle) = spawn_postgres_writer_internal(mock_client); + + tx.send(sensor_reading).unwrap(); + + drop(tx); + + let _ = join_handle.join(); + + Ok(()) + } +}