Skip to content

Commit

Permalink
add postgres test
Browse files Browse the repository at this point in the history
  • Loading branch information
wuan committed Oct 12, 2024
1 parent 6278a43 commit 99eca49
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 42 deletions.
34 changes: 25 additions & 9 deletions src/data/shelly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ const COVER_FIELDS: &[(&str, fn(data: &CoverData) -> Option<WriteType>, &str)] =
),
];

static SWITCH_REGEX: LazyLock<Regex, fn() -> Regex> = LazyLock::new(|| Regex::new("/status/switch:.").unwrap());
static COVER_REGEX: LazyLock<Regex, fn() -> Regex> = LazyLock::new(|| Regex::new("/status/cover:.").unwrap());
static SWITCH_REGEX: LazyLock<Regex, fn() -> Regex> =
LazyLock::new(|| Regex::new("/status/switch:.").unwrap());
static COVER_REGEX: LazyLock<Regex, fn() -> Regex> =
LazyLock::new(|| Regex::new("/status/cover:.").unwrap());

impl CheckMessage for ShellyLogger {
fn check_message(&mut self, msg: &Message) {
Expand Down Expand Up @@ -245,22 +247,32 @@ mod tests {
logger.check_message(&message);

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i "));
assert!(result.starts_with(
"output,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=bool value=0i "
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 "));
assert!(result.starts_with(
"power,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=W value=0 "
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999"));
assert!(result.starts_with(
"current,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=A value=3.0999"
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 "));
assert!(result.starts_with(
"voltage,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=V value=226.5 "
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("total_energy,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=Wh value=1094.86"));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40"));
assert!(result.starts_with(
"temperature,location=loo-fan,channel=1,sensor=shelly,type=switch,unit=°C value=36.40"
));

let result = rx.recv_timeout(Duration::from_micros(100));
assert!(result.is_err());
Expand All @@ -281,10 +293,14 @@ mod tests {
assert!(result.starts_with("position,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=% value=100i "));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 "));
assert!(result.starts_with(
"power,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=W value=0 "
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 "));
assert!(result.starts_with(
"current,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=A value=0.5 "
));

let result = rx.recv()?.build()?.get();
assert!(result.starts_with("voltage,location=bedroom-curtain,channel=0,sensor=shelly,type=cover,unit=V value=231.6999"));
Expand Down
12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ mod data;
mod target;

#[derive(Debug, Clone)]
struct SensorReading {
measurement: String,
time: DateTime<Utc>,
location: String,
sensor: String,
value: f32,
pub struct SensorReading {
pub measurement: String,
pub time: DateTime<Utc>,
pub location: String,
pub sensor: String,
pub value: f32,
}

pub enum WriteType {
Expand Down
31 changes: 15 additions & 16 deletions src/target/influx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ impl InfluxClient for DefaultInfluxClient {
fn create_influxdb_client(influx_config: &InfluxConfig) -> anyhow::Result<Box<dyn InfluxClient>> {
let mut influx_client = Client::new(influx_config.url.clone(), influx_config.database.clone());

influx_client =
if let (Some(user), Some(password)) = (influx_config.user.clone(), influx_config.password.clone()) {
influx_client.with_auth(user, password)
} else {
influx_client
};
influx_client = if let (Some(user), Some(password)) =
(influx_config.user.clone(), influx_config.password.clone())
{
influx_client.with_auth(user, password)
} else {
influx_client
};

Ok(DefaultInfluxClient::new(influx_client))
}
Expand Down Expand Up @@ -110,7 +111,8 @@ pub fn spawn_influxdb_writer<T: Send + 'static>(
influx_config: InfluxConfig,
query_mapper: fn(T) -> WriteQuery,
) -> (SyncSender<T>, JoinHandle<()>) {
let influx_client = create_influxdb_client(&influx_config).expect("could not create influxdb client");
let influx_client =
create_influxdb_client(&influx_config).expect("could not create influxdb client");
spawn_influxdb_writer_internal(influx_client, influx_config, query_mapper)
}

Expand Down Expand Up @@ -150,7 +152,7 @@ mod tests {
.add_field("field", influxdb::Type::Float(1.23))
}

//
//
#[test]
fn test_influxdb_writer_internal() -> anyhow::Result<()> {
let influx_config = InfluxConfig::new(
Expand All @@ -160,18 +162,15 @@ mod tests {
Some("password".to_string()),
);


let mut mock_client = Box::new(MockInfluxClient::new());
mock_client.expect_query()
mock_client
.expect_query()
.times(1)
.returning(|_| Ok("Success".to_string()));

// Run the `influxdb_writer` function
let (tx, join_handle) = spawn_influxdb_writer_internal(
mock_client,
influx_config,
mock_write_query,
);
let (tx, join_handle) =
spawn_influxdb_writer_internal(mock_client, influx_config, mock_write_query);

// Send a test query
tx.send("test_data".to_string()).unwrap();
Expand All @@ -183,4 +182,4 @@ mod tests {

Ok(())
}
}
}
99 changes: 88 additions & 11 deletions src/target/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::SensorReading;
use futures::executor::block_on;
use postgres::{Config, NoTls};
#[cfg(test)]
use mockall::automock;
use postgres::types::ToSql;
use postgres::Client;
use postgres::{Error, NoTls};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -31,11 +35,34 @@ impl PostgresConfig {
}
}

fn start_postgres_writer(rx: Receiver<SensorReading>, config: Config) {
let mut client = config
.connect(NoTls)
.expect("failed to connect to postgres");
#[cfg_attr(test, automock)]
pub trait PostgresClient: Send {
fn execute<'a>(
&mut self,
query: &String,
params: &'a [&'a (dyn ToSql + Sync)],
) -> Result<u64, Error>;
}

struct DefaultPostgresClient {
client: Client,
}

impl DefaultPostgresClient {
fn new(client: Client) -> Self {
DefaultPostgresClient { client }
}
}

impl DefaultPostgresClient {}

impl PostgresClient for DefaultPostgresClient {
fn execute(&mut self, query: &String, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
self.client.execute(query, params)
}
}

fn start_postgres_writer(rx: Receiver<SensorReading>, mut client: Box<dyn PostgresClient>) {
block_on(async move {
println!("starting postgres writer async");

Expand Down Expand Up @@ -77,21 +104,71 @@ fn start_postgres_writer(rx: Receiver<SensorReading>, config: Config) {
pub fn spawn_postgres_writer(
config: PostgresConfig,
) -> (SyncSender<SensorReading>, JoinHandle<()>) {
let (tx, rx) = sync_channel(100);
let client = create_postgres_client(&config);
spawn_postgres_writer_internal(client)
}

let mut db_config = postgres::Config::new();
let _ = db_config
fn create_postgres_client(config: &PostgresConfig) -> Box<dyn PostgresClient> {
let client = postgres::Config::new()
.host(&config.host)
.port(config.port)
.user(&config.username)
.password(config.password)
.dbname(&config.database);
.password(&config.password)
.dbname(&config.database)
.connect(NoTls)
.expect("failed to connect to Postgres database");
Box::new(DefaultPostgresClient::new(client))
}

pub fn spawn_postgres_writer_internal(
client: Box<dyn PostgresClient>,
) -> (SyncSender<SensorReading>, JoinHandle<()>) {
let (tx, rx) = sync_channel(100);

(
tx,
thread::spawn(move || {
println!("starting postgres writer");
start_postgres_writer(rx, db_config);
start_postgres_writer(rx, client);
}),
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_postgres_writer_internal() -> anyhow::Result<()> {
let sensor_reading = SensorReading {
measurement: "measurement".to_string(),
time: chrono::Utc::now(),
location: "location".to_string(),
sensor: "sensor".to_string(),
value: 123.4,
};

let sensor_reading_duplicate = sensor_reading.clone();

let mut mock_client = Box::new(MockPostgresClient::new());
mock_client.expect_execute()
.times(1)
.withf(move |query, parameters| {
let expected_parameters: [&dyn ToSql; 4] = [&sensor_reading_duplicate.time, &sensor_reading_duplicate.location, &sensor_reading_duplicate.sensor, &sensor_reading_duplicate.value];
query == "insert into \"measurement\" (time, location, sensor, value) values ($1, $2, $3, $4);" ||
parameters.len() == expected_parameters.len() &&
parameters.iter().zip(expected_parameters.iter()).all(|(a, b)| format!("{a:?}") == format!("{b:?}"))
})
.returning(|_, _| Ok(123));

let (tx, join_handle) = spawn_postgres_writer_internal(mock_client);

tx.send(sensor_reading).unwrap();

drop(tx);

let _ = join_handle.join();

Ok(())
}
}

0 comments on commit 99eca49

Please sign in to comment.