diff --git a/Cargo.lock b/Cargo.lock index c0f58d916..9e1de5837 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2831,7 +2831,6 @@ dependencies = [ "derive_builder", "futures", "futures-util", - "h3o", "helium-crypto", "helium-proto", "http 1.3.1", @@ -2882,6 +2881,7 @@ dependencies = [ "serde", "serde_json", "strum", + "thiserror 1.0.69", "tokio", "uuid", ] diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 342e4d832..eca71c703 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -38,7 +38,6 @@ async-trait = { workspace = true } derive_builder = { workspace = true } retainer = { workspace = true } uuid = { workspace = true } -h3o = { workspace = true } task-manager = { path = "../task_manager" } tls-init = { path = "../tls_init" } bs58 = { workspace = true } diff --git a/file_store/src/error.rs b/file_store/src/error.rs index 506352f69..f28de6a70 100644 --- a/file_store/src/error.rs +++ b/file_store/src/error.rs @@ -1,179 +1,101 @@ -use prost::UnknownEnumValue; +use std::path::{Path, PathBuf}; + use thiserror::Error; +use crate::file_info::FileInfoError; + pub type Result = std::result::Result; #[derive(Error, Debug)] pub enum Error { - #[error("io error")] + #[error("io error: {0}")] Io(#[from] std::io::Error), - #[error("encode error")] - Encode(#[from] EncodeError), - #[error("decode error")] - Decode(#[from] DecodeError), - #[error("unknown enum value")] - UnknownEnumValue(#[from] UnknownEnumValue), - #[error("not found")] - NotFound(String), - #[error("crypto error")] + + #[error("prost encode error: {0}")] + Encode(#[from] prost::EncodeError), + + #[error("file info error: {0}")] + FileInfo(#[from] FileInfoError), + + #[error("aws error: {0}")] + Aws(#[from] AwsError), + + #[error("crypto error: {0}")] Crypto(Box), - #[error("aws error")] - Aws(#[source] Box), - #[error("mpsc channel error")] - Channel, - #[error("no manifest")] - NoManifest, - #[error("send timeout")] - SendTimeout, + + #[error("channel error: {0}")] + Channel(#[from] ChannelError), + #[error("error building file info poller: {0}")] FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError), + #[cfg(feature = "sqlx-postgres")] #[error("db error")] DbError(#[from] sqlx::Error), - #[error("channel send error")] - SendError(#[from] tokio::sync::mpsc::error::SendError<()>), - //Generic error wrapper for external (out of that repository) traits implementations. - //Not recommended for internal use! + + // Generic error wrapper for external (out of that repository) traits implementations. + // Not recommended for internal use! #[error("external error")] ExternalError(#[from] Box), } -#[derive(Error, Debug)] -pub enum DecodeError { - #[error("prost error")] - Prost(#[from] prost::DecodeError), - #[error("file info error")] - FileInfo(String), - #[error("uri error")] - Uri(#[from] http::uri::InvalidUri), - #[error("integer conversion error")] - FromInt(#[from] std::num::TryFromIntError), - #[error("error parsing decimal")] - IntoDecimal(#[from] rust_decimal::Error), - #[error("empty field: {0}")] - EmptyField(&'static str), - #[error("unsupported region, type: {0}, value: {1}")] - UnsupportedRegion(String, i32), - #[error("unsupported datarate, type: {0}, value: {1}")] - UnsupportedDataRate(String, i32), - #[error("unsupported invalid_reason, type: {0}, value: {1}")] - UnsupportedInvalidReason(String, i32), - #[error("unsupported participant_side, type: {0}, value: {1}")] - UnsupportedParticipantSide(String, i32), - #[error("unsupported verification status, type: {0}, value: {1}")] - UnsupportedStatusReason(String, i32), - #[error("unsupported signal level, type: {0}, value: {1}")] - UnsupportedSignalLevel(String, i32), - #[error("invalid unix timestamp {0}")] - InvalidTimestamp(u64), - #[error("Uuid error: {0}")] - UuidError(#[from] uuid::Error), - #[error("Invalid cell index error: {0}")] - InvalidCellIndexError(#[from] h3o::error::InvalidCellIndex), - #[error("unsupported packet type, type: {0}, value: {1}")] - UnsupportedPacketType(String, i32), - #[error("file stream try decode error: {0}")] - FileStreamTryDecode(String), - #[error("unsupported token type {0}")] - UnsupportedTokenType(String, i32), -} - -#[derive(Error, Debug)] -pub enum EncodeError { - #[error("prost error")] - Prost(#[from] prost::EncodeError), - #[error("json error")] - Json(#[from] serde_json::Error), -} +#[derive(thiserror::Error, Debug)] +pub enum AwsError { + #[error("s3: {0}")] + S3(Box), -macro_rules! from_err { - ($to_type:ty, $from_type:ty) => { - impl From<$from_type> for Error { - fn from(v: $from_type) -> Self { - Self::from(<$to_type>::from(v)) - } - } - }; + #[error("streaming: {0}")] + Streaming(#[from] aws_sdk_s3::primitives::ByteStreamError), } -// Encode Errors -from_err!(EncodeError, prost::EncodeError); -from_err!(EncodeError, serde_json::Error); - -// Decode Errors -from_err!(DecodeError, prost::DecodeError); - -impl Error { - pub fn not_found(msg: E) -> Self { - Self::NotFound(msg.to_string()) - } - pub fn channel() -> Error { - Error::Channel - } - - pub fn s3_error(err: T) -> Self +impl AwsError { + pub fn s3_error(err: T) -> Error where T: Into, { - Self::from(err.into()) - } - - pub fn file_stream_try_decode(msg: E) -> Error { - DecodeError::file_stream_try_decode(msg) + Error::from(err.into()) } } -impl DecodeError { - pub fn file_info(msg: E) -> Error { - Error::Decode(Self::FileInfo(msg.to_string())) - } - - pub fn unsupported_region(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedRegion(msg1.to_string(), msg2)) - } - - pub fn unsupported_datarate(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedDataRate(msg1.to_string(), msg2)) - } - - pub fn unsupported_packet_type(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedPacketType(msg1.to_string(), msg2)) - } - - pub fn unsupported_participant_side(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedParticipantSide(msg1.to_string(), msg2)) - } +#[derive(thiserror::Error, Debug)] +pub enum ChannelError { + #[error("failed to send {prefix} for process {process}")] + PollerSendError { prefix: String, process: String }, - pub fn unsupported_invalid_reason(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2)) - } + #[error("channel closed sink {name}")] + SinkClosed { name: String }, - pub fn invalid_timestamp(v: u64) -> Error { - Error::Decode(Self::InvalidTimestamp(v)) - } - - pub fn unsupported_status_reason(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2)) - } + #[error("timeout for sink {name}")] + SinkTimeout { name: String }, - pub fn unsupported_signal_level(msg1: impl ToString, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedSignalLevel(msg1.to_string(), msg2)) - } + #[error("channel closed for upload {path}")] + UploadClosed { path: PathBuf }, +} - pub fn file_stream_try_decode(msg: E) -> Error { - Error::Decode(Self::FileStreamTryDecode(msg.to_string())) +impl ChannelError { + pub fn poller_send_error(prefix: &str, process: &str) -> Error { + Error::Channel(Self::PollerSendError { + prefix: prefix.to_owned(), + process: process.to_owned(), + }) } - pub fn unsupported_invalidated_reason(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2)) + pub fn sink_closed(name: &str) -> Error { + Error::Channel(Self::SinkClosed { + name: name.to_owned(), + }) } - pub const fn empty_field(field: &'static str) -> Error { - Error::Decode(Self::EmptyField(field)) + pub fn sink_timeout(name: &str) -> Error { + Error::Channel(Self::SinkTimeout { + name: name.to_owned(), + }) } - pub fn unsupported_token_type(msg1: E, msg2: i32) -> Error { - Error::Decode(Self::UnsupportedTokenType(msg1.to_string(), msg2)) + pub fn upload_closed(path: &Path) -> Error { + Error::Channel(Self::UploadClosed { + path: path.to_owned(), + }) } } @@ -185,6 +107,12 @@ impl From for Error { impl From for Error { fn from(err: aws_sdk_s3::Error) -> Self { - Self::Aws(Box::new(err)) + Self::Aws(AwsError::S3(Box::new(err))) + } +} + +impl From for Error { + fn from(err: aws_sdk_s3::primitives::ByteStreamError) -> Self { + Self::from(AwsError::Streaming(err)) } } diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 63036437a..ca8f38251 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -1,8 +1,8 @@ -use crate::{traits::TimestampDecode, DecodeError, Error, Result}; +use crate::traits::{TimestampDecode, TimestampDecodeError}; use chrono::{DateTime, Utc}; use regex::Regex; use serde::Serialize; -use std::{fmt, os::unix::fs::MetadataExt, path::Path, str::FromStr, sync::LazyLock}; +use std::{fmt, io, os::unix::fs::MetadataExt, path::Path, str::FromStr, sync::LazyLock}; #[derive(Debug, Clone, Serialize)] pub struct FileInfo { @@ -14,17 +14,35 @@ pub struct FileInfo { static RE: LazyLock = LazyLock::new(|| Regex::new(r"([a-z,\d,_]+)\.(\d+)(\.gz)?").unwrap()); +#[derive(thiserror::Error, Debug)] +pub enum FileInfoError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("invalid timestamp string: {0}")] + TimestampStr(#[from] std::num::ParseIntError), + + #[error("filename did not match regex: {0}")] + Regex(String), + + #[error("no file name found")] + MissingFilename, + + #[error("IO: {0}")] + Io(#[from] io::Error), +} + impl FromStr for FileInfo { - type Err = Error; - fn from_str(s: &str) -> Result { + type Err = FileInfoError; + + fn from_str(s: &str) -> std::result::Result { let key = s.to_string(); let cap = RE .captures(s) - .ok_or_else(|| DecodeError::file_info("failed to decode file info"))?; + .ok_or_else(|| FileInfoError::Regex(key.clone()))?; let prefix = cap[1].to_owned(); - let timestamp = u64::from_str(&cap[2]) - .map_err(|_| DecodeError::file_info("failed to decode timestamp"))? - .to_timestamp_millis()?; + + let timestamp = u64::from_str(&cap[2])?.to_timestamp_millis()?; Ok(Self { key, prefix, @@ -66,13 +84,11 @@ impl> From<(T, DateTime)> for FileInfo { } impl TryFrom<&aws_sdk_s3::types::Object> for FileInfo { - type Error = Error; - fn try_from(value: &aws_sdk_s3::types::Object) -> Result { + type Error = FileInfoError; + + fn try_from(value: &aws_sdk_s3::types::Object) -> std::result::Result { let size = value.size().unwrap_or_default() as usize; - let key = value - .key - .as_ref() - .ok_or_else(|| Error::not_found("no file name found"))?; + let key = value.key.as_ref().ok_or(FileInfoError::MissingFilename)?; let mut info = Self::from_str(key)?; info.size = size; Ok(info) @@ -80,8 +96,9 @@ impl TryFrom<&aws_sdk_s3::types::Object> for FileInfo { } impl TryFrom<&Path> for FileInfo { - type Error = Error; - fn try_from(value: &Path) -> Result { + type Error = FileInfoError; + + fn try_from(value: &Path) -> std::result::Result { let mut info = Self::from_str(&value.to_string_lossy())?; info.size = value.metadata()?.size() as usize; Ok(info) diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index 5fdec633a..4b841fe4c 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -1,4 +1,4 @@ -use crate::{traits::MsgDecode, BucketClient, Error, FileInfo, Result}; +use crate::{error::ChannelError, traits::MsgDecode, BucketClient, Error, FileInfo, Result}; use aws_sdk_s3::primitives::ByteStream; use chrono::{DateTime, Utc}; use derive_builder::Builder; @@ -284,6 +284,7 @@ where async fn run(mut self, shutdown: triggered::Listener) -> Result { let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION); let process_name = self.config.process_name.clone(); + let prefix = self.config.prefix.clone(); tracing::info!( r#type = self.config.prefix, @@ -300,7 +301,7 @@ where break; } _ = cleanup_trigger.tick() => self.clean(&self.cache).await?, - result = futures::future::try_join(sender.reserve().map_err(Error::from), self.get_next_file()) => { + result = futures::future::try_join(sender.reserve().map_err(|_| ChannelError::poller_send_error(&prefix, &process_name)), self.get_next_file()) => { let (permit, file) = result?; let byte_stream = self.config.store.get_raw(file.clone()).await?; let data = self.config.parser.parse(byte_stream).await?; @@ -385,7 +386,8 @@ pub struct MsgDecodeFileInfoPollerParser; #[async_trait::async_trait] impl FileInfoPollerParser for MsgDecodeFileInfoPollerParser where - T: MsgDecode + TryFrom + Send + Sync + 'static, + T: MsgDecode + Send + Sync + 'static, + >::Error: std::error::Error, { async fn parse(&self, byte_stream: ByteStream) -> Result> { Ok(crate::stream_source(byte_stream) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 460ddc84b..1559d1e38 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -1,3 +1,4 @@ +use crate::error::ChannelError; use crate::{file_upload::FileUpload, Error, Result}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; @@ -176,6 +177,7 @@ impl FileSinkClient { ) -> Result> { let (on_write_tx, on_write_rx) = oneshot::channel(); let labels = labels.into_iter().map(Label::from); + tokio::select! { result = self.sender.send_timeout(Message::Data(on_write_tx, item.into()), SEND_TIMEOUT) => match result { Ok(_) => { @@ -196,11 +198,11 @@ impl FileSinkClient { .collect::>() ).increment(1); tracing::error!("file_sink write failed for {:?} channel closed", self.metric); - Err(Error::channel()) + Err(ChannelError::sink_closed(&self.metric)) } Err(SendTimeoutError::Timeout(_)) => { tracing::error!("file_sink write failed for {:?} due to send timeout", self.metric); - Err(Error::SendTimeout) + Err(ChannelError::sink_timeout(&self.metric)) } }, } @@ -228,7 +230,7 @@ impl FileSinkClient { "file_sink failed to commit for {:?} with {e:?}", self.metric ); - Error::channel() + ChannelError::sink_closed(&self.metric) }) .map(|_| on_commit_rx) } @@ -243,7 +245,7 @@ impl FileSinkClient { "file_sink failed to rollback for {:?} with {e:?}", self.metric ); - Error::channel() + ChannelError::sink_closed(&self.metric) }) .map(|_| on_rollback_rx) } diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs index e4a4ac8a8..58752feb4 100644 --- a/file_store/src/file_source.rs +++ b/file_store/src/file_source.rs @@ -83,7 +83,9 @@ mod test { use std::str::FromStr; fn infos(names: &'static [&str]) -> FileInfoStream { - futures::stream::iter(names.iter().map(|v| FileInfo::from_str(v))).boxed() + futures::stream::iter(names.iter().map(|v| FileInfo::from_str(v))) + .err_into() + .boxed() } #[tokio::test] diff --git a/file_store/src/file_upload.rs b/file_store/src/file_upload.rs index 40d75673e..c4d51372b 100644 --- a/file_store/src/file_upload.rs +++ b/file_store/src/file_upload.rs @@ -1,4 +1,4 @@ -use crate::{BucketClient, Error, Result}; +use crate::{error::ChannelError, BucketClient, Result}; use futures::{StreamExt, TryFutureExt}; use std::{ path::{Path, PathBuf}, @@ -16,7 +16,8 @@ pub fn message_channel() -> (MessageSender, MessageReceiver) { } pub async fn upload_file(tx: &MessageSender, file: &Path) -> Result { - tx.send(file.to_path_buf()).map_err(|_| Error::channel()) + tx.send(file.to_path_buf()) + .map_err(|_| ChannelError::upload_closed(file)) } #[derive(Debug, Clone)] @@ -50,7 +51,7 @@ impl FileUpload { pub async fn upload_file(&self, file: &Path) -> Result { self.sender .send(file.to_path_buf()) - .map_err(|_| Error::channel()) + .map_err(|_| ChannelError::upload_closed(file)) } } diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index e4d9226ee..336bba204 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -1,9 +1,7 @@ extern crate tls_init; mod error; - -pub use error::{DecodeError, EncodeError, Error, Result}; -use tokio::sync::Mutex; +mod settings; pub mod bucket_client; pub mod file_info; @@ -11,26 +9,28 @@ pub mod file_info_poller; pub mod file_sink; pub mod file_source; pub mod file_upload; -mod settings; pub mod traits; -use std::{collections::HashMap, path::Path, sync::OnceLock}; +// Re-exports +pub use bucket_client::BucketClient; +pub use error::{AwsError, ChannelError, Error, Result}; +pub use file_info::FileInfo; +pub use file_sink::{FileSink, FileSinkBuilder}; +pub use settings::{BucketSettings, Settings}; +// Client functions use aws_config::BehaviorVersion; use aws_sdk_s3::primitives::ByteStream; use aws_smithy_types_convert::stream::PaginationStreamExt; use bytes::BytesMut; use chrono::{DateTime, Utc}; -pub use file_info::FileInfo; -pub use file_sink::{FileSink, FileSinkBuilder}; - -pub use bucket_client::BucketClient; use futures::{ future, stream::{self, BoxStream}, FutureExt, StreamExt, TryFutureExt, TryStreamExt, }; -pub use settings::{BucketSettings, Settings}; +use std::{collections::HashMap, path::Path, sync::OnceLock}; +use tokio::sync::Mutex; pub type Client = aws_sdk_s3::Client; pub type Stream = BoxStream<'static, Result>; @@ -128,7 +128,9 @@ where .map_ok(|page| stream::iter(page.contents.unwrap_or_default()).map(Ok)) .map_err(|err| Error::from(aws_sdk_s3::Error::from(err))) .try_flatten() - .try_filter_map(|file| future::ready(FileInfo::try_from(&file).map(Some))) + .try_filter_map(|file| { + future::ready(FileInfo::try_from(&file).map(Some).map_err(Error::from)) + }) .try_filter(move |info| future::ready(after.is_none_or(|v| info.timestamp > v))) .try_filter(move |info| future::ready(before.is_none_or(|v| info.timestamp <= v))) .boxed() @@ -151,9 +153,8 @@ where } pub async fn put_file(client: &Client, bucket: impl Into, file: &Path) -> Result { - let byte_stream = ByteStream::from_path(&file) - .await - .map_err(|_| Error::not_found(format!("could not open {}", file.display())))?; + let byte_stream = ByteStream::from_path(&file).await?; + poc_metrics::record_duration!( "file_store_put_duration", client @@ -164,7 +165,7 @@ pub async fn put_file(client: &Client, bucket: impl Into, file: &Path) - .content_type("application/octet-stream") .send() .map_ok(|_| ()) - .map_err(Error::s3_error) + .map_err(AwsError::s3_error) .await ) } @@ -182,7 +183,7 @@ pub async fn remove_file( .key(key) .send() .map_ok(|_| ()) - .map_err(Error::s3_error) + .map_err(AwsError::s3_error) .await ) } @@ -275,7 +276,7 @@ async fn get_byte_stream( .key(key) .send() .map_ok(|output| output.body) - .map_err(Error::s3_error) + .map_err(AwsError::s3_error) .fuse() .await } diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index 25fa645ec..8307ee6be 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -2,4 +2,6 @@ mod msg_decode; mod timestamp; pub use msg_decode::MsgDecode; -pub use timestamp::{TimestampDecode, TimestampEncode}; +pub use timestamp::{ + TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, +}; diff --git a/file_store/src/traits/msg_decode.rs b/file_store/src/traits/msg_decode.rs index 2516d9eca..365a91542 100644 --- a/file_store/src/traits/msg_decode.rs +++ b/file_store/src/traits/msg_decode.rs @@ -1,16 +1,19 @@ -use crate::{Error, Result}; use bytes::Buf; use prost::Message; -pub trait MsgDecode { +#[derive(thiserror::Error, Debug)] +pub enum MsgDecodeError { + #[error("prost decode error: {0}")] + Prost(#[from] prost::DecodeError), + + #[error("conversion error: {0}")] + Conversion(E), +} +pub trait MsgDecode: Sized + TryFrom { type Msg: Message + Default; - fn decode(buf: B) -> Result - where - Self: Sized, - Self: TryFrom, - { + fn decode(buf: B) -> Result> { let req = Self::Msg::decode(buf)?; - Self::try_from(req) + Self::try_from(req).map_err(MsgDecodeError::Conversion) } } diff --git a/file_store/src/traits/timestamp.rs b/file_store/src/traits/timestamp.rs index 05e69997a..fc76fab48 100644 --- a/file_store/src/traits/timestamp.rs +++ b/file_store/src/traits/timestamp.rs @@ -1,29 +1,71 @@ -use crate::{error::DecodeError, Result}; use chrono::{DateTime, TimeZone, Utc}; +// This trait is not being provided by file-store proper +// because it increaseses the chance of running into orphan rules. +// +// A common use is implementing MsgTimestamp for both the prost struct and +// domain struct to ensure the same field is being referenced with the same +// time unit. This becomes not possible if file-store provides the trait. The +// implementer wouldn't own the trait and a high potential they wouldn't own +// the proto struct definition and would be unable to provide half of the +// benefit this trait provides. +// +// If you really want it, here's the trait in it's entirety with some examples, +// put it in your own project. +// +// pub trait MsgTimestamp { +// fn timestamp(&self) -> R; +// } + +// impl MsgTimestamp for ProtoStruct { +// fn timestamp(&self) -> TimestampDecodeResult { +// self.received_timestamp.encode_timestamp_millis() +// } +// } + +// impl MsgTimestamp for DomainStruct { +// fn timestamp(&self) -> u64 { +// self.received_timestamp.to_timestamp_millis() +// } +// } + +pub type TimestampDecodeResult = Result, TimestampDecodeError>; + +#[derive(thiserror::Error, Debug)] +pub enum TimestampDecodeError { + #[error("integer conversion error")] + FromInt(#[from] std::num::TryFromIntError), + + #[error("invalid seconds: {0}")] + InvalidSeconds(u64), + + #[error("invalid millis: {0}")] + InvalidMillis(u64), +} + pub trait TimestampDecode { - fn to_timestamp(self) -> Result>; - fn to_timestamp_millis(self) -> Result>; - fn to_timestamp_nanos(self) -> Result>; + fn to_timestamp(self) -> TimestampDecodeResult; + fn to_timestamp_millis(self) -> TimestampDecodeResult; + fn to_timestamp_nanos(self) -> TimestampDecodeResult; } impl TimestampDecode for u64 { - fn to_timestamp(self) -> Result> { - let decoded = i64::try_from(self).map_err(DecodeError::from)?; + fn to_timestamp(self) -> TimestampDecodeResult { + let decoded = i64::try_from(self)?; Utc.timestamp_opt(decoded, 0) .single() - .ok_or_else(|| DecodeError::invalid_timestamp(self)) + .ok_or(TimestampDecodeError::InvalidSeconds(self)) } - fn to_timestamp_millis(self) -> Result> { - let decoded = i64::try_from(self).map_err(DecodeError::from)?; + fn to_timestamp_millis(self) -> TimestampDecodeResult { + let decoded = i64::try_from(self)?; Utc.timestamp_millis_opt(decoded) .single() - .ok_or_else(|| DecodeError::invalid_timestamp(self)) + .ok_or(TimestampDecodeError::InvalidMillis(self)) } - fn to_timestamp_nanos(self) -> Result> { - let decoded = i64::try_from(self).map_err(DecodeError::from)?; + fn to_timestamp_nanos(self) -> TimestampDecodeResult { + let decoded = i64::try_from(self)?; Ok(Utc.timestamp_nanos(decoded)) } } diff --git a/file_store_oracles/Cargo.toml b/file_store_oracles/Cargo.toml index 6713cf7b5..cb56f4f7f 100644 --- a/file_store_oracles/Cargo.toml +++ b/file_store_oracles/Cargo.toml @@ -7,19 +7,20 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } -chrono = { workspace = true } -helium-crypto = { workspace = true } -helium-proto = { workspace = true } -serde = { workspace = true } -uuid = { workspace = true } -h3o = { workspace = true } +async-trait = { workspace = true } beacon = { workspace = true } blake3 = { workspace = true } -rust_decimal = { workspace = true } -async-trait = { workspace = true } bytes = { workspace = true } +chrono = { workspace = true } +h3o = { workspace = true } +helium-crypto = { workspace = true } +helium-proto = { workspace = true } prost = { workspace = true } +rust_decimal = { workspace = true } +serde = { workspace = true } strum = { workspace = true } +thiserror = { workspace = true } +uuid = { workspace = true } # cli deps base64 = { workspace = true } diff --git a/file_store_oracles/src/iot/iot_beacon_report.rs b/file_store_oracles/src/iot/iot_beacon_report.rs index 737e0639d..cf18ed12f 100644 --- a/file_store_oracles/src/iot/iot_beacon_report.rs +++ b/file_store_oracles/src/iot/iot_beacon_report.rs @@ -1,15 +1,28 @@ use beacon; use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_lora::{LoraBeaconIngestReportV1, LoraBeaconReportReqV1}; -use helium_proto::DataRate; +use helium_proto::{ + services::poc_lora::{LoraBeaconIngestReportV1, LoraBeaconReportReqV1}, + DataRate, +}; use serde::Serialize; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum IotBeaconError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported datarate: {0}")] + DataRate(prost::UnknownEnumValue), +} #[derive(Serialize, Clone, Debug)] pub struct IotBeaconReport { @@ -38,8 +51,9 @@ impl MsgDecode for IotBeaconIngestReport { } impl TryFrom for IotBeaconIngestReport { - type Error = Error; - fn try_from(v: LoraBeaconReportReqV1) -> Result { + type Error = IotBeaconError; + + fn try_from(v: LoraBeaconReportReqV1) -> Result { Ok(Self { received_timestamp: Utc::now(), report: v.try_into()?, @@ -47,8 +61,8 @@ impl TryFrom for IotBeaconIngestReport { } } -impl MsgTimestamp>> for LoraBeaconReportReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraBeaconReportReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_nanos() } } @@ -59,8 +73,8 @@ impl MsgTimestamp for IotBeaconReport { } } -impl MsgTimestamp>> for LoraBeaconIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraBeaconIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -72,13 +86,16 @@ impl MsgTimestamp for IotBeaconIngestReport { } impl TryFrom for IotBeaconIngestReport { - type Error = Error; - fn try_from(v: LoraBeaconIngestReportV1) -> Result { + type Error = IotBeaconError; + + fn try_from(v: LoraBeaconIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("iot beacon ingest report"))? + .ok_or(IotBeaconError::MissingField( + "iot_beacon_ingest_report.report", + ))? .try_into()?, }) } @@ -86,8 +103,8 @@ impl TryFrom for IotBeaconIngestReport { impl From for LoraBeaconReportReqV1 { fn from(v: IotBeaconIngestReport) -> Self { - let timestamp = v.report.timestamp(); Self { + timestamp: v.report.timestamp(), pub_key: v.report.pub_key.into(), local_entropy: v.report.local_entropy, remote_entropy: v.report.remote_entropy, @@ -96,7 +113,6 @@ impl From for LoraBeaconReportReqV1 { channel: v.report.channel, datarate: v.report.datarate as i32, tx_power: v.report.tx_power, - timestamp, signature: v.report.signature, tmst: v.report.tmst, } @@ -104,23 +120,19 @@ impl From for LoraBeaconReportReqV1 { } impl TryFrom for IotBeaconReport { - type Error = Error; - fn try_from(v: LoraBeaconReportReqV1) -> Result { - let data_rate: DataRate = DataRate::try_from(v.datarate).map_err(|_| { - DecodeError::unsupported_datarate("iot_beacon_report_req_v1", v.datarate) - })?; - let timestamp = v.timestamp()?; + type Error = IotBeaconError; + fn try_from(v: LoraBeaconReportReqV1) -> Result { Ok(Self { + timestamp: v.timestamp()?, + datarate: prost_enum(v.datarate, IotBeaconError::DataRate)?, pub_key: v.pub_key.into(), local_entropy: v.local_entropy, remote_entropy: v.remote_entropy, data: v.data, frequency: v.frequency, channel: v.channel, - datarate: data_rate, tx_power: v.tx_power, - timestamp, signature: v.signature, tmst: v.tmst, }) @@ -147,11 +159,7 @@ impl From for LoraBeaconReportReqV1 { } impl IotBeaconReport { - pub fn to_beacon( - &self, - entropy_start: DateTime, - entropy_version: u32, - ) -> Result { + pub fn to_beacon(&self, entropy_start: DateTime, entropy_version: u32) -> beacon::Beacon { let remote_entropy = beacon::Entropy { timestamp: entropy_start.timestamp(), data: self.remote_entropy.clone(), @@ -162,13 +170,13 @@ impl IotBeaconReport { data: self.local_entropy.clone(), version: entropy_version, }; - Ok(beacon::Beacon { + beacon::Beacon { data: self.data.clone(), frequency: self.frequency, datarate: self.datarate, remote_entropy, local_entropy, conducted_power: self.tx_power as u32, - }) + } } } diff --git a/file_store_oracles/src/iot/iot_invalid_poc.rs b/file_store_oracles/src/iot/iot_invalid_poc.rs index 20242a82d..2f3f11698 100644 --- a/file_store_oracles/src/iot/iot_invalid_poc.rs +++ b/file_store_oracles/src/iot/iot_invalid_poc.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_proto::services::poc_lora::{ InvalidDetails, InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, @@ -10,9 +9,33 @@ use helium_proto::services::poc_lora::{ use serde::Serialize; use crate::{ - iot_beacon_report::IotBeaconReport, iot_witness_report::IotWitnessReport, traits::MsgTimestamp, + iot_beacon_report::{IotBeaconError, IotBeaconReport}, + iot_witness_report::{IotWitnessError, IotWitnessReport}, + prost_enum, + traits::MsgTimestamp, }; +#[derive(thiserror::Error, Debug)] +pub enum IotInvalidBeaconError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported reason: {0}")] + InvalidReason(prost::UnknownEnumValue), + + #[error("unsupported participant side: {0}")] + InvalidParticipantSide(prost::UnknownEnumValue), + + #[error("invalid witness: {0}")] + InvalidWitness(#[from] IotWitnessError), + + #[error("invalid beacon: {0}")] + InvalidBeacon(#[from] IotBeaconError), +} + #[derive(Serialize, Clone)] pub struct IotInvalidBeaconReport { pub received_timestamp: DateTime, @@ -41,8 +64,8 @@ impl MsgDecode for IotInvalidWitnessReport { type Msg = LoraInvalidWitnessReportV1; } -impl MsgTimestamp>> for LoraInvalidBeaconReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraInvalidBeaconReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -53,8 +76,8 @@ impl MsgTimestamp for IotInvalidBeaconReport { } } -impl MsgTimestamp>> for LoraInvalidWitnessReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraInvalidWitnessReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -66,18 +89,17 @@ impl MsgTimestamp for IotInvalidWitnessReport { } impl TryFrom for IotInvalidBeaconReport { - type Error = Error; - fn try_from(v: LoraInvalidBeaconReportV1) -> Result { - let inv_reason = v.reason; - let invalid_reason: InvalidReason = InvalidReason::try_from(inv_reason).map_err(|_| { - DecodeError::unsupported_invalid_reason("iot_invalid_beacon_report_v1", inv_reason) - })?; + type Error = IotInvalidBeaconError; + + fn try_from(v: LoraInvalidBeaconReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, - reason: invalid_reason, + reason: prost_enum(v.reason, IotInvalidBeaconError::InvalidReason)?, report: v .report - .ok_or_else(|| Error::not_found("iot invalid beacon report v1"))? + .ok_or(IotInvalidBeaconError::MissingField( + "iot_invalid_beacon_report.report", + ))? .try_into()?, location: v.location.parse().ok(), gain: v.gain, @@ -104,29 +126,22 @@ impl From for LoraInvalidBeaconReportV1 { } impl TryFrom for IotInvalidWitnessReport { - type Error = Error; - fn try_from(v: LoraInvalidWitnessReportV1) -> Result { - let inv_reason = v.reason; - let invalid_reason: InvalidReason = InvalidReason::try_from(inv_reason).map_err(|_| { - DecodeError::unsupported_invalid_reason("iot_invalid_witness_report_v1", inv_reason) - })?; - let participant_side = v.participant_side; - let side: InvalidParticipantSide = InvalidParticipantSide::try_from(participant_side) - .map_err(|_| { - DecodeError::unsupported_participant_side( - "iot_invalid_witness_report_v1", - participant_side, - ) - })?; - let received_timestamp = v.timestamp()?; + type Error = IotInvalidBeaconError; + fn try_from(v: LoraInvalidWitnessReportV1) -> Result { Ok(Self { - received_timestamp, - reason: invalid_reason, - participant_side: side, + received_timestamp: v.timestamp()?, + reason: prost_enum(v.reason, IotInvalidBeaconError::InvalidReason)?, + participant_side: prost_enum( + v.participant_side, + IotInvalidBeaconError::InvalidParticipantSide, + )?, + report: v .report - .ok_or_else(|| Error::not_found("iot invalid witness report"))? + .ok_or(IotInvalidBeaconError::MissingField( + "iot_invalid_witness_report.report", + ))? .try_into()?, invalid_details: v.invalid_details, }) diff --git a/file_store_oracles/src/iot/iot_packet.rs b/file_store_oracles/src/iot/iot_packet.rs index c443f8bc5..37e0c2b09 100644 --- a/file_store_oracles/src/iot/iot_packet.rs +++ b/file_store_oracles/src/iot/iot_packet.rs @@ -1,8 +1,7 @@ use blake3::Hasher; use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::{ @@ -14,7 +13,22 @@ use helium_proto::{ }; use serde::Serialize; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum IotPacketError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("unsupported datarate: {0}")] + DataRate(prost::UnknownEnumValue), + + #[error("unsupported region: {0}")] + Region(prost::UnknownEnumValue), + + #[error("unsupported packet type: {0}")] + PacketType(prost::UnknownEnumValue), +} #[derive(Serialize, Clone)] pub struct PacketRouterPacketReport { @@ -49,8 +63,8 @@ impl MsgTimestamp for PacketRouterPacketReport { } } -impl MsgTimestamp>> for PacketRouterPacketReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for PacketRouterPacketReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -61,8 +75,8 @@ impl MsgTimestamp for IotValidPacket { } } -impl MsgTimestamp>> for ValidPacket { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for ValidPacket { + fn timestamp(&self) -> TimestampDecodeResult { self.packet_timestamp.to_timestamp_millis() } } @@ -76,40 +90,31 @@ impl MsgDecode for IotValidPacket { } impl TryFrom for PacketRouterPacketReport { - type Error = Error; - - fn try_from(v: PacketRouterPacketReportV1) -> Result { - let data_rate = DataRate::try_from(v.datarate).map_err(|_| { - DecodeError::unsupported_datarate("iot_packet_router_packet_report_v1", v.datarate) - })?; - let region = Region::try_from(v.region).map_err(|_| { - DecodeError::unsupported_region("iot_packet_router_packet_report_v1", v.region) - })?; - let packet_type = PacketType::try_from(v.r#type).map_err(|_| { - DecodeError::unsupported_packet_type("iot_packet_router_packet_report_v1", v.r#type) - })?; - let received_timestamp = v.timestamp()?; + type Error = IotPacketError; + + fn try_from(v: PacketRouterPacketReportV1) -> Result { Ok(Self { - received_timestamp, + received_timestamp: v.timestamp()?, oui: v.oui, net_id: v.net_id, rssi: v.rssi, free: v.free, frequency: v.frequency, snr: v.snr, - data_rate, - region, + data_rate: prost_enum(v.datarate, IotPacketError::DataRate)?, + region: prost_enum(v.region, IotPacketError::Region)?, gateway: v.gateway.into(), payload_hash: v.payload_hash, payload_size: v.payload_size, - packet_type, + packet_type: prost_enum(v.r#type, IotPacketError::PacketType)?, }) } } impl TryFrom for IotValidPacket { - type Error = Error; - fn try_from(v: ValidPacket) -> Result { + type Error = IotPacketError; + + fn try_from(v: ValidPacket) -> Result { let ts = v.timestamp()?; Ok(Self { gateway: v.gateway.into(), diff --git a/file_store_oracles/src/iot/iot_valid_poc.rs b/file_store_oracles/src/iot/iot_valid_poc.rs index 1cf391005..7c964ca88 100644 --- a/file_store_oracles/src/iot/iot_valid_poc.rs +++ b/file_store_oracles/src/iot/iot_valid_poc.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_proto::services::poc_lora::{ InvalidDetails, InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, LoraPocV1, @@ -12,12 +11,39 @@ use rust_decimal::{dec, prelude::ToPrimitive, Decimal}; use serde::Serialize; use crate::{ - iot_beacon_report::IotBeaconReport, iot_witness_report::IotWitnessReport, traits::MsgTimestamp, + iot_beacon_report::{IotBeaconError, IotBeaconReport}, + iot_witness_report::{IotWitnessError, IotWitnessReport}, + prost_enum, + traits::MsgTimestamp, }; const SCALE_MULTIPLIER: Decimal = dec!(10000); pub const SCALING_PRECISION: u32 = 4; +#[derive(thiserror::Error, Debug)] +pub enum IotPocError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("invalid witness report: {0}")] + WitnessReport(#[from] IotWitnessError), + + #[error("invalid beacon report: {0}")] + BeaconReport(#[from] IotBeaconError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported verification status: {0}")] + VerificationStatus(prost::UnknownEnumValue), + + #[error("unsupported invalid reason: {0}")] + InvalidReason(prost::UnknownEnumValue), + + #[error("unsupported participant side: {0}")] + ParticipantSide(prost::UnknownEnumValue), +} + #[derive(Serialize, Clone, Debug)] pub struct IotValidBeaconReport { pub received_timestamp: DateTime, @@ -65,8 +91,8 @@ impl MsgDecode for IotPoc { type Msg = LoraPocV1; } -impl MsgTimestamp>> for LoraValidBeaconReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraValidBeaconReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -77,8 +103,8 @@ impl MsgTimestamp for IotValidBeaconReport { } } -impl MsgTimestamp>> for LoraVerifiedWitnessReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraVerifiedWitnessReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -90,24 +116,25 @@ impl MsgTimestamp for IotVerifiedWitnessReport { } impl TryFrom for IotPoc { - type Error = Error; - fn try_from(v: LoraPocV1) -> Result { + type Error = IotPocError; + + fn try_from(v: LoraPocV1) -> Result { let selected_witnesses = v .selected_witnesses .into_iter() .map(IotVerifiedWitnessReport::try_from) - .collect::>>()?; + .collect::, _>>()?; let unselected_witnesses = v .unselected_witnesses .into_iter() .map(IotVerifiedWitnessReport::try_from) - .collect::>>()?; + .collect::, _>>()?; Ok(Self { poc_id: v.poc_id, beacon_report: v .beacon_report - .ok_or_else(|| Error::not_found("iot poc v1"))? + .ok_or(IotPocError::MissingField("iot_poc.beacon_report"))? .try_into()?, selected_witnesses, unselected_witnesses, @@ -129,8 +156,9 @@ impl From for LoraPocV1 { } impl TryFrom for IotValidBeaconReport { - type Error = Error; - fn try_from(v: LoraValidBeaconReportV1) -> Result { + type Error = IotPocError; + + fn try_from(v: LoraValidBeaconReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, location: v.location.parse().ok(), @@ -139,7 +167,7 @@ impl TryFrom for IotValidBeaconReport { hex_scale: Decimal::new(v.hex_scale as i64, SCALING_PRECISION), report: v .report - .ok_or_else(|| Error::not_found("iot valid beacon report v1"))? + .ok_or(IotPocError::MissingField("iot_valid_beacon_report.report"))? .try_into()?, reward_unit: Decimal::new(v.reward_unit as i64, SCALING_PRECISION), }) @@ -164,39 +192,25 @@ impl From for LoraValidBeaconReportV1 { } impl TryFrom for IotVerifiedWitnessReport { - type Error = Error; - fn try_from(v: LoraVerifiedWitnessReportV1) -> Result { - let received_timestamp = v.timestamp()?; - let status = VerificationStatus::try_from(v.status).map_err(|_| { - DecodeError::unsupported_status_reason("iot_verified_witness_report_v1", v.status) - })?; - let invalid_reason = InvalidReason::try_from(v.invalid_reason).map_err(|_| { - DecodeError::unsupported_invalid_reason( - "iot_verified_witness_report_v1", - v.invalid_reason, - ) - })?; - let participant_side = - InvalidParticipantSide::try_from(v.participant_side).map_err(|_| { - DecodeError::unsupported_participant_side( - "iot_verified_witness_report_v1", - v.participant_side, - ) - })?; + type Error = IotPocError; + + fn try_from(v: LoraVerifiedWitnessReportV1) -> Result { Ok(Self { - received_timestamp, - status, + received_timestamp: v.timestamp()?, + status: prost_enum(v.status, IotPocError::VerificationStatus)?, report: v .report - .ok_or_else(|| Error::not_found("iot valid witness port v1"))? + .ok_or(IotPocError::MissingField( + "iot_verified_witness_report.report", + ))? .try_into()?, location: v.location.parse().ok(), gain: v.gain, elevation: v.elevation, hex_scale: Decimal::new(v.hex_scale as i64, SCALING_PRECISION), reward_unit: Decimal::new(v.reward_unit as i64, SCALING_PRECISION), - invalid_reason, - participant_side, + invalid_reason: prost_enum(v.invalid_reason, IotPocError::InvalidReason)?, + participant_side: prost_enum(v.participant_side, IotPocError::ParticipantSide)?, invalid_details: v.invalid_details, }) } diff --git a/file_store_oracles/src/iot/iot_witness_report.rs b/file_store_oracles/src/iot/iot_witness_report.rs index 29603de02..7087f52b4 100644 --- a/file_store_oracles/src/iot/iot_witness_report.rs +++ b/file_store_oracles/src/iot/iot_witness_report.rs @@ -1,15 +1,27 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora::{LoraWitnessIngestReportV1, LoraWitnessReportReqV1}; use helium_proto::DataRate; use serde::Serialize; +use crate::prost_enum; use crate::traits::MsgTimestamp; +#[derive(thiserror::Error, Debug)] +pub enum IotWitnessError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported datarate: {0}")] + DataRate(prost::UnknownEnumValue), +} + #[derive(Serialize, Clone, Debug)] pub struct IotWitnessReport { #[serde(alias = "pubKey")] @@ -35,8 +47,9 @@ impl MsgDecode for IotWitnessIngestReport { } impl TryFrom for IotWitnessIngestReport { - type Error = Error; - fn try_from(v: LoraWitnessReportReqV1) -> Result { + type Error = IotWitnessError; + + fn try_from(v: LoraWitnessReportReqV1) -> Result { Ok(Self { received_timestamp: Utc::now(), report: v.try_into()?, @@ -45,13 +58,16 @@ impl TryFrom for IotWitnessIngestReport { } impl TryFrom for IotWitnessIngestReport { - type Error = Error; - fn try_from(v: LoraWitnessIngestReportV1) -> Result { + type Error = IotWitnessError; + + fn try_from(v: LoraWitnessIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("iot witness ingest report v1"))? + .ok_or(IotWitnessError::MissingField( + "iot_witness_ingest_report.report", + ))? .try_into()?, }) } @@ -75,29 +91,25 @@ impl From for LoraWitnessReportReqV1 { } impl TryFrom for IotWitnessReport { - type Error = Error; - fn try_from(v: LoraWitnessReportReqV1) -> Result { - let dr = v.datarate; - let data_rate: DataRate = DataRate::try_from(dr) - .map_err(|_| DecodeError::unsupported_datarate("iot_witness_report_req_v1", dr))?; - let timestamp = v.timestamp()?; + type Error = IotWitnessError; + fn try_from(v: LoraWitnessReportReqV1) -> Result { Ok(Self { + datarate: prost_enum(v.datarate, IotWitnessError::DataRate)?, + timestamp: v.timestamp()?, pub_key: v.pub_key.into(), data: v.data, - timestamp, signal: v.signal, snr: v.snr, frequency: v.frequency, - datarate: data_rate, signature: v.signature, tmst: v.tmst, }) } } -impl MsgTimestamp>> for LoraWitnessReportReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraWitnessReportReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_nanos() } } @@ -108,8 +120,8 @@ impl MsgTimestamp for IotWitnessReport { } } -impl MsgTimestamp>> for LoraWitnessIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for LoraWitnessIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } diff --git a/file_store_oracles/src/lib.rs b/file_store_oracles/src/lib.rs index fb4edf1bf..b85168e7d 100644 --- a/file_store_oracles/src/lib.rs +++ b/file_store_oracles/src/lib.rs @@ -18,3 +18,14 @@ pub use subscriber::*; // Re-exports pub use file_type::FileType; + +/// Useful in TryFrom implementations when going from a proto field to a rust +/// type that uses the proto enum as a member. It can automatically convert from +/// the underying i32 to the enum type or fallback to a provided error type. +pub fn prost_enum(value: i32, map_err: Op) -> Result +where + Enum: TryFrom, + Op: FnOnce(prost::UnknownEnumValue) -> Err, +{ + Enum::try_from(value).map_err(map_err) +} diff --git a/file_store_oracles/src/mobile/coverage.rs b/file_store_oracles/src/mobile/coverage.rs index eff84ed75..e613821aa 100644 --- a/file_store_oracles/src/mobile/coverage.rs +++ b/file_store_oracles/src/mobile/coverage.rs @@ -1,8 +1,5 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode}, - DecodeError, Error, Result, -}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ coverage_object_req_v1, CoverageObjectIngestReportV1, CoverageObjectReqV1, @@ -11,6 +8,32 @@ use helium_proto::services::poc_mobile::{ use serde::{Deserialize, Serialize}; use uuid::Uuid; +use crate::prost_enum; + +#[derive(thiserror::Error, Debug)] +pub enum CoverageError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("invalid cell index: {0}")] + InvalidCellIndex(#[from] h3o::error::InvalidCellIndex), + + #[error("uuid: {0}")] + Uuid(#[from] uuid::Error), + + #[error("unsupported keytype CbsdId")] + UnsupportedCsbdId, + + #[error("missing key_type")] + MissingKeyType, + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported signal level: {0}")] + SignalLevel(prost::UnknownEnumValue), +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RadioHexSignalLevel { pub location: h3o::CellIndex, @@ -58,42 +81,41 @@ impl MsgDecode for CoverageObjectIngestReport { } impl TryFrom for CoverageObjectIngestReport { - type Error = Error; + type Error = CoverageError; - fn try_from(v: CoverageObjectIngestReportV1) -> Result { + fn try_from(v: CoverageObjectIngestReportV1) -> Result { Ok(Self { received_timestamp: v.received_timestamp.to_timestamp_millis()?, report: v .report - .ok_or_else(|| Error::not_found("ingest coverage object report"))? + .ok_or(CoverageError::MissingField( + "coverage_object_ingest_report.report", + ))? .try_into()?, }) } } impl TryFrom for CoverageObject { - type Error = Error; + type Error = CoverageError; - fn try_from(v: CoverageObjectReqV1) -> Result { - let coverage: Result> = v + fn try_from(v: CoverageObjectReqV1) -> Result { + let coverage: Result, CoverageError> = v .coverage .into_iter() .map(RadioHexSignalLevel::try_from) .collect(); Ok(Self { pub_key: v.pub_key.into(), - uuid: Uuid::from_slice(&v.uuid).map_err(DecodeError::from)?, + uuid: Uuid::from_slice(&v.uuid)?, key_type: match v.key_type { Some(coverage_object_req_v1::KeyType::HotspotKey(key)) => { KeyType::HotspotKey(key.into()) } Some(coverage_object_req_v1::KeyType::CbsdId(_id)) => { - return Err(Error::NotFound( - "coverage objects with KeyType CbsdId are not supported anymore" - .to_string(), - )) + return Err(CoverageError::UnsupportedCsbdId); } - None => return Err(Error::NotFound("key_type".to_string())), + None => return Err(CoverageError::MissingKeyType), }, coverage_claim_time: v.coverage_claim_time.to_timestamp()?, coverage: coverage?, @@ -105,15 +127,13 @@ impl TryFrom for CoverageObject { } impl TryFrom for RadioHexSignalLevel { - type Error = Error; + type Error = CoverageError; - fn try_from(v: RadioHexSignalLevelProto) -> Result { + fn try_from(v: RadioHexSignalLevelProto) -> Result { Ok(Self { - signal_level: SignalLevel::try_from(v.signal_level).map_err(|_| { - DecodeError::unsupported_signal_level("coverage_object_req_v1", v.signal_level) - })?, + signal_level: prost_enum(v.signal_level, CoverageError::SignalLevel)?, signal_power: v.signal_power, - location: v.location.parse().map_err(DecodeError::from)?, + location: v.location.parse()?, }) } } diff --git a/file_store_oracles/src/mobile/mobile_ban.rs b/file_store_oracles/src/mobile/mobile_ban.rs index 9fbefc864..0572325bf 100644 --- a/file_store_oracles/src/mobile/mobile_ban.rs +++ b/file_store_oracles/src/mobile/mobile_ban.rs @@ -1,8 +1,5 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, -}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError, TimestampEncode}; use helium_crypto::PublicKeyBinary; pub mod proto { @@ -15,6 +12,26 @@ pub mod proto { // Re-export proto enums pub use proto::{BanReason, VerifiedBanIngestReportStatus}; +use crate::prost_enum; + +#[derive(thiserror::Error, Debug)] +pub enum BanReportError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported status: {0}")] + Status(prost::UnknownEnumValue), + + #[error("unsupported ban type: {0}")] + BanType(prost::UnknownEnumValue), + + #[error("unsupported reason: {0}")] + Reason(prost::UnknownEnumValue), +} + #[derive(Clone)] pub struct VerifiedBanReport { pub verified_timestamp: DateTime, @@ -77,37 +94,36 @@ impl MsgDecode for VerifiedBanReport { // === Conversion :: proto -> struct impl TryFrom for VerifiedBanReport { - type Error = Error; + type Error = BanReportError; fn try_from(value: proto::VerifiedBanIngestReportV1) -> Result { - let status = value.status(); Ok(Self { verified_timestamp: value.verified_timestamp_ms.to_timestamp_millis()?, report: value .report - .ok_or_else(|| Error::not_found("verified ban report missing"))? + .ok_or(BanReportError::MissingField("verified_ban_report.report"))? .try_into()?, - status, + status: prost_enum(value.status, BanReportError::Status)?, }) } } impl TryFrom for BanReport { - type Error = Error; + type Error = BanReportError; fn try_from(value: proto::BanIngestReportV1) -> Result { Ok(Self { received_timestamp: value.received_timestamp_ms.to_timestamp_millis()?, report: value .report - .ok_or_else(|| Error::not_found("ban report missing"))? + .ok_or(BanReportError::MissingField("ban_report.report"))? .try_into()?, }) } } impl TryFrom for BanRequest { - type Error = Error; + type Error = BanReportError; fn try_from(value: proto::BanReqV1) -> Result { Ok(Self { @@ -121,18 +137,18 @@ impl TryFrom for BanRequest { } impl TryFrom> for BanAction { - type Error = Error; + type Error = BanReportError; fn try_from(value: Option) -> Result { match value { Some(action) => Ok(action.try_into()?), - None => Err(DecodeError::empty_field("ban_action")), + None => Err(BanReportError::MissingField("ban_action")), } } } impl TryFrom for BanAction { - type Error = Error; + type Error = BanReportError; fn try_from(value: proto::BanAction) -> Result { let action = match value { proto::BanAction::Ban(details) => Self::Ban(details.try_into()?), @@ -143,18 +159,16 @@ impl TryFrom for BanAction { } impl TryFrom for BanDetails { - type Error = Error; + type Error = BanReportError; fn try_from(value: proto::BanDetailsV1) -> Result { - let reason = value.reason(); - let ban_type = value.ban_type().into(); let expiration_timestamp = match value.expiration_timestamp_ms { 0 => None, val => Some(val.to_timestamp_millis()?), }; Ok(Self { - reason, - ban_type, + reason: prost_enum(value.reason, BanReportError::Reason)?, + ban_type: prost_enum(value.ban_type, BanReportError::BanType)?, hotspot_serial: value.hotspot_serial, message: value.message, expiration_timestamp, @@ -171,6 +185,16 @@ impl From for UnbanDetails { } } +// Helper to use map_enum that goes through the proto type into our own. +impl TryFrom for BanType { + type Error = prost::UnknownEnumValue; + + fn try_from(value: i32) -> Result { + let val = proto::BanType::try_from(value)?; + Ok(val.into()) + } +} + impl From for BanType { fn from(value: proto::BanType) -> Self { match value { diff --git a/file_store_oracles/src/mobile/mobile_radio_invalidated_threshold.rs b/file_store_oracles/src/mobile/mobile_radio_invalidated_threshold.rs index c49a756e4..a8dfb6218 100644 --- a/file_store_oracles/src/mobile/mobile_radio_invalidated_threshold.rs +++ b/file_store_oracles/src/mobile/mobile_radio_invalidated_threshold.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ @@ -11,7 +10,22 @@ use helium_proto::services::poc_mobile::{ }; use serde::{Deserialize, Serialize}; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum MobileBanInvalidatedThresholdError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported reason: {0}")] + Reason(prost::UnknownEnumValue), + + #[error("unsupported status: {0}")] + Status(prost::UnknownEnumValue), +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct InvalidatedRadioThresholdReportReq { @@ -47,18 +61,13 @@ impl MsgDecode for VerifiedInvalidatedRadioThresholdIngestReport { } impl TryFrom for InvalidatedRadioThresholdReportReq { - type Error = Error; - fn try_from(v: InvalidatedRadioThresholdReportReqV1) -> Result { - let reason = InvalidatedThresholdReason::try_from(v.reason).map_err(|_| { - DecodeError::unsupported_invalidated_reason( - "invalidated_radio_threshold_report_req_v1", - v.reason, - ) - })?; + type Error = MobileBanInvalidatedThresholdError; + + fn try_from(v: InvalidatedRadioThresholdReportReqV1) -> Result { Ok(Self { + reason: prost_enum(v.reason, MobileBanInvalidatedThresholdError::Reason)?, + timestamp: v.timestamp()?, hotspot_pubkey: v.hotspot_pubkey.into(), - reason, - timestamp: v.timestamp.to_timestamp()?, carrier_pub_key: v.carrier_pub_key.into(), }) } @@ -66,20 +75,19 @@ impl TryFrom for InvalidatedRadioThreshold impl From for InvalidatedRadioThresholdReportReqV1 { fn from(v: InvalidatedRadioThresholdReportReq) -> Self { - let timestamp = v.timestamp.timestamp() as u64; Self { cbsd_id: String::default(), + timestamp: v.timestamp(), hotspot_pubkey: v.hotspot_pubkey.into(), reason: v.reason as i32, - timestamp, carrier_pub_key: v.carrier_pub_key.into(), signature: vec![], } } } -impl MsgTimestamp>> for InvalidatedRadioThresholdReportReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for InvalidatedRadioThresholdReportReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -90,8 +98,8 @@ impl MsgTimestamp for InvalidatedRadioThresholdReportReq { } } -impl MsgTimestamp>> for InvalidatedRadioThresholdIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for InvalidatedRadioThresholdIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -102,8 +110,8 @@ impl MsgTimestamp for InvalidatedRadioThresholdIngestReport { } } -impl MsgTimestamp>> for VerifiedInvalidatedRadioThresholdIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedInvalidatedRadioThresholdIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -115,15 +123,16 @@ impl MsgTimestamp for VerifiedInvalidatedRadioThresholdIngestReport { } impl TryFrom for InvalidatedRadioThresholdIngestReport { - type Error = Error; - fn try_from(v: InvalidatedRadioThresholdIngestReportV1) -> Result { + type Error = MobileBanInvalidatedThresholdError; + + fn try_from(v: InvalidatedRadioThresholdIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| { - Error::not_found("ingest invalidated radio threshold ingest report") - })? + .ok_or(MobileBanInvalidatedThresholdError::MissingField( + "invalidated_radio_threshold_ingest_report.report", + ))? .try_into()?, }) } @@ -143,23 +152,18 @@ impl From for InvalidatedRadioThresholdIn impl TryFrom for VerifiedInvalidatedRadioThresholdIngestReport { - type Error = Error; - fn try_from(v: VerifiedInvalidatedRadioThresholdIngestReportV1) -> Result { - let status = InvalidatedRadioThresholdReportVerificationStatus::try_from(v.status) - .map_err(|_| { - DecodeError::unsupported_status_reason( - "verified_invalidated_radio_threshold_ingest_report_v1", - v.status, - ) - })?; - let report = v - .report - .ok_or_else(|| Error::not_found("ingest invalidated radio threshold ingest report"))? - .try_into()?; + type Error = MobileBanInvalidatedThresholdError; + + fn try_from(v: VerifiedInvalidatedRadioThresholdIngestReportV1) -> Result { Ok(Self { - report, - status, - timestamp: v.timestamp.to_timestamp()?, + status: prost_enum(v.status, MobileBanInvalidatedThresholdError::Status)?, + timestamp: v.timestamp()?, + report: v + .report + .ok_or(MobileBanInvalidatedThresholdError::MissingField( + "verified_invalidated_radio_threshold_ingest_report.report", + ))? + .try_into()?, }) } } diff --git a/file_store_oracles/src/mobile/mobile_radio_threshold.rs b/file_store_oracles/src/mobile/mobile_radio_threshold.rs index a035c1cf7..71ab74e81 100644 --- a/file_store_oracles/src/mobile/mobile_radio_threshold.rs +++ b/file_store_oracles/src/mobile/mobile_radio_threshold.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ @@ -10,7 +9,19 @@ use helium_proto::services::poc_mobile::{ }; use serde::{Deserialize, Serialize}; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum RadioThresholdError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported verification status: {0}")] + VerificationStatus(prost::UnknownEnumValue), +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RadioThresholdReportReq { @@ -47,8 +58,9 @@ impl MsgDecode for VerifiedRadioThresholdIngestReport { } impl TryFrom for RadioThresholdReportReq { - type Error = Error; - fn try_from(v: RadioThresholdReportReqV1) -> Result { + type Error = RadioThresholdError; + + fn try_from(v: RadioThresholdReportReqV1) -> Result { Ok(Self { hotspot_pubkey: v.hotspot_pubkey.into(), bytes_threshold: v.bytes_threshold, @@ -74,8 +86,8 @@ impl From for RadioThresholdReportReqV1 { } } -impl MsgTimestamp>> for RadioThresholdReportReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for RadioThresholdReportReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.threshold_timestamp.to_timestamp() } } @@ -86,8 +98,8 @@ impl MsgTimestamp for RadioThresholdReportReq { } } -impl MsgTimestamp>> for RadioThresholdIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for RadioThresholdIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -98,8 +110,8 @@ impl MsgTimestamp for RadioThresholdIngestReport { } } -impl MsgTimestamp>> for VerifiedRadioThresholdIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedRadioThresholdIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -111,13 +123,16 @@ impl MsgTimestamp for VerifiedRadioThresholdIngestReport { } impl TryFrom for RadioThresholdIngestReport { - type Error = Error; - fn try_from(v: RadioThresholdIngestReportV1) -> Result { + type Error = RadioThresholdError; + + fn try_from(v: RadioThresholdIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("ingest radio threshold ingest report"))? + .ok_or(RadioThresholdError::MissingField( + "radio_threshold_ingest_report.report", + ))? .try_into()?, }) } @@ -135,20 +150,17 @@ impl From for RadioThresholdIngestReportV1 { } impl TryFrom for VerifiedRadioThresholdIngestReport { - type Error = Error; - fn try_from(v: VerifiedRadioThresholdIngestReportV1) -> Result { - let status = RadioThresholdReportVerificationStatus::try_from(v.status).map_err(|_| { - DecodeError::unsupported_status_reason( - "verified_radio_threshold_ingest_report_v1", - v.status, - ) - })?; + type Error = RadioThresholdError; + + fn try_from(v: VerifiedRadioThresholdIngestReportV1) -> Result { Ok(Self { report: v .report - .ok_or_else(|| Error::not_found("ingest radio threshold ingest report"))? + .ok_or(RadioThresholdError::MissingField( + "verified_radio_threshold_ingest_report.report", + ))? .try_into()?, - status, + status: prost_enum(v.status, RadioThresholdError::VerificationStatus)?, timestamp: v.timestamp.to_timestamp()?, }) } diff --git a/file_store_oracles/src/mobile/mobile_session.rs b/file_store_oracles/src/mobile/mobile_session.rs index d70ae5ae4..96d5f5600 100644 --- a/file_store_oracles/src/mobile/mobile_session.rs +++ b/file_store_oracles/src/mobile/mobile_session.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ @@ -13,7 +12,25 @@ use helium_proto::services::poc_mobile::{ }; use serde::Serialize; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum DataTransferError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported status reason: {0}")] + StatusReason(prost::UnknownEnumValue), + + #[error("unsupported carrier id v2: {0}")] + CarrierIdV2(prost::UnknownEnumValue), + + #[error("unsupported radio access technology: {0}")] + RadioAccessTechnology(prost::UnknownEnumValue), +} #[derive(Serialize, Clone, Debug)] pub struct DataTransferSessionIngestReport { @@ -25,8 +42,8 @@ impl MsgDecode for DataTransferSessionIngestReport { type Msg = DataTransferSessionIngestReportV1; } -impl MsgTimestamp>> for DataTransferSessionIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for DataTransferSessionIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -38,14 +55,16 @@ impl MsgTimestamp for DataTransferSessionIngestReport { } impl TryFrom for DataTransferSessionIngestReport { - type Error = Error; + type Error = DataTransferError; - fn try_from(v: DataTransferSessionIngestReportV1) -> Result { + fn try_from(v: DataTransferSessionIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("data transfer session report"))? + .ok_or(DataTransferError::MissingField( + "data_transfer_session_ingest_report.report", + ))? .try_into()?, }) } @@ -73,8 +92,8 @@ impl MsgDecode for InvalidDataTransferIngestReport { type Msg = InvalidDataTransferIngestReportV1; } -impl MsgTimestamp>> for InvalidDataTransferIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for InvalidDataTransferIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -86,18 +105,18 @@ impl MsgTimestamp for InvalidDataTransferIngestReport { } impl TryFrom for InvalidDataTransferIngestReport { - type Error = Error; - fn try_from(v: InvalidDataTransferIngestReportV1) -> Result { - let reason = DataTransferIngestReportStatus::try_from(v.reason).map_err(|_| { - DecodeError::unsupported_status_reason("invalid_data_transfer_session_reason", v.reason) - })?; + type Error = DataTransferError; + + fn try_from(v: InvalidDataTransferIngestReportV1) -> Result { Ok(Self { timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("data transfer session ingest report"))? + .ok_or(DataTransferError::MissingField( + "invalid_data_transfer_ingest_report.report", + ))? .try_into()?, - reason, + reason: prost_enum(v.reason, DataTransferError::StatusReason)?, }) } } @@ -127,8 +146,8 @@ impl MsgTimestamp for VerifiedDataTransferIngestReport { } } -impl MsgTimestamp>> for VerifiedDataTransferIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedDataTransferIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -138,14 +157,17 @@ impl MsgDecode for VerifiedDataTransferIngestReport { } impl TryFrom for VerifiedDataTransferIngestReport { - type Error = Error; - fn try_from(v: VerifiedDataTransferIngestReportV1) -> Result { + type Error = DataTransferError; + + fn try_from(v: VerifiedDataTransferIngestReportV1) -> Result { Ok(Self { - status: v.status(), + status: prost_enum(v.status, DataTransferError::StatusReason)?, timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("data transfer session ingest report"))? + .ok_or(DataTransferError::MissingField( + "verified_data_transfer_ingest_report.report", + ))? .try_into()?, }) } @@ -175,8 +197,8 @@ pub struct DataTransferEvent { pub signature: Vec, } -impl MsgTimestamp>> for DataTransferEventProto { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for DataTransferEventProto { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -192,17 +214,19 @@ impl MsgDecode for DataTransferEvent { } impl TryFrom for DataTransferEvent { - type Error = Error; + type Error = DataTransferError; - fn try_from(v: DataTransferEventProto) -> Result { + fn try_from(v: DataTransferEventProto) -> Result { let timestamp = v.timestamp()?; - let radio_access_technology = v.radio_access_technology(); Ok(Self { pub_key: v.pub_key.into(), upload_bytes: v.upload_bytes, download_bytes: v.download_bytes, - radio_access_technology, + radio_access_technology: prost_enum( + v.radio_access_technology, + DataTransferError::RadioAccessTechnology, + )?, event_id: v.event_id, payer: v.payer.into(), timestamp, @@ -242,20 +266,20 @@ impl MsgDecode for DataTransferSessionReq { } impl TryFrom for DataTransferSessionReq { - type Error = Error; - - fn try_from(v: DataTransferSessionReqV1) -> Result { - let carrier_id = v.carrier_id_v2(); + type Error = DataTransferError; + fn try_from(v: DataTransferSessionReqV1) -> Result { Ok(Self { rewardable_bytes: v.rewardable_bytes, - signature: v.signature, data_transfer_usage: v .data_transfer_usage - .ok_or_else(|| Error::not_found("data transfer usage"))? + .ok_or(DataTransferError::MissingField( + "data_transfer_session_req.data_transfer_usage", + ))? .try_into()?, + signature: v.signature, pub_key: v.pub_key.into(), - carrier_id, + carrier_id: prost_enum(v.carrier_id_v2, DataTransferError::CarrierIdV2)?, sampling: v.sampling, }) } diff --git a/file_store_oracles/src/mobile/mobile_subscriber.rs b/file_store_oracles/src/mobile/mobile_subscriber.rs index 87eaa4c90..7afba9df1 100644 --- a/file_store_oracles/src/mobile/mobile_subscriber.rs +++ b/file_store_oracles/src/mobile/mobile_subscriber.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ @@ -10,7 +9,19 @@ use helium_proto::services::poc_mobile::{ }; use serde::{Deserialize, Serialize}; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum SubscriberLocationError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported status reason: {0}")] + StatusReason(prost::UnknownEnumValue), +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct SubscriberLocationReq { @@ -45,8 +56,9 @@ impl MsgDecode for VerifiedSubscriberLocationIngestReport { } impl TryFrom for SubscriberLocationReq { - type Error = Error; - fn try_from(v: SubscriberLocationReqV1) -> Result { + type Error = SubscriberLocationError; + + fn try_from(v: SubscriberLocationReqV1) -> Result { Ok(Self { subscriber_id: v.subscriber_id, timestamp: v.timestamp.to_timestamp()?, @@ -67,8 +79,8 @@ impl From for SubscriberLocationReqV1 { } } -impl MsgTimestamp>> for SubscriberLocationReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SubscriberLocationReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -79,8 +91,8 @@ impl MsgTimestamp for SubscriberLocationReq { } } -impl MsgTimestamp>> for SubscriberLocationIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SubscriberLocationIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -91,8 +103,8 @@ impl MsgTimestamp for SubscriberLocationIngestReport { } } -impl MsgTimestamp>> for VerifiedSubscriberLocationIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedSubscriberLocationIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -104,13 +116,16 @@ impl MsgTimestamp for VerifiedSubscriberLocationIngestReport { } impl TryFrom for SubscriberLocationIngestReport { - type Error = Error; - fn try_from(v: SubscriberLocationIngestReportV1) -> Result { + type Error = SubscriberLocationError; + + fn try_from(v: SubscriberLocationIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("ingest subscriber location report"))? + .ok_or(SubscriberLocationError::MissingField( + "subscriber_location_ingest_report.report", + ))? .try_into()?, }) } @@ -128,20 +143,17 @@ impl From for SubscriberLocationIngestReportV1 { } impl TryFrom for VerifiedSubscriberLocationIngestReport { - type Error = Error; - fn try_from(v: VerifiedSubscriberLocationIngestReportV1) -> Result { - let status = SubscriberReportVerificationStatus::try_from(v.status).map_err(|_| { - DecodeError::unsupported_status_reason( - "verified_subscriber_location_ingest_report_v1", - v.status, - ) - })?; + type Error = SubscriberLocationError; + + fn try_from(v: VerifiedSubscriberLocationIngestReportV1) -> Result { Ok(Self { report: v .report - .ok_or_else(|| Error::not_found("ingest subscriber location ingest report"))? + .ok_or(SubscriberLocationError::MissingField( + "verified_subscriber_location_ingest_report.report", + ))? .try_into()?, - status, + status: prost_enum(v.status, SubscriberLocationError::StatusReason)?, timestamp: v.timestamp.to_timestamp()?, }) } diff --git a/file_store_oracles/src/mobile/mobile_transfer.rs b/file_store_oracles/src/mobile/mobile_transfer.rs index b9f92e194..96a8c4b6a 100644 --- a/file_store_oracles/src/mobile/mobile_transfer.rs +++ b/file_store_oracles/src/mobile/mobile_transfer.rs @@ -1,12 +1,15 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode}, - Error, Result, -}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError}; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier as proto; use serde::Serialize; +#[derive(thiserror::Error, Debug)] +pub enum ValidDataTransferError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), +} + #[derive(Serialize, Clone)] pub struct ValidDataTransferSession { pub pub_key: PublicKeyBinary, @@ -25,8 +28,9 @@ impl MsgDecode for ValidDataTransferSession { } impl TryFrom for ValidDataTransferSession { - type Error = Error; - fn try_from(v: proto::ValidDataTransferSession) -> Result { + type Error = ValidDataTransferError; + + fn try_from(v: proto::ValidDataTransferSession) -> Result { Ok(Self { payer: v.payer.into(), pub_key: v.pub_key.into(), diff --git a/file_store_oracles/src/mobile/speedtest.rs b/file_store_oracles/src/mobile/speedtest.rs index 2c594abf3..f8945dd3b 100644 --- a/file_store_oracles/src/mobile/speedtest.rs +++ b/file_store_oracles/src/mobile/speedtest.rs @@ -1,7 +1,6 @@ -use chrono::{DateTime, TimeZone, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use chrono::{DateTime, Utc}; +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ @@ -10,7 +9,22 @@ use helium_proto::services::poc_mobile::{ }; use serde::{Deserialize, Serialize}; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum SpeedtestError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported status reason: {0}")] + StatusReason(prost::UnknownEnumValue), + + #[error("unsupported verification result: {0}")] + VerificationResult(prost::UnknownEnumValue), +} #[derive(Clone, Deserialize, Serialize, Debug)] pub struct CellSpeedtest { @@ -40,8 +54,8 @@ impl MsgDecode for VerifiedSpeedtest { type Msg = VerifiedSpeedtestProto; } -impl MsgTimestamp>> for SpeedtestReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SpeedtestReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -52,8 +66,8 @@ impl MsgTimestamp for CellSpeedtest { } } -impl MsgTimestamp>> for SpeedtestIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SpeedtestIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -70,8 +84,8 @@ impl MsgTimestamp for VerifiedSpeedtest { } } -impl MsgTimestamp>> for VerifiedSpeedtestProto { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedSpeedtestProto { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -92,8 +106,9 @@ impl From for SpeedtestReqV1 { } impl TryFrom for CellSpeedtest { - type Error = Error; - fn try_from(value: SpeedtestReqV1) -> Result { + type Error = SpeedtestError; + + fn try_from(value: SpeedtestReqV1) -> Result { let timestamp = value.timestamp()?; Ok(Self { pubkey: value.pub_key.into(), @@ -107,13 +122,16 @@ impl TryFrom for CellSpeedtest { } impl TryFrom for CellSpeedtestIngestReport { - type Error = Error; - fn try_from(v: SpeedtestIngestReportV1) -> Result { + type Error = SpeedtestError; + + fn try_from(v: SpeedtestIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("ingest speedtest report"))? + .ok_or(SpeedtestError::MissingField( + "cell_speedtest_ingest_report.report", + ))? .try_into()?, }) } @@ -138,18 +156,16 @@ pub struct VerifiedSpeedtest { } impl TryFrom for VerifiedSpeedtest { - type Error = Error; - fn try_from(v: VerifiedSpeedtestProto) -> Result { - let result = SpeedtestVerificationResult::try_from(v.result).map_err(|_| { - DecodeError::unsupported_status_reason("verified_speedtest_proto", v.result) - })?; + type Error = SpeedtestError; + + fn try_from(v: VerifiedSpeedtestProto) -> Result { Ok(Self { timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("ingest verified speedtest report"))? + .ok_or(SpeedtestError::MissingField("verified_speedtest.report"))? .try_into()?, - result, + result: prost_enum(v.result, SpeedtestError::VerificationResult)?, }) } } @@ -178,17 +194,14 @@ pub mod cli { } impl TryFrom for SpeedtestAverageEntry { - type Error = Error; + type Error = SpeedtestError; - fn try_from(v: Speedtest) -> Result { + fn try_from(v: Speedtest) -> Result { Ok(Self { upload_speed_bps: v.upload_speed_bps, download_speed_bps: v.download_speed_bps, latency_ms: v.latency_ms, - timestamp: Utc - .timestamp_opt(v.timestamp as i64, 0) - .single() - .ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?, + timestamp: v.timestamp.to_timestamp()?, }) } } @@ -206,9 +219,9 @@ pub mod cli { } impl TryFrom for SpeedtestAverage { - type Error = Error; + type Error = SpeedtestError; - fn try_from(v: SpeedtestAvg) -> Result { + fn try_from(v: SpeedtestAvg) -> Result { Ok(Self { pub_key: v.pub_key.clone().into(), upload_speed_avg_bps: v.upload_speed_avg_bps, @@ -219,11 +232,8 @@ pub mod cli { .speedtests .into_iter() .map(SpeedtestAverageEntry::try_from) - .collect::>>()?, - timestamp: Utc - .timestamp_opt(v.timestamp as i64, 0) - .single() - .ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?, + .collect::, _>>()?, + timestamp: v.timestamp.to_timestamp()?, reward_multiplier: v.reward_multiplier, }) } diff --git a/file_store_oracles/src/mobile/wifi_heartbeat.rs b/file_store_oracles/src/mobile/wifi_heartbeat.rs index 1b90689c0..3a2debc09 100644 --- a/file_store_oracles/src/mobile/wifi_heartbeat.rs +++ b/file_store_oracles/src/mobile/wifi_heartbeat.rs @@ -1,8 +1,5 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode}, - Error, Result, -}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ LocationSource, WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, @@ -10,7 +7,19 @@ use helium_proto::services::poc_mobile::{ use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::traits::MsgTimestamp; +use crate::{prost_enum, traits::MsgTimestamp}; + +#[derive(thiserror::Error, Debug)] +pub enum WifiHeartbeatError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported location source: {0}")] + LocationSource(prost::UnknownEnumValue), +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct WifiHeartbeat { @@ -45,14 +54,15 @@ impl MsgDecode for WifiHeartbeatIngestReport { } impl TryFrom for WifiHeartbeat { - type Error = Error; - fn try_from(v: WifiHeartbeatReqV1) -> Result { + type Error = WifiHeartbeatError; + + fn try_from(v: WifiHeartbeatReqV1) -> Result { let location_validation_timestamp = if v.location_validation_timestamp == 0 { None } else { v.location_validation_timestamp.to_timestamp().ok() }; - let location_source = v.location_source(); + Ok(Self { pubkey: v.pub_key.into(), lat: v.lat, @@ -61,32 +71,35 @@ impl TryFrom for WifiHeartbeat { coverage_object: v.coverage_object, timestamp: v.timestamp.to_timestamp()?, location_validation_timestamp, - location_source, + location_source: prost_enum(v.location_source, WifiHeartbeatError::LocationSource)?, }) } } -impl MsgTimestamp>> for WifiHeartbeatReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for WifiHeartbeatReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } impl TryFrom for WifiHeartbeatIngestReport { - type Error = Error; - fn try_from(v: WifiHeartbeatIngestReportV1) -> Result { + type Error = WifiHeartbeatError; + + fn try_from(v: WifiHeartbeatIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| Error::not_found("ingest wifi heartbeat report"))? + .ok_or(WifiHeartbeatError::MissingField( + "wifi_heartbeat_ingest_report.report", + ))? .try_into()?, }) } } -impl MsgTimestamp>> for WifiHeartbeatIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for WifiHeartbeatIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } diff --git a/file_store_oracles/src/network_common/entropy_report.rs b/file_store_oracles/src/network_common/entropy_report.rs index f47f5d47f..c4cc3a902 100644 --- a/file_store_oracles/src/network_common/entropy_report.rs +++ b/file_store_oracles/src/network_common/entropy_report.rs @@ -1,13 +1,18 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_proto::EntropyReportV1; use serde::Serialize; use crate::traits::MsgTimestamp; +#[derive(thiserror::Error, Debug)] +pub enum EntropyReportError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), +} + #[derive(Serialize, Clone, Debug)] pub struct EntropyReport { pub data: Vec, @@ -21,8 +26,8 @@ impl MsgTimestamp for EntropyReport { } } -impl MsgTimestamp>> for EntropyReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for EntropyReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -32,10 +37,10 @@ impl MsgDecode for EntropyReport { } impl TryFrom for EntropyReport { - type Error = Error; + type Error = EntropyReportError; - fn try_from(v: EntropyReportV1) -> Result { - let timestamp = v.timestamp.to_timestamp()?; + fn try_from(v: EntropyReportV1) -> Result { + let timestamp = v.timestamp()?; Ok(Self { data: v.data, version: v.version, diff --git a/file_store_oracles/src/network_common/reward_manifest.rs b/file_store_oracles/src/network_common/reward_manifest.rs index c97036071..430962b2c 100644 --- a/file_store_oracles/src/network_common/reward_manifest.rs +++ b/file_store_oracles/src/network_common/reward_manifest.rs @@ -1,9 +1,30 @@ -use chrono::{DateTime, TimeZone, Utc}; -use file_store::{traits::MsgDecode, DecodeError, Error}; -use helium_proto::{self as proto, IotRewardToken, MobileRewardToken}; +use chrono::{DateTime, Utc}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError}; +use helium_proto as proto; +use helium_proto::{IotRewardToken, MobileRewardToken}; use rust_decimal::Decimal; use serde::Serialize; +use crate::prost_enum; + +#[derive(thiserror::Error, Debug)] +pub enum RewardManifestError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("unsupported mobile token type: {0}")] + MobileTokenType(prost::UnknownEnumValue), + + #[error("unsupported iot token type: {0}")] + IotTokenType(prost::UnknownEnumValue), + + #[error("error parsing decimal: {0}")] + Decimal(#[from] rust_decimal::Error), +} + #[derive(Clone, Debug, Serialize)] pub struct RewardManifest { pub written_files: Vec, @@ -34,82 +55,59 @@ impl MsgDecode for RewardManifest { } impl TryFrom for RewardManifest { - type Error = Error; + type Error = RewardManifestError; fn try_from(value: proto::RewardManifest) -> Result { Ok(RewardManifest { written_files: value.written_files, - start_timestamp: Utc - .timestamp_opt(value.start_timestamp as i64, 0) - .single() - .ok_or(Error::Decode(DecodeError::InvalidTimestamp( - value.start_timestamp, - )))?, - end_timestamp: Utc - .timestamp_opt(value.end_timestamp as i64, 0) - .single() - .ok_or(Error::Decode(DecodeError::InvalidTimestamp( - value.end_timestamp, - )))?, + start_timestamp: value.start_timestamp.to_timestamp()?, + end_timestamp: value.end_timestamp.to_timestamp()?, epoch: value.epoch, price: value.price, reward_data: match value.reward_data { Some(proto::reward_manifest::RewardData::MobileRewardData(reward_data)) => { - let token = MobileRewardToken::try_from(reward_data.token).map_err(|_| { - DecodeError::unsupported_token_type( - "mobile_reward_manifest", - reward_data.token, - ) - })?; Some(RewardData::MobileRewardData { poc_bones_per_reward_share: reward_data .poc_bones_per_reward_share - .ok_or(DecodeError::empty_field("poc_bones_per_reward_share"))? + .ok_or(RewardManifestError::MissingField( + "mobile_reward_data.poc_bones_per_reward_share", + ))? .value - .parse() - .map_err(DecodeError::from)?, + .parse()?, boosted_poc_bones_per_reward_share: reward_data .boosted_poc_bones_per_reward_share - .ok_or(DecodeError::empty_field( - "boosted_poc_bones_per_reward_share", + .ok_or(RewardManifestError::MissingField( + "mobile_reward_data.boosted_poc_bones_per_reward_share", ))? .value - .parse() - .map_err(DecodeError::from)?, - token, + .parse()?, + token: prost_enum(reward_data.token, RewardManifestError::MobileTokenType)?, }) } Some(proto::reward_manifest::RewardData::IotRewardData(reward_data)) => { - let token = IotRewardToken::try_from(reward_data.token).map_err(|_| { - DecodeError::unsupported_token_type( - "iot_reward_manifest", - reward_data.token, - ) - })?; Some(RewardData::IotRewardData { poc_bones_per_beacon_reward_share: reward_data .poc_bones_per_beacon_reward_share - .ok_or(DecodeError::empty_field( - "poc_bones_per_beacon_reward_share", + .ok_or(RewardManifestError::MissingField( + "iot_reward_data.poc_bones_per_beacon_reward_share", ))? .value - .parse() - .map_err(DecodeError::from)?, + .parse()?, poc_bones_per_witness_reward_share: reward_data .poc_bones_per_witness_reward_share - .ok_or(DecodeError::empty_field( - "poc_bones_per_witness_reward_share", + .ok_or(RewardManifestError::MissingField( + "iot_reward_data.poc_bones_per_witness_reward_share", ))? .value - .parse() - .map_err(DecodeError::from)?, + .parse()?, dc_bones_per_share: reward_data .dc_bones_per_share - .ok_or(DecodeError::empty_field("dc_bones_per_share"))? + .ok_or(RewardManifestError::MissingField( + "iot_reward_data.dc_bones_per_share", + ))? .value - .parse() - .map_err(DecodeError::from)?, - token, + .parse()?, + token: prost_enum(reward_data.token, RewardManifestError::IotTokenType)?, }) } None => None, diff --git a/file_store_oracles/src/subscriber/subscriber_verified_mapping_event.rs b/file_store_oracles/src/subscriber/subscriber_verified_mapping_event.rs index e0fe14a3e..883fb4713 100644 --- a/file_store_oracles/src/subscriber/subscriber_verified_mapping_event.rs +++ b/file_store_oracles/src/subscriber/subscriber_verified_mapping_event.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::SubscriberVerifiedMappingEventReqV1; @@ -9,6 +8,12 @@ use serde::{Deserialize, Serialize}; use crate::traits::MsgTimestamp; +#[derive(thiserror::Error, Debug)] +pub enum SubscriberMappingError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), +} + #[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] pub struct SubscriberVerifiedMappingEvent { pub subscriber_id: Vec, @@ -21,8 +26,8 @@ impl MsgDecode for SubscriberVerifiedMappingEvent { type Msg = SubscriberVerifiedMappingEventReqV1; } -impl MsgTimestamp>> for SubscriberVerifiedMappingEventReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SubscriberVerifiedMappingEventReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp() } } @@ -47,8 +52,9 @@ impl From for SubscriberVerifiedMappingEventReqV } impl TryFrom for SubscriberVerifiedMappingEvent { - type Error = Error; - fn try_from(v: SubscriberVerifiedMappingEventReqV1) -> Result { + type Error = SubscriberMappingError; + + fn try_from(v: SubscriberVerifiedMappingEventReqV1) -> Result { let timestamp = v.timestamp()?; Ok(Self { subscriber_id: v.subscriber_id, diff --git a/file_store_oracles/src/subscriber/subscriber_verified_mapping_event_ingest_report.rs b/file_store_oracles/src/subscriber/subscriber_verified_mapping_event_ingest_report.rs index a5125866c..e6965b031 100644 --- a/file_store_oracles/src/subscriber/subscriber_verified_mapping_event_ingest_report.rs +++ b/file_store_oracles/src/subscriber/subscriber_verified_mapping_event_ingest_report.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_proto::services::poc_mobile::{ SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1, @@ -9,9 +8,22 @@ use helium_proto::services::poc_mobile::{ use serde::{Deserialize, Serialize}; use crate::{ - subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, traits::MsgTimestamp, + subscriber_verified_mapping_event::{SubscriberMappingError, SubscriberVerifiedMappingEvent}, + traits::MsgTimestamp, }; +#[derive(thiserror::Error, Debug)] +pub enum SubscriberIngestReportError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("subscriber location: {0}")] + SubscriberLocation(#[from] SubscriberMappingError), +} + #[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] pub struct SubscriberVerifiedMappingEventIngestReport { pub received_timestamp: DateTime, @@ -22,8 +34,8 @@ impl MsgDecode for SubscriberVerifiedMappingEventIngestReport { type Msg = SubscriberVerifiedMappingEventIngestReportV1; } -impl MsgTimestamp>> for SubscriberVerifiedMappingEventIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for SubscriberVerifiedMappingEventIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -50,15 +62,16 @@ impl From impl TryFrom for SubscriberVerifiedMappingEventIngestReport { - type Error = Error; - fn try_from(v: SubscriberVerifiedMappingEventIngestReportV1) -> Result { + type Error = SubscriberIngestReportError; + + fn try_from(v: SubscriberVerifiedMappingEventIngestReportV1) -> Result { Ok(Self { received_timestamp: v.timestamp()?, report: v .report - .ok_or_else(|| { - Error::not_found("ingest SubscriberVerifiedMappingEventIngestReport report") - })? + .ok_or(SubscriberIngestReportError::MissingField( + "subscriber_verified_mapping_event_ingest_report.report", + ))? .try_into()?, }) } diff --git a/file_store_oracles/src/subscriber/unique_connections.rs b/file_store_oracles/src/subscriber/unique_connections.rs index 752c4eae2..2f1d3914c 100644 --- a/file_store_oracles/src/subscriber/unique_connections.rs +++ b/file_store_oracles/src/subscriber/unique_connections.rs @@ -1,12 +1,8 @@ use chrono::{DateTime, Utc}; +use file_store::traits::{MsgDecode, TimestampDecode, TimestampDecodeError}; use helium_crypto::PublicKeyBinary; use serde::{Deserialize, Serialize}; -use file_store::{ - traits::{MsgDecode, TimestampDecode}, - Error, -}; - pub mod proto { pub use helium_proto::services::poc_mobile::{ UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, @@ -14,6 +10,15 @@ pub mod proto { }; } +#[derive(thiserror::Error, Debug)] +pub enum UniqueConnectionsError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct UniqueConnectionsIngestReport { pub received_timestamp: DateTime, @@ -43,14 +48,16 @@ impl MsgDecode for UniqueConnectionsIngestReport { } impl TryFrom for UniqueConnectionsIngestReport { - type Error = Error; + type Error = UniqueConnectionsError; fn try_from(value: proto::UniqueConnectionsIngestReportV1) -> Result { Ok(Self { received_timestamp: value.received_timestamp.to_timestamp_millis()?, report: value .report - .ok_or_else(|| Error::not_found("ingest unique connections"))? + .ok_or(UniqueConnectionsError::MissingField( + "unique_connections_ingest_report.report", + ))? .try_into()?, }) } @@ -66,7 +73,7 @@ impl From for proto::UniqueConnectionsIngestRepor } impl TryFrom for UniqueConnectionReq { - type Error = Error; + type Error = UniqueConnectionsError; fn try_from(value: proto::UniqueConnectionsReqV1) -> Result { Ok(Self { diff --git a/file_store_oracles/src/subscriber/usage_counts.rs b/file_store_oracles/src/subscriber/usage_counts.rs index 4acdfe285..7c5d77048 100644 --- a/file_store_oracles/src/subscriber/usage_counts.rs +++ b/file_store_oracles/src/subscriber/usage_counts.rs @@ -1,9 +1,8 @@ use std::convert::TryFrom; use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - DecodeError, Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use h3o::CellIndex; use helium_crypto::PublicKeyBinary; @@ -15,6 +14,18 @@ use serde::{Deserialize, Serialize}; use crate::traits::MsgTimestamp; +#[derive(thiserror::Error, Debug)] +pub enum UniqueCountsError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("invalid cell index: {0}")] + InvalidCellIndex(#[from] h3o::error::InvalidCellIndex), +} + #[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] pub struct HexUsageStatsReq { pub hex: CellIndex, @@ -52,8 +63,8 @@ impl MsgDecode for RadioUsageStatsReq { type Msg = RadioUsageStatsReqV1; } -impl MsgTimestamp>> for HexUsageStatsReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for HexUsageStatsReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -64,8 +75,8 @@ impl MsgTimestamp for HexUsageStatsReq { } } -impl MsgTimestamp>> for RadioUsageStatsReqV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for RadioUsageStatsReqV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -77,14 +88,14 @@ impl MsgTimestamp for RadioUsageStatsReq { } impl TryFrom for HexUsageStatsReq { - type Error = Error; - fn try_from(v: HexUsageStatsReqV1) -> Result { + type Error = UniqueCountsError; + + fn try_from(v: HexUsageStatsReqV1) -> Result { let timestamp = v.timestamp()?; let epoch_start_timestamp = v.epoch_start_timestamp.to_timestamp_millis()?; let epoch_end_timestamp = v.epoch_end_timestamp.to_timestamp_millis()?; - let hex = CellIndex::try_from(v.hex).map_err(|_| { - DecodeError::FileStreamTryDecode(format!("invalid CellIndex {}", v.hex)) - })?; + let hex = CellIndex::try_from(v.hex)?; + Ok(Self { hex, service_provider_user_count: v.service_provider_user_count, @@ -123,8 +134,9 @@ impl From for HexUsageStatsReqV1 { } impl TryFrom for RadioUsageStatsReq { - type Error = Error; - fn try_from(v: RadioUsageStatsReqV1) -> Result { + type Error = UniqueCountsError; + + fn try_from(v: RadioUsageStatsReqV1) -> Result { let timestamp = v.timestamp()?; let epoch_start_timestamp = v.epoch_start_timestamp.to_timestamp_millis()?; let epoch_end_timestamp = v.epoch_end_timestamp.to_timestamp_millis()?; @@ -188,8 +200,8 @@ impl MsgDecode for RadioUsageCountsIngestReport { type Msg = RadioUsageStatsIngestReportV1; } -impl MsgTimestamp>> for HexUsageStatsIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for HexUsageStatsIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -200,8 +212,8 @@ impl MsgTimestamp for HexUsageCountsIngestReport { } } -impl MsgTimestamp>> for RadioUsageStatsIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for RadioUsageStatsIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.received_timestamp.to_timestamp_millis() } } @@ -213,15 +225,17 @@ impl MsgTimestamp for RadioUsageCountsIngestReport { } impl TryFrom for HexUsageCountsIngestReport { - type Error = Error; - fn try_from(v: HexUsageStatsIngestReportV1) -> Result { + type Error = UniqueCountsError; + + fn try_from(v: HexUsageStatsIngestReportV1) -> Result { Ok(Self { + received_timestamp: v.timestamp()?, report: v - .clone() .report - .ok_or_else(|| Error::not_found("ingest HexUsageStatsIngestReport report"))? + .ok_or(UniqueCountsError::MissingField( + "hex_usage_counts_ingest_report.report", + ))? .try_into()?, - received_timestamp: v.timestamp()?, }) } } @@ -238,15 +252,17 @@ impl From for HexUsageStatsIngestReportV1 { } impl TryFrom for RadioUsageCountsIngestReport { - type Error = Error; - fn try_from(v: RadioUsageStatsIngestReportV1) -> Result { + type Error = UniqueCountsError; + + fn try_from(v: RadioUsageStatsIngestReportV1) -> Result { Ok(Self { + received_timestamp: v.timestamp()?, report: v - .clone() .report - .ok_or_else(|| Error::not_found("ingest RadioUsageCountsIngestReport report"))? + .ok_or(UniqueCountsError::MissingField( + "hex_usage_stats_ingest_report_v_1.report", + ))? .try_into()?, - received_timestamp: v.timestamp()?, }) } } diff --git a/file_store_oracles/src/subscriber/verified_subscriber_verified_mapping_event_ingest_report.rs b/file_store_oracles/src/subscriber/verified_subscriber_verified_mapping_event_ingest_report.rs index 6d3ca5f1e..7b01c5b75 100644 --- a/file_store_oracles/src/subscriber/verified_subscriber_verified_mapping_event_ingest_report.rs +++ b/file_store_oracles/src/subscriber/verified_subscriber_verified_mapping_event_ingest_report.rs @@ -1,7 +1,6 @@ use chrono::{DateTime, Utc}; -use file_store::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - Error, Result, +use file_store::traits::{ + MsgDecode, TimestampDecode, TimestampDecodeError, TimestampDecodeResult, TimestampEncode, }; use helium_proto::services::poc_mobile::{ SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventVerificationStatus, @@ -10,10 +9,24 @@ use helium_proto::services::poc_mobile::{ use serde::{Deserialize, Serialize}; use crate::{ - subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, + subscriber_verified_mapping_event_ingest_report::{ + SubscriberIngestReportError, SubscriberVerifiedMappingEventIngestReport, + }, traits::MsgTimestamp, }; +#[derive(thiserror::Error, Debug)] +pub enum VerifiedSubscriberMappingError { + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), + + #[error("missing field: {0}")] + MissingField(&'static str), + + #[error("subscriber ingest report: {0}")] + SubscriberIngestReport(#[from] SubscriberIngestReportError), +} + #[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] pub struct VerifiedSubscriberVerifiedMappingEventIngestReport { pub report: SubscriberVerifiedMappingEventIngestReport, @@ -25,8 +38,8 @@ impl MsgDecode for VerifiedSubscriberVerifiedMappingEventIngestReport { type Msg = VerifiedSubscriberVerifiedMappingEventIngestReportV1; } -impl MsgTimestamp>> for VerifiedSubscriberVerifiedMappingEventIngestReportV1 { - fn timestamp(&self) -> Result> { +impl MsgTimestamp for VerifiedSubscriberVerifiedMappingEventIngestReportV1 { + fn timestamp(&self) -> TimestampDecodeResult { self.timestamp.to_timestamp_millis() } } @@ -54,20 +67,20 @@ impl From impl TryFrom for VerifiedSubscriberVerifiedMappingEventIngestReport { - type Error = Error; - fn try_from(v: VerifiedSubscriberVerifiedMappingEventIngestReportV1) -> Result { + type Error = VerifiedSubscriberMappingError; + + fn try_from( + v: VerifiedSubscriberVerifiedMappingEventIngestReportV1, + ) -> Result { Ok(Self { + status: v.status(), + timestamp: v.timestamp()?, report: v - .clone() .report - .ok_or_else(|| { - Error::not_found( - "ingest VerifiedSubscriberVerifiedMappingEventIngestReport report", - ) - })? + .ok_or(VerifiedSubscriberMappingError::MissingField( + "verified_subscriber_verified_mapping_event_ingest_report.report", + ))? .try_into()?, - status: v.status(), - timestamp: v.timestamp()?, }) } } diff --git a/iot_config/src/sub_dao_epoch_reward_info.rs b/iot_config/src/sub_dao_epoch_reward_info.rs index aaeabf339..25b8d7c6f 100644 --- a/iot_config/src/sub_dao_epoch_reward_info.rs +++ b/iot_config/src/sub_dao_epoch_reward_info.rs @@ -1,6 +1,6 @@ use crate::EpochInfo; use chrono::{DateTime, Utc}; -use file_store::traits::{TimestampDecode, TimestampEncode}; +use file_store::traits::{TimestampDecode, TimestampDecodeError, TimestampEncode}; use helium_proto::services::sub_dao::SubDaoEpochRewardInfo as SubDaoEpochRewardInfoProto; use rust_decimal::prelude::*; use std::ops::Range; @@ -27,8 +27,8 @@ pub struct RawSubDaoEpochRewardInfo { #[derive(thiserror::Error, Debug)] pub enum SubDaoRewardInfoParseError { - #[error("file_store: {0}")] - FileStore(#[from] file_store::Error), + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), } impl From for SubDaoEpochRewardInfoProto { diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index c2770e4f3..0c4ab2e0f 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -660,21 +660,7 @@ fn verify_beacon_payload( tracing::debug!("generated beacon {:?}", generated_beacon); // cast the received beaconers report into a beacon - let reported_beacon: beacon::Beacon = - match beacon_report.to_beacon(entropy_start, entropy_version) { - Ok(res) => res, - Err(e) => { - tracing::warn!( - "failed to cast report to beacon, reason: {:?}, pub_key: {:?}", - e, - beacon_report.pub_key - ); - return Err(InvalidResponse { - reason: InvalidReason::InvalidPacket, - details: None, - }); - } - }; + let reported_beacon = beacon_report.to_beacon(entropy_start, entropy_version); tracing::debug!("reported beacon {:?}", reported_beacon); // compare reports diff --git a/mobile_config/src/key_cache.rs b/mobile_config/src/key_cache.rs index eb09dc2ee..0b3077fbd 100644 --- a/mobile_config/src/key_cache.rs +++ b/mobile_config/src/key_cache.rs @@ -174,7 +174,9 @@ mod tests { impl MsgVerify for VerifiedBad { fn verify(&self, _verifier: &helium_crypto::PublicKey) -> file_store::Result { - Err(file_store::Error::NotFound("not found".to_string())) + Err(file_store::Error::ExternalError( + "testing bad verification".into(), + )) } } diff --git a/mobile_config/src/sub_dao_epoch_reward_info.rs b/mobile_config/src/sub_dao_epoch_reward_info.rs index adfb5f323..cfc82e831 100644 --- a/mobile_config/src/sub_dao_epoch_reward_info.rs +++ b/mobile_config/src/sub_dao_epoch_reward_info.rs @@ -1,6 +1,6 @@ use crate::EpochInfo; use chrono::{DateTime, Utc}; -use file_store::traits::{TimestampDecode, TimestampEncode}; +use file_store::traits::{TimestampDecode, TimestampDecodeError, TimestampEncode}; use helium_proto::services::sub_dao::SubDaoEpochRewardInfo as SubDaoEpochRewardInfoProto; use rust_decimal::prelude::*; use std::ops::Range; @@ -27,8 +27,8 @@ pub struct RawSubDaoEpochRewardInfo { #[derive(thiserror::Error, Debug)] pub enum SubDaoRewardInfoParseError { - #[error("file_store: {0}")] - FileStore(#[from] file_store::Error), + #[error("invalid timestamp: {0}")] + Timestamp(#[from] TimestampDecodeError), } impl From for SubDaoEpochRewardInfoProto { diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index 89e63868d..276192deb 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -142,7 +142,7 @@ impl Indexer { manifest .written_files .into_iter() - .map(|file_name| FileInfo::from_str(&file_name)), + .map(|file_name| FileInfo::from_str(&file_name).map_err(file_store::Error::from)), ) .boxed();