Skip to content

Commit aa5ba75

Browse files
committed
Implement gateway_info_stream_v2
1 parent 76d37d3 commit aa5ba75

File tree

5 files changed

+204
-25
lines changed

5 files changed

+204
-25
lines changed

Cargo.lock

Lines changed: 34 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [
126126
url = "2.5.4"
127127

128128
### Protobuf
129-
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
129+
helium-proto = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2", features = [
130130
"services",
131131
] }
132-
beacon = { git = "https://github.com/helium/proto", branch = "master" }
132+
beacon = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2" }
133133
# Pickup versions from above
134134
prost = "*"
135135
tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] }

file_store_oracles/src/traits/msg_verify.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl_msg_verify!(iot_config::AdminLoadRegionReqV1, signature);
7373
impl_msg_verify!(iot_config::AdminRemoveKeyReqV1, signature);
7474
impl_msg_verify!(iot_config::GatewayInfoReqV1, signature);
7575
impl_msg_verify!(iot_config::GatewayInfoStreamReqV1, signature);
76+
impl_msg_verify!(iot_config::GatewayInfoStreamReqV2, signature);
7677
impl_msg_verify!(iot_config::RegionParamsReqV1, signature);
7778
impl_msg_verify!(iot_config::GatewayInfoResV1, signature);
7879
impl_msg_verify!(iot_config::GatewayInfoStreamResV1, signature);

iot_config/src/gateway/service/mod.rs

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ use crate::{
33
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
44
};
55
use anyhow::Result;
6-
use chrono::{DateTime, Utc};
6+
use chrono::{DateTime, TimeZone, Utc};
77
use file_store::traits::TimestampEncode;
88
use file_store_oracles::traits::MsgVerify;
99
use futures::stream::StreamExt;
1010
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
1111
use helium_proto::{
1212
services::iot_config::{
13-
self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamResV1,
14-
GatewayLocationReqV1, GatewayLocationResV1, GatewayRegionParamsReqV1,
15-
GatewayRegionParamsResV1,
13+
self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2,
14+
GatewayInfoStreamResV1, GatewayLocationReqV1, GatewayLocationResV1,
15+
GatewayRegionParamsReqV1, GatewayRegionParamsResV1,
1616
},
1717
Message, Region,
1818
};
@@ -293,13 +293,71 @@ impl iot_config::Gateway for GatewayService {
293293
let (tx, rx) = tokio::sync::mpsc::channel(20);
294294

295295
tokio::spawn(async move {
296+
let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap();
296297
tokio::select! {
297298
_ = stream_all_gateways_info(
298299
&pool,
299300
tx.clone(),
300301
&signing_key,
301302
region_map.clone(),
302303
batch_size,
304+
epoch,
305+
None,
306+
) => (),
307+
}
308+
});
309+
310+
Ok(Response::new(GrpcStreamResult::new(rx)))
311+
}
312+
313+
type info_stream_v2Stream = GrpcStreamResult<GatewayInfoStreamResV1>;
314+
async fn info_stream_v2(
315+
&self,
316+
request: Request<GatewayInfoStreamReqV2>,
317+
) -> GrpcResult<Self::info_stream_v2Stream> {
318+
let request = request.into_inner();
319+
telemetry::count_request("gateway", "info-stream");
320+
321+
let signer = verify_public_key(&request.signer)?;
322+
self.verify_request_signature(&signer, &request)?;
323+
324+
tracing::debug!("fetching all gateways' info");
325+
326+
let pool = self.pool.clone();
327+
let signing_key = self.signing_key.clone();
328+
let batch_size = request.batch_size;
329+
let min_last_changed_at = Utc
330+
.timestamp_opt(request.min_updated_at as i64, 0)
331+
.single()
332+
.ok_or(Status::invalid_argument(
333+
"Invalid min_refreshed_at argument",
334+
))?;
335+
336+
let min_location_changed_at = if request.min_location_changed_at == 0 {
337+
None
338+
} else {
339+
Some(
340+
Utc.timestamp_opt(request.min_location_changed_at as i64, 0)
341+
.single()
342+
.ok_or(Status::invalid_argument(
343+
"Invalid min_location_changed_at argument",
344+
))?,
345+
)
346+
};
347+
let region_map = self.region_map.clone();
348+
349+
let (tx, rx) = tokio::sync::mpsc::channel(20);
350+
351+
tokio::spawn(async move {
352+
tokio::select! {
353+
_ = stream_all_gateways_info(
354+
&pool,
355+
tx.clone(),
356+
&signing_key,
357+
region_map.clone(),
358+
batch_size,
359+
min_last_changed_at,
360+
min_location_changed_at,
303361
) => (),
304362
}
305363
});
@@ -314,13 +372,14 @@ async fn stream_all_gateways_info(
314372
signing_key: &Keypair,
315373
region_map: RegionMapReader,
316374
batch_size: u32,
375+
min_last_changed_at: DateTime<Utc>,
376+
min_location_changed_at: Option<DateTime<Utc>>,
317377
) -> anyhow::Result<()> {
318378
let timestamp = Utc::now().encode_timestamp();
319379
let signer: Vec<u8> = signing_key.public_key().into();
320380

321-
let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap();
322-
323-
let mut stream = info::stream(pool, epoch, None).chunks(batch_size as usize);
381+
let mut stream = info::stream(pool, min_last_changed_at, min_location_changed_at)
382+
.chunks(batch_size as usize);
324383
while let Some(infos) = stream.next().await {
325384
let gateway_infos = infos
326385
.into_iter()

0 commit comments

Comments
 (0)