Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ impl ValidatedHeartbeat {
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn process_validated_heartbeats(
pub async fn process_validated_heartbeats(
validated_heartbeats: impl Stream<Item = anyhow::Result<ValidatedHeartbeat>>,
heartbeat_cache: &Cache<(String, DateTime<Utc>), ()>,
coverage_claim_time_cache: &CoverageClaimTimeCache,
Expand Down
211 changes: 210 additions & 1 deletion mobile_verifier/tests/integrations/heartbeats.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use futures::stream;
use futures_util::TryStreamExt;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{HeartbeatValidity, LocationSource};
use mobile_verifier::{
cell_type::CellType,
heartbeats::{HbType, Heartbeat, HeartbeatReward, ValidatedHeartbeat},
coverage::CoverageClaimTimeCache,
heartbeats::{
process_validated_heartbeats, HbType, Heartbeat, HeartbeatReward, ValidatedHeartbeat,
},
};
use retainer::Cache;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use sqlx::PgPool;
use tokio::sync::mpsc;
use uuid::Uuid;

#[sqlx::test]
Expand Down Expand Up @@ -259,3 +266,205 @@ VALUES

Ok(())
}

#[sqlx::test]
async fn test_process_validated_heartbeats_with_restart(pool: PgPool) -> anyhow::Result<()> {
let coverage_object = Uuid::new_v4();

// Closure to create heartbeats with different parameters
let create_heartbeat = |minutes: u32,
distance_to_asserted: i64,
location_trust_score_multiplier: Decimal|
-> ValidatedHeartbeat {
ValidatedHeartbeat {
heartbeat: Heartbeat {
hb_type: HbType::Wifi,
hotspot_key: "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"
.parse()
.unwrap(),
operation_mode: true,
lat: 0.0,
lon: 0.0,
coverage_object: Some(coverage_object),
location_validation_timestamp: None,
timestamp: format!("2023-08-23 00:{:02}:00.000000000 UTC", minutes)
.parse()
.unwrap(),
location_source: LocationSource::Gps,
},
cell_type: CellType::NovaGenericWifiOutdoor,
distance_to_asserted: Some(distance_to_asserted),
coverage_meta: None,
location_trust_score_multiplier,
validity: HeartbeatValidity::Valid,
}
};

// Create two similar heartbeats in the same hour with different trust scores
let hb1 = create_heartbeat(5, 0, dec!(1.0));
// pt - poor trust
let hb2 = create_heartbeat(10, 2000, dec!(0.0));

// Create mock file sinks
let (heartbeat_tx, mut heartbeat_rx) = mpsc::channel(10);
let (seniority_tx, _) = mpsc::channel(10);
let heartbeat_sink = FileSinkClient::new(heartbeat_tx, "test_heartbeats");
let seniority_sink = FileSinkClient::new(seniority_tx, "test_seniority");

// Create cache and coverage claim time cache
let heartbeat_cache = Cache::<(String, DateTime<Utc>), ()>::new();
let coverage_claim_time_cache = CoverageClaimTimeCache::new();

// Create stream of validated heartbeats
let hb_list = vec![hb1];
let heartbeats = stream::iter(hb_list.into_iter().map(Ok));

// Start transaction
let mut transaction = pool.begin().await?;

// Process the heartbeats
process_validated_heartbeats(
heartbeats,
&heartbeat_cache,
&coverage_claim_time_cache,
&heartbeat_sink,
&seniority_sink,
&mut transaction,
)
.await?;
transaction.commit().await?;

// The service here was stopped, reloaded, so cache is clear
// Second hb is arrived
let hb_list = vec![hb2];
let heartbeats = stream::iter(hb_list.into_iter().map(Ok));

heartbeat_cache.clear().await;

let mut transaction = pool.begin().await?;
// Process the heartbeats
process_validated_heartbeats(
heartbeats,
&heartbeat_cache,
&coverage_claim_time_cache,
&heartbeat_sink,
&seniority_sink,
&mut transaction,
)
.await?;
transaction.commit().await?;

let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM wifi_heartbeats")
.fetch_one(&pool)
.await?;

assert_eq!(count, 1); // Only 1 row due to ON CONFLICT

// Verify the heartbeat data in database (should be the latest one - poor trust)
let db_heartbeat: (String, Decimal, DateTime<Utc>) = sqlx::query_as(
"SELECT hotspot_key::text, location_trust_score_multiplier, latest_timestamp FROM wifi_heartbeats",
)
.fetch_one(&pool)
.await?;

// The result is next:
// [mobile_verifier/tests/integrations/heartbeats.rs:370:5] db_heartbeat = (
// "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL",
// 1.0,
// 2023-08-23T00:10:00Z,
// )
dbg!(db_heartbeat);
// As we see the database has inconsistent record. Multiplier 1 belongs to the heartbeat with
// timestamp 2023-08-23T00:05:00Z,
Ok(())
}

#[sqlx::test]
async fn test_process_validated_heartbeats_no_restart(pool: PgPool) -> anyhow::Result<()> {
let coverage_object = Uuid::new_v4();

// Closure to create heartbeats with different parameters
let create_heartbeat = |minutes: u32,
distance_to_asserted: i64,
location_trust_score_multiplier: Decimal|
-> ValidatedHeartbeat {
ValidatedHeartbeat {
heartbeat: Heartbeat {
hb_type: HbType::Wifi,
hotspot_key: "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"
.parse()
.unwrap(),
operation_mode: true,
lat: 0.0,
lon: 0.0,
coverage_object: Some(coverage_object),
location_validation_timestamp: None,
timestamp: format!("2023-08-23 00:{:02}:00.000000000 UTC", minutes)
.parse()
.unwrap(),
location_source: LocationSource::Gps,
},
cell_type: CellType::NovaGenericWifiOutdoor,
distance_to_asserted: Some(distance_to_asserted),
coverage_meta: None,
location_trust_score_multiplier,
validity: HeartbeatValidity::Valid,
}
};

// Create two similar heartbeats in the same hour with different trust scores
let hb1 = create_heartbeat(5, 0, dec!(1.0));
// pt - poor trust
let hb2 = create_heartbeat(10, 2000, dec!(0.0));

// Create mock file sinks
let (heartbeat_tx, mut heartbeat_rx) = mpsc::channel(10);
let (seniority_tx, _) = mpsc::channel(10);
let heartbeat_sink = FileSinkClient::new(heartbeat_tx, "test_heartbeats");
let seniority_sink = FileSinkClient::new(seniority_tx, "test_seniority");

// Create cache and coverage claim time cache
let heartbeat_cache = Cache::<(String, DateTime<Utc>), ()>::new();
let coverage_claim_time_cache = CoverageClaimTimeCache::new();

// Create stream of validated heartbeats
let hb_list = vec![hb1, hb2];
let heartbeats = stream::iter(hb_list.into_iter().map(Ok));

// Start transaction
let mut transaction = pool.begin().await?;

// Process the heartbeats
process_validated_heartbeats(
heartbeats,
&heartbeat_cache,
&coverage_claim_time_cache,
&heartbeat_sink,
&seniority_sink,
&mut transaction,
)
.await?;
transaction.commit().await?;

let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM wifi_heartbeats")
.fetch_one(&pool)
.await?;

assert_eq!(count, 1); // Only 1 row due to ON CONFLICT

// Verify the heartbeat data in database (should be the latest one - poor trust)
let db_heartbeat: (String, Decimal, DateTime<Utc>) = sqlx::query_as(
"SELECT hotspot_key::text, location_trust_score_multiplier, latest_timestamp FROM wifi_heartbeats",
)
.fetch_one(&pool)
.await?;

// The result is next:
// [mobile_verifier/tests/integrations/heartbeats.rs:370:5] db_heartbeat = (
// "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL",
// 1.0,
// 2023-08-23T00:05:00Z,
// )
dbg!(db_heartbeat);
Ok(())
}
Loading