Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion file_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ async-trait = { workspace = true }
derive_builder = { workspace = true }
retainer = { workspace = true }
uuid = { workspace = true }
h3o = { workspace = true }
task-manager = { path = "../task_manager" }
tls-init = { path = "../tls_init" }
bs58 = { workspace = true }
Expand Down
210 changes: 69 additions & 141 deletions file_store/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,179 +1,101 @@
use prost::UnknownEnumValue;
use std::path::{Path, PathBuf};

use thiserror::Error;

use crate::file_info::FileInfoError;

pub type Result<T = ()> = std::result::Result<T, Error>;

#[derive(Error, Debug)]
pub enum Error {
#[error("io error")]
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("encode error")]
Encode(#[from] EncodeError),
#[error("decode error")]
Decode(#[from] DecodeError),
#[error("unknown enum value")]
UnknownEnumValue(#[from] UnknownEnumValue),
#[error("not found")]
NotFound(String),
#[error("crypto error")]

#[error("prost encode error: {0}")]
Encode(#[from] prost::EncodeError),

#[error("file info error: {0}")]
FileInfo(#[from] FileInfoError),

#[error("aws error: {0}")]
Aws(#[from] AwsError),

#[error("crypto error: {0}")]
Crypto(Box<helium_crypto::Error>),
#[error("aws error")]
Aws(#[source] Box<aws_sdk_s3::Error>),
#[error("mpsc channel error")]
Channel,
#[error("no manifest")]
NoManifest,
#[error("send timeout")]
SendTimeout,

#[error("channel error: {0}")]
Channel(#[from] ChannelError),

#[error("error building file info poller: {0}")]
FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError),

#[cfg(feature = "sqlx-postgres")]
#[error("db error")]
DbError(#[from] sqlx::Error),
#[error("channel send error")]
SendError(#[from] tokio::sync::mpsc::error::SendError<()>),
//Generic error wrapper for external (out of that repository) traits implementations.
//Not recommended for internal use!

// Generic error wrapper for external (out of that repository) traits implementations.
// Not recommended for internal use!
#[error("external error")]
ExternalError(#[from] Box<dyn std::error::Error + Send + Sync>),
}

#[derive(Error, Debug)]
pub enum DecodeError {
#[error("prost error")]
Prost(#[from] prost::DecodeError),
#[error("file info error")]
FileInfo(String),
#[error("uri error")]
Uri(#[from] http::uri::InvalidUri),
#[error("integer conversion error")]
FromInt(#[from] std::num::TryFromIntError),
#[error("error parsing decimal")]
IntoDecimal(#[from] rust_decimal::Error),
#[error("empty field: {0}")]
EmptyField(&'static str),
#[error("unsupported region, type: {0}, value: {1}")]
UnsupportedRegion(String, i32),
#[error("unsupported datarate, type: {0}, value: {1}")]
UnsupportedDataRate(String, i32),
#[error("unsupported invalid_reason, type: {0}, value: {1}")]
UnsupportedInvalidReason(String, i32),
#[error("unsupported participant_side, type: {0}, value: {1}")]
UnsupportedParticipantSide(String, i32),
#[error("unsupported verification status, type: {0}, value: {1}")]
UnsupportedStatusReason(String, i32),
#[error("unsupported signal level, type: {0}, value: {1}")]
UnsupportedSignalLevel(String, i32),
#[error("invalid unix timestamp {0}")]
InvalidTimestamp(u64),
#[error("Uuid error: {0}")]
UuidError(#[from] uuid::Error),
#[error("Invalid cell index error: {0}")]
InvalidCellIndexError(#[from] h3o::error::InvalidCellIndex),
#[error("unsupported packet type, type: {0}, value: {1}")]
UnsupportedPacketType(String, i32),
#[error("file stream try decode error: {0}")]
FileStreamTryDecode(String),
#[error("unsupported token type {0}")]
UnsupportedTokenType(String, i32),
}

#[derive(Error, Debug)]
pub enum EncodeError {
#[error("prost error")]
Prost(#[from] prost::EncodeError),
#[error("json error")]
Json(#[from] serde_json::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum AwsError {
#[error("s3: {0}")]
S3(Box<aws_sdk_s3::Error>),

macro_rules! from_err {
($to_type:ty, $from_type:ty) => {
impl From<$from_type> for Error {
fn from(v: $from_type) -> Self {
Self::from(<$to_type>::from(v))
}
}
};
#[error("streaming: {0}")]
Streaming(#[from] aws_sdk_s3::primitives::ByteStreamError),
}

// Encode Errors
from_err!(EncodeError, prost::EncodeError);
from_err!(EncodeError, serde_json::Error);

// Decode Errors
from_err!(DecodeError, prost::DecodeError);

impl Error {
pub fn not_found<E: ToString>(msg: E) -> Self {
Self::NotFound(msg.to_string())
}
pub fn channel() -> Error {
Error::Channel
}

pub fn s3_error<T>(err: T) -> Self
impl AwsError {
pub fn s3_error<T>(err: T) -> Error
where
T: Into<aws_sdk_s3::Error>,
{
Self::from(err.into())
}

pub fn file_stream_try_decode<E: ToString>(msg: E) -> Error {
DecodeError::file_stream_try_decode(msg)
Error::from(err.into())
}
}

impl DecodeError {
pub fn file_info<E: ToString>(msg: E) -> Error {
Error::Decode(Self::FileInfo(msg.to_string()))
}

pub fn unsupported_region<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedRegion(msg1.to_string(), msg2))
}

pub fn unsupported_datarate<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedDataRate(msg1.to_string(), msg2))
}

pub fn unsupported_packet_type<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedPacketType(msg1.to_string(), msg2))
}

pub fn unsupported_participant_side<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedParticipantSide(msg1.to_string(), msg2))
}
#[derive(thiserror::Error, Debug)]
pub enum ChannelError {
#[error("failed to send {prefix} for process {process}")]
PollerSendError { prefix: String, process: String },

pub fn unsupported_invalid_reason<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2))
}
#[error("channel closed sink {name}")]
SinkClosed { name: String },

pub fn invalid_timestamp(v: u64) -> Error {
Error::Decode(Self::InvalidTimestamp(v))
}

pub fn unsupported_status_reason<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2))
}
#[error("timeout for sink {name}")]
SinkTimeout { name: String },

pub fn unsupported_signal_level(msg1: impl ToString, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedSignalLevel(msg1.to_string(), msg2))
}
#[error("channel closed for upload {path}")]
UploadClosed { path: PathBuf },
}

pub fn file_stream_try_decode<E: ToString>(msg: E) -> Error {
Error::Decode(Self::FileStreamTryDecode(msg.to_string()))
impl ChannelError {
pub fn poller_send_error(prefix: &str, process: &str) -> Error {
Error::Channel(Self::PollerSendError {
prefix: prefix.to_owned(),
process: process.to_owned(),
})
}

pub fn unsupported_invalidated_reason<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2))
pub fn sink_closed(name: &str) -> Error {
Error::Channel(Self::SinkClosed {
name: name.to_owned(),
})
}

pub const fn empty_field(field: &'static str) -> Error {
Error::Decode(Self::EmptyField(field))
pub fn sink_timeout(name: &str) -> Error {
Error::Channel(Self::SinkTimeout {
name: name.to_owned(),
})
}

pub fn unsupported_token_type<E: ToString>(msg1: E, msg2: i32) -> Error {
Error::Decode(Self::UnsupportedTokenType(msg1.to_string(), msg2))
pub fn upload_closed(path: &Path) -> Error {
Error::Channel(Self::UploadClosed {
path: path.to_owned(),
})
}
}

Expand All @@ -185,6 +107,12 @@ impl From<helium_crypto::Error> for Error {

impl From<aws_sdk_s3::Error> for Error {
fn from(err: aws_sdk_s3::Error) -> Self {
Self::Aws(Box::new(err))
Self::Aws(AwsError::S3(Box::new(err)))
}
}

impl From<aws_sdk_s3::primitives::ByteStreamError> for Error {
fn from(err: aws_sdk_s3::primitives::ByteStreamError) -> Self {
Self::from(AwsError::Streaming(err))
}
}
49 changes: 33 additions & 16 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{traits::TimestampDecode, DecodeError, Error, Result};
use crate::traits::{TimestampDecode, TimestampDecodeError};
use chrono::{DateTime, Utc};
use regex::Regex;
use serde::Serialize;
use std::{fmt, os::unix::fs::MetadataExt, path::Path, str::FromStr, sync::LazyLock};
use std::{fmt, io, os::unix::fs::MetadataExt, path::Path, str::FromStr, sync::LazyLock};

#[derive(Debug, Clone, Serialize)]
pub struct FileInfo {
Expand All @@ -14,17 +14,35 @@ pub struct FileInfo {

static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"([a-z,\d,_]+)\.(\d+)(\.gz)?").unwrap());

#[derive(thiserror::Error, Debug)]
pub enum FileInfoError {
#[error("invalid timestamp: {0}")]
Timestamp(#[from] TimestampDecodeError),

#[error("invalid timestamp string: {0}")]
TimestampStr(#[from] std::num::ParseIntError),

#[error("filename did not match regex: {0}")]
Regex(String),

#[error("no file name found")]
MissingFilename,

#[error("IO: {0}")]
Io(#[from] io::Error),
}

impl FromStr for FileInfo {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
type Err = FileInfoError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let key = s.to_string();
let cap = RE
.captures(s)
.ok_or_else(|| DecodeError::file_info("failed to decode file info"))?;
.ok_or_else(|| FileInfoError::Regex(key.clone()))?;
let prefix = cap[1].to_owned();
let timestamp = u64::from_str(&cap[2])
.map_err(|_| DecodeError::file_info("failed to decode timestamp"))?
.to_timestamp_millis()?;

let timestamp = u64::from_str(&cap[2])?.to_timestamp_millis()?;
Ok(Self {
key,
prefix,
Expand Down Expand Up @@ -66,22 +84,21 @@ impl<T: Into<String>> From<(T, DateTime<Utc>)> for FileInfo {
}

impl TryFrom<&aws_sdk_s3::types::Object> for FileInfo {
type Error = Error;
fn try_from(value: &aws_sdk_s3::types::Object) -> Result<Self> {
type Error = FileInfoError;

fn try_from(value: &aws_sdk_s3::types::Object) -> std::result::Result<Self, Self::Error> {
let size = value.size().unwrap_or_default() as usize;
let key = value
.key
.as_ref()
.ok_or_else(|| Error::not_found("no file name found"))?;
let key = value.key.as_ref().ok_or(FileInfoError::MissingFilename)?;
let mut info = Self::from_str(key)?;
info.size = size;
Ok(info)
}
}

impl TryFrom<&Path> for FileInfo {
type Error = Error;
fn try_from(value: &Path) -> Result<Self> {
type Error = FileInfoError;

fn try_from(value: &Path) -> std::result::Result<Self, Self::Error> {
let mut info = Self::from_str(&value.to_string_lossy())?;
info.size = value.metadata()?.size() as usize;
Ok(info)
Expand Down
8 changes: 5 additions & 3 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{traits::MsgDecode, BucketClient, Error, FileInfo, Result};
use crate::{error::ChannelError, traits::MsgDecode, BucketClient, Error, FileInfo, Result};
use aws_sdk_s3::primitives::ByteStream;
use chrono::{DateTime, Utc};
use derive_builder::Builder;
Expand Down Expand Up @@ -284,6 +284,7 @@ where
async fn run(mut self, shutdown: triggered::Listener) -> Result {
let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);
let process_name = self.config.process_name.clone();
let prefix = self.config.prefix.clone();

tracing::info!(
r#type = self.config.prefix,
Expand All @@ -300,7 +301,7 @@ where
break;
}
_ = cleanup_trigger.tick() => self.clean(&self.cache).await?,
result = futures::future::try_join(sender.reserve().map_err(Error::from), self.get_next_file()) => {
result = futures::future::try_join(sender.reserve().map_err(|_| ChannelError::poller_send_error(&prefix, &process_name)), self.get_next_file()) => {
let (permit, file) = result?;
let byte_stream = self.config.store.get_raw(file.clone()).await?;
let data = self.config.parser.parse(byte_stream).await?;
Expand Down Expand Up @@ -385,7 +386,8 @@ pub struct MsgDecodeFileInfoPollerParser;
#[async_trait::async_trait]
impl<T> FileInfoPollerParser<T> for MsgDecodeFileInfoPollerParser
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
T: MsgDecode + Send + Sync + 'static,
<T as TryFrom<T::Msg>>::Error: std::error::Error,
{
async fn parse(&self, byte_stream: ByteStream) -> Result<Vec<T>> {
Ok(crate::stream_source(byte_stream)
Expand Down
Loading