Skip to content

Commit 28eefd7

Browse files
committed
Connect old APIs to new DB
1 parent 8c63ebb commit 28eefd7

File tree

6 files changed

+100
-105
lines changed

6 files changed

+100
-105
lines changed

iot_config/migrations/20251027000000_gateways.sql

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,10 @@ CREATE TABLE IF NOT EXISTS gateways (
1212
location_changed_at TIMESTAMPTZ,
1313
refreshed_at TIMESTAMPTZ NOT NULL,
1414
updated_at TIMESTAMPTZ NOT NULL
15-
);
15+
);
16+
17+
CREATE INDEX IF NOT EXISTS gateways_last_changed_idx ON gateways (last_changed_at DESC);
18+
19+
CREATE INDEX IF NOT EXISTS gateways_location_changed_idx ON gateways (location_changed_at DESC)
20+
WHERE
21+
location IS NOT NULL;

iot_config/src/cli/daemon.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Daemon {
3636

3737
let gateway_svc = GatewayService::new(
3838
signing_keypair.clone(),
39-
metadata_pool.clone(),
39+
pool.clone(),
4040
region_map.clone(),
4141
auth_cache.clone(),
4242
delegate_key_cache,

iot_config/src/gateway/db.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use chrono::{DateTime, Utc};
2+
use futures::{Stream, StreamExt, TryStreamExt};
23
use helium_crypto::PublicKeyBinary;
34
use sqlx::{postgres::PgRow, FromRow, PgExecutor, PgPool, Postgres, QueryBuilder, Row};
45

@@ -122,6 +123,42 @@ impl Gateway {
122123

123124
Ok(gateway)
124125
}
126+
127+
pub fn stream<'a>(
128+
db: impl PgExecutor<'a> + 'a,
129+
min_last_changed_at: DateTime<Utc>,
130+
min_location_changed_at: Option<DateTime<Utc>>,
131+
) -> impl Stream<Item = Self> + 'a {
132+
sqlx::query_as::<_, Self>(
133+
r#"
134+
SELECT
135+
address,
136+
created_at,
137+
elevation,
138+
gain,
139+
hash,
140+
is_active,
141+
is_full_hotspot,
142+
last_changed_at,
143+
location,
144+
location_asserts,
145+
location_changed_at,
146+
refreshed_at,
147+
updated_at
148+
FROM gateways
149+
WHERE last_changed_at >= $1
150+
AND (
151+
$2::timestamptz IS NULL
152+
OR (location IS NOT NULL AND location_changed_at >= $2)
153+
)
154+
"#,
155+
)
156+
.bind(min_last_changed_at)
157+
.bind(min_location_changed_at)
158+
.fetch(db)
159+
.map_err(anyhow::Error::from)
160+
.filter_map(|res| async move { res.ok() })
161+
}
125162
}
126163

127164
impl FromRow<'_, PgRow> for Gateway {

iot_config/src/gateway/service/info.rs

Lines changed: 43 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,40 @@
1-
use crate::region_map;
1+
use crate::{gateway::db::Gateway, region_map};
22
use anyhow::anyhow;
3-
use futures::stream::BoxStream;
3+
use chrono::{DateTime, Utc};
4+
use futures::{stream::BoxStream, Stream, StreamExt};
45
use helium_crypto::PublicKeyBinary;
56
use helium_proto::{
67
services::iot_config::{
78
GatewayInfo as GatewayInfoProto, GatewayMetadata as GatewayMetadataProto,
89
},
910
Region,
1011
};
12+
use sqlx::PgExecutor;
1113

1214
pub type GatewayInfoStream = BoxStream<'static, GatewayInfo>;
1315

16+
// Hotspot gain default; dbi * 10
17+
const DEFAULT_GAIN: u32 = 12;
18+
// Hotspot elevation default; meters above sea level
19+
const DEFAULT_ELEVATION: u32 = 0;
20+
21+
pub async fn get(
22+
db: impl PgExecutor<'_>,
23+
address: &PublicKeyBinary,
24+
) -> anyhow::Result<Option<IotMetadata>> {
25+
let gateway = Gateway::get_by_address(db, address).await?;
26+
Ok(gateway.map(IotMetadata::from))
27+
}
28+
29+
pub fn stream<'a>(
30+
db: impl PgExecutor<'a> + 'a,
31+
min_last_changed_at: DateTime<Utc>,
32+
min_location_changed_at: Option<DateTime<Utc>>,
33+
) -> impl Stream<Item = IotMetadata> + 'a {
34+
let stream = Gateway::stream(db, min_last_changed_at, min_location_changed_at);
35+
stream.map(IotMetadata::from).boxed()
36+
}
37+
1438
#[derive(Clone, Debug)]
1539
pub struct GatewayMetadata {
1640
pub location: u64,
@@ -26,9 +50,17 @@ pub struct GatewayInfo {
2650
pub is_full_hotspot: bool,
2751
}
2852

53+
pub struct IotMetadata {
54+
pub address: PublicKeyBinary,
55+
pub location: Option<u64>,
56+
pub elevation: i32,
57+
pub gain: i32,
58+
pub is_full_hotspot: bool,
59+
}
60+
2961
impl GatewayInfo {
3062
pub fn chain_metadata_to_info(
31-
meta: db::IotMetadata,
63+
meta: IotMetadata,
3264
region_map: &region_map::RegionMapReader,
3365
) -> Self {
3466
let metadata = if let Some(location) = meta.location {
@@ -112,69 +144,14 @@ impl TryFrom<GatewayInfo> for GatewayInfoProto {
112144
}
113145
}
114146

115-
pub(crate) mod db {
116-
use futures::stream::{Stream, StreamExt};
117-
use helium_crypto::PublicKeyBinary;
118-
use sqlx::{PgExecutor, Row};
119-
use std::str::FromStr;
120-
121-
// Hotspot gain default; dbi * 10
122-
const DEFAULT_GAIN: i32 = 12;
123-
// Hotspot elevation default; meters above sea level
124-
const DEFAULT_ELEVATION: i32 = 0;
125-
126-
pub struct IotMetadata {
127-
pub address: PublicKeyBinary,
128-
pub location: Option<u64>,
129-
pub elevation: i32,
130-
pub gain: i32,
131-
pub is_full_hotspot: bool,
132-
}
133-
134-
const GET_METADATA_SQL: &str = r#"
135-
select kta.entity_key, infos.location::bigint, CAST(infos.elevation AS integer), CAST(infos.gain as integer), infos.is_full_hotspot
136-
from iot_hotspot_infos infos
137-
join key_to_assets kta on infos.asset = kta.asset
138-
"#;
139-
140-
pub async fn get_info(
141-
db: impl PgExecutor<'_>,
142-
address: &PublicKeyBinary,
143-
) -> anyhow::Result<Option<IotMetadata>> {
144-
let entity_key = bs58::decode(address.to_string()).into_vec()?;
145-
let mut query: sqlx::QueryBuilder<sqlx::Postgres> =
146-
sqlx::QueryBuilder::new(GET_METADATA_SQL);
147-
query.push(" where kta.entity_key = $1 ");
148-
Ok(query
149-
.build_query_as::<IotMetadata>()
150-
.bind(entity_key)
151-
.fetch_optional(db)
152-
.await?)
153-
}
154-
155-
pub fn all_info_stream<'a>(
156-
db: impl PgExecutor<'a> + 'a,
157-
) -> impl Stream<Item = IotMetadata> + 'a {
158-
sqlx::query_as::<_, IotMetadata>(GET_METADATA_SQL)
159-
.fetch(db)
160-
.filter_map(|metadata| async move { metadata.ok() })
161-
.boxed()
162-
}
163-
164-
impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for IotMetadata {
165-
fn from_row(row: &sqlx::postgres::PgRow) -> sqlx::Result<Self> {
166-
Ok(Self {
167-
address: PublicKeyBinary::from_str(
168-
&bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(),
169-
)
170-
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?,
171-
location: row.get::<Option<i64>, &str>("location").map(|v| v as u64),
172-
elevation: row
173-
.get::<Option<i32>, &str>("elevation")
174-
.unwrap_or(DEFAULT_ELEVATION),
175-
gain: row.get::<Option<i32>, &str>("gain").unwrap_or(DEFAULT_GAIN),
176-
is_full_hotspot: row.get("is_full_hotspot"),
177-
})
147+
impl From<Gateway> for IotMetadata {
148+
fn from(gateway: Gateway) -> Self {
149+
Self {
150+
address: gateway.address,
151+
location: gateway.location,
152+
elevation: gateway.elevation.unwrap_or(DEFAULT_ELEVATION) as i32,
153+
gain: gateway.gain.unwrap_or(DEFAULT_GAIN) as i32,
154+
is_full_hotspot: gateway.is_full_hotspot.unwrap_or(false),
178155
}
179156
}
180157
}

iot_config/src/gateway/service/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
44
};
55
use anyhow::Result;
6-
use chrono::Utc;
6+
use chrono::{DateTime, Utc};
77
use file_store::traits::{MsgVerify, TimestampEncode};
88
use futures::stream::StreamExt;
99
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
@@ -30,7 +30,7 @@ const CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3);
3030
pub struct GatewayService {
3131
auth_cache: AuthCache,
3232
gateway_cache: Arc<Cache<PublicKeyBinary, GatewayInfo>>,
33-
metadata_pool: Pool<Postgres>,
33+
pool: Pool<Postgres>,
3434
region_map: RegionMapReader,
3535
signing_key: Arc<Keypair>,
3636
delegate_cache: watch::Receiver<org::DelegateCache>,
@@ -39,7 +39,7 @@ pub struct GatewayService {
3939
impl GatewayService {
4040
pub fn new(
4141
signing_key: Arc<Keypair>,
42-
metadata_pool: Pool<Postgres>,
42+
pool: Pool<Postgres>,
4343
region_map: RegionMapReader,
4444
auth_cache: AuthCache,
4545
delegate_cache: watch::Receiver<org::DelegateCache>,
@@ -51,7 +51,7 @@ impl GatewayService {
5151
Ok(Self {
5252
auth_cache,
5353
gateway_cache,
54-
metadata_pool,
54+
pool,
5555
region_map,
5656
signing_key,
5757
delegate_cache,
@@ -102,7 +102,7 @@ impl GatewayService {
102102
Some(gateway) => Ok(gateway.value().clone()),
103103
None => {
104104
let metadata = tokio::select! {
105-
query_result = info::db::get_info(&self.metadata_pool, pubkey) => {
105+
query_result = info::get(&self.pool, pubkey) => {
106106
query_result.map_err(|_| Status::internal("error fetching gateway info"))?
107107
.ok_or_else(|| {
108108
telemetry::count_gateway_info_lookup("not-found");
@@ -284,7 +284,7 @@ impl iot_config::Gateway for GatewayService {
284284

285285
tracing::debug!("fetching all gateways' info");
286286

287-
let pool = self.metadata_pool.clone();
287+
let pool = self.pool.clone();
288288
let signing_key = self.signing_key.clone();
289289
let batch_size = request.batch_size;
290290
let region_map = self.region_map.clone();
@@ -316,7 +316,10 @@ async fn stream_all_gateways_info(
316316
) -> anyhow::Result<()> {
317317
let timestamp = Utc::now().encode_timestamp();
318318
let signer: Vec<u8> = signing_key.public_key().into();
319-
let mut stream = info::db::all_info_stream(pool).chunks(batch_size as usize);
319+
320+
let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap();
321+
322+
let mut stream = info::stream(pool, epoch, None).chunks(batch_size as usize);
320323
while let Some(infos) = stream.next().await {
321324
let gateway_infos = infos
322325
.into_iter()

iot_config/tests/integrations/gateway_service.rs

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::common::{
2-
gateway_metadata_db::{self, create_tables},
3-
make_keypair, spawn_gateway_service,
4-
};
1+
use crate::common::{make_keypair, spawn_gateway_service};
52
use chrono::{DateTime, Utc};
63
use futures::StreamExt;
74
use h3o::{LatLng, Resolution};
@@ -74,8 +71,6 @@ async fn gateway_location_v1(pool: PgPool) -> anyhow::Result<()> {
7471
let pub_key = make_keypair().public_key().clone();
7572
let now = Utc::now();
7673

77-
create_tables(&pool).await?;
78-
7974
let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?;
8075

8176
let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?;
@@ -97,8 +92,6 @@ async fn gateway_region_params_v1(pool: PgPool) -> anyhow::Result<()> {
9792
let pub_key = keypair.public_key().clone();
9893
let now = Utc::now();
9994

100-
create_tables(&pool).await?;
101-
10295
let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?;
10396

10497
let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?;
@@ -125,8 +118,6 @@ async fn gateway_info_v1(pool: PgPool) -> anyhow::Result<()> {
125118
let pub_key = make_keypair().public_key().clone();
126119
let now = Utc::now();
127120

128-
create_tables(&pool).await?;
129-
130121
let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?;
131122

132123
let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?;
@@ -158,8 +149,6 @@ async fn gateway_info_stream_v1(pool: PgPool) -> anyhow::Result<()> {
158149
let pub_key = make_keypair().public_key().clone();
159150
let now = Utc::now();
160151

161-
create_tables(&pool).await?;
162-
163152
let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?;
164153

165154
let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?;
@@ -295,24 +284,7 @@ async fn insert_gateway(
295284
updated_at: now,
296285
};
297286

298-
// Insert test data into iot_hotspot_infos
299-
gateway_metadata_db::insert_gateway(
300-
pool,
301-
"address1", // address (PRIMARY KEY)
302-
"asset1", // asset
303-
Some(gateway.location.unwrap() as i64), // location
304-
gateway.elevation.map(|v| v as i64), // elevation
305-
gateway.gain.map(|v| v as i64), // gain
306-
gateway.is_full_hotspot, // is_full_hotspot
307-
gateway.location_asserts.map(|v| v as i32), // num_location_asserts
308-
gateway.is_active, // is_active
309-
Some(0), // dc_onboarding_fee_paid
310-
gateway.created_at, // created_at
311-
gateway.refreshed_at, // refreshed_at
312-
Some(0), // last_block
313-
gateway.address.clone(), // key (PublicKeyBinary)
314-
)
315-
.await?;
287+
Gateway::insert_bulk(pool, std::slice::from_ref(&gateway)).await?;
316288

317289
let loc_bytes = h3_index.to_le_bytes();
318290
let mut encoder = Encoder::new(Vec::new())?;

0 commit comments

Comments
 (0)