Skip to content

Commit bd88e65

Browse files
authored
Remove FileStore struct from file_store (#1048)
* Remove FileStore struct from file_store * Remove region from settings, just inherit region from environment * move all low level functions to lib.rs * Fix aws_local * fix PriceTracker * add lock file * fix price server * Update ingest * Added reexported Client type from file-store * update price to use file_store::Client * updateig mobile packet verifier * update mobile-verifier * update reward-indexer * update iot packet verifier * update poc_entropy and iot_verifier * update aws_local to file_store_client * fixing tests and clippy * run formatter * remove commented out code * update file_info_poller builder to take Into<String> for bucket * update generics on file_info_poller and minor refactor of file uploader * update bucket args to be impl Into<String> in file_store/src/lib.rs * refactor to file_store_client
1 parent 49c9a91 commit bd88e65

File tree

42 files changed

+698
-639
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+698
-639
lines changed

aws_local/src/lib.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use anyhow::{anyhow, Result};
2-
use aws_config::meta::region::RegionProviderChain;
3-
use aws_config::{BehaviorVersion, Region};
2+
use aws_config::BehaviorVersion;
43
use aws_sdk_s3::Client;
54
use chrono::Utc;
65
use file_store::traits::MsgBytes;
7-
use file_store::{file_sink, file_upload, FileStore, FileType, Settings};
6+
use file_store::{file_sink, file_upload, FileType, Settings};
87
use std::env;
98
use std::path::Path;
109
use std::sync::Arc;
@@ -27,22 +26,17 @@ pub fn gen_bucket_name() -> String {
2726
// Used to create mocked aws buckets and files.
2827
pub struct AwsLocal {
2928
pub fs_settings: Settings,
30-
pub file_store: FileStore,
31-
pub aws_client: aws_sdk_s3::Client,
29+
pub file_store_client: file_store::Client,
30+
bucket: String,
3231
}
3332

3433
impl AwsLocal {
3534
async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client {
36-
let region = Region::new(settings.region.clone());
37-
let region_provider = RegionProviderChain::first_try(region).or_default_provider();
38-
39-
let config = aws_config::defaults(BehaviorVersion::latest())
40-
.region(region_provider)
41-
.load()
42-
.await;
35+
let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
4336

4437
let mut s3_config = aws_sdk_s3::config::Builder::from(&config)
4538
.force_path_style(true)
39+
.region(aws_config::Region::new("us-east-1"))
4640
.endpoint_url(settings.endpoint.as_ref().expect("endpoint"));
4741

4842
let creds = aws_sdk_s3::config::Credentials::builder()
@@ -62,20 +56,16 @@ impl AwsLocal {
6256

6357
pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal {
6458
let settings = Settings {
65-
bucket: bucket.into(),
6659
endpoint: Some(endpoint.into()),
67-
region: "us-east-1".into(),
6860
access_key_id: Some("random".into()),
6961
secret_access_key: Some("random2".into()),
7062
};
7163
let client = Self::create_aws_client(&settings).await;
7264
client.create_bucket().bucket(bucket).send().await.unwrap();
7365
AwsLocal {
74-
aws_client: client,
66+
file_store_client: client,
7567
fs_settings: settings.clone(),
76-
file_store: file_store::FileStore::from_settings(&settings)
77-
.await
78-
.unwrap(),
68+
bucket: bucket.to_string(),
7969
}
8070
}
8171

@@ -95,9 +85,7 @@ impl AwsLocal {
9585
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
9686

9787
let (file_upload, file_upload_server) =
98-
file_upload::FileUpload::from_settings_tm(&self.fs_settings)
99-
.await
100-
.unwrap();
88+
file_upload::FileUpload::new(self.file_store_client.clone(), self.bucket.clone()).await;
10189

10290
let (item_sink, item_server) =
10391
file_sink::FileSinkBuilder::new(file_type, &tmp_dir_path, file_upload, metric_name)
@@ -158,7 +146,7 @@ impl AwsLocal {
158146
if !file_path.is_file() {
159147
return Err(anyhow!("File {path_str} is not a file"));
160148
}
161-
self.file_store.put(file_path).await?;
149+
file_store::put_file(&self.file_store_client, &self.bucket, file_path).await?;
162150

163151
Ok(())
164152
}

file_store/src/cli/bucket.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
mobile_radio_threshold::VerifiedRadioThresholdIngestReport,
77
speedtest::{cli::SpeedtestAverage, CellSpeedtest},
88
traits::MsgDecode,
9-
Error, FileInfoStream, FileStore, FileType, Result, Settings,
9+
Error, FileInfoStream, FileType, Result, Settings,
1010
};
1111
use chrono::{NaiveDateTime, TimeZone, Utc};
1212
use futures::{stream::TryStreamExt, StreamExt, TryFutureExt};
@@ -73,8 +73,10 @@ pub struct FileFilter {
7373
}
7474

7575
impl FileFilter {
76-
pub fn list(&self, store: &FileStore) -> FileInfoStream {
77-
store.list(
76+
pub fn list(&self, client: &aws_sdk_s3::Client, bucket: &str) -> FileInfoStream {
77+
crate::list_files(
78+
client,
79+
bucket,
7880
&self.prefix,
7981
self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
8082
self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
@@ -85,37 +87,40 @@ impl FileFilter {
8587
/// List keys in a given bucket
8688
#[derive(Debug, clap::Args)]
8789
pub struct List {
90+
#[clap(short)]
91+
bucket: String,
8892
#[clap(flatten)]
8993
filter: FileFilter,
9094
}
9195

9296
impl List {
9397
pub async fn run(&self, settings: &Settings) -> Result {
94-
let store = FileStore::from_settings(settings).await?;
95-
let mut file_infos = self.filter.list(&store);
98+
let client = settings.connect().await;
99+
let mut file_infos = self.filter.list(&client, &self.bucket);
96100
let mut ser = serde_json::Serializer::new(io::stdout());
97101
let mut seq = ser.serialize_seq(None)?;
98102
while let Some(info) = file_infos.try_next().await? {
99103
seq.serialize_element(&info)?;
100104
}
101105
seq.end()?;
102106
Ok(())
103-
// print_json(&file_infos)
104107
}
105108
}
106109

107110
/// Put one or more files in a given bucket
108111
#[derive(Debug, clap::Args)]
109112
pub struct Put {
113+
#[clap(short)]
114+
bucket: String,
110115
/// The files to upload to the bucket
111116
files: Vec<PathBuf>,
112117
}
113118

114119
impl Put {
115120
pub async fn run(&self, settings: &Settings) -> Result {
116-
let file_store = FileStore::from_settings(settings).await?;
121+
let client = settings.connect().await;
117122
for file in self.files.iter() {
118-
file_store.put(file).await?;
123+
crate::put_file(&client, &self.bucket, file).await?;
119124
}
120125
Ok(())
121126
}
@@ -132,9 +137,9 @@ pub struct Remove {
132137

133138
impl Remove {
134139
pub async fn run(&self, settings: &Settings) -> Result {
135-
let file_store = FileStore::from_settings(settings).await?;
140+
let client = settings.connect().await;
136141
for key in self.keys.iter() {
137-
file_store.remove(key).await?;
142+
crate::remove_file(&client, &self.bucket, key).await?;
138143
}
139144
Ok(())
140145
}
@@ -143,6 +148,7 @@ impl Remove {
143148
/// Get one or more files from a given bucket to a given folder
144149
#[derive(Debug, clap::Args)]
145150
pub struct Get {
151+
bucket: String,
146152
/// The target folder to download files to
147153
dest: PathBuf,
148154
#[clap(flatten)]
@@ -151,24 +157,27 @@ pub struct Get {
151157

152158
impl Get {
153159
pub async fn run(&self, settings: &Settings) -> Result {
154-
let store = FileStore::from_settings(settings).await?;
155-
let file_infos = self.filter.list(&store);
160+
let client = settings.connect().await;
161+
let file_infos = self.filter.list(&client, &self.bucket);
156162
file_infos
157-
.map_ok(|info| (store.clone(), info))
158-
.try_for_each_concurrent(5, |(store, info)| async move {
163+
.map_ok(|info| (client.clone(), self.bucket.clone(), info))
164+
.try_for_each_concurrent(5, |(client, bucket, info)| async move {
159165
fs::OpenOptions::new()
160166
.write(true)
161167
.create(true)
162168
.truncate(true)
163169
.open(&self.dest.join(Path::new(&info.key)))
164170
.map_err(Error::from)
165171
.and_then(|mut file| {
166-
store.get_raw(&info.key).and_then(|stream| async move {
167-
let mut reader = tokio::io::BufReader::new(stream.into_async_read());
168-
tokio::io::copy(&mut reader, &mut file)
169-
.map_err(Error::from)
170-
.await
171-
})
172+
crate::get_raw_file(&client, &bucket, &info.key).and_then(
173+
|stream| async move {
174+
let mut reader =
175+
tokio::io::BufReader::new(stream.into_async_read());
176+
tokio::io::copy(&mut reader, &mut file)
177+
.map_err(Error::from)
178+
.await
179+
},
180+
)
172181
})
173182
.map_ok(|_| ())
174183
.await
@@ -181,6 +190,7 @@ impl Get {
181190
/// Locate specific records in a time range
182191
#[derive(Debug, clap::Args)]
183192
pub struct Locate {
193+
bucket: String,
184194
gateway: PublicKey,
185195

186196
#[clap(flatten)]
@@ -189,12 +199,11 @@ pub struct Locate {
189199

190200
impl Locate {
191201
pub async fn run(&self, settings: &Settings) -> Result {
192-
let store = FileStore::from_settings(settings).await?;
193-
let file_infos = self.filter.list(&store);
202+
let client = settings.connect().await;
203+
let file_infos = self.filter.list(&client, &self.bucket);
194204
let prefix = self.filter.prefix.clone();
195205
let gateway = &self.gateway.clone();
196-
let mut events = store
197-
.source(file_infos)
206+
let mut events = crate::source_files(&client, &self.bucket, file_infos)
198207
.map_ok(|buf| (buf, gateway))
199208
.try_filter_map(|(buf, gateway)| {
200209
let prefix = prefix.clone();

file_store/src/file_info_poller.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{file_store, traits::MsgDecode, Error, FileInfo, FileStore, Result};
1+
use crate::{traits::MsgDecode, Error, FileInfo, Result};
22
use aws_sdk_s3::primitives::ByteStream;
33
use chrono::{DateTime, Utc};
44
use derive_builder::Builder;
@@ -130,11 +130,19 @@ pub struct FileInfoPollerConfig<Message, State, Store, Parser> {
130130
p: PhantomData<Message>,
131131
}
132132

133+
impl<Message, State, Parser>
134+
FileInfoPollerConfigBuilder<Message, State, FileStoreInfoPollerStore, Parser>
135+
{
136+
pub fn file_store(self, client: crate::Client, bucket: impl Into<String>) -> Self {
137+
self.store(FileStoreInfoPollerStore::new(client, bucket))
138+
}
139+
}
140+
133141
#[derive(Clone)]
134142
pub struct FileInfoPollerServer<
135143
Message,
136144
State,
137-
Store = FileStore,
145+
Store = FileStoreInfoPollerStore,
138146
Parser = MsgDecodeFileInfoPollerParser,
139147
> {
140148
config: FileInfoPollerConfig<Message, State, Store, Parser>,
@@ -179,11 +187,13 @@ where
179187
}
180188
}
181189

182-
impl<Message, State, Parser> ManagedTask for FileInfoPollerServer<Message, State, FileStore, Parser>
190+
impl<Message, State, Store, Parser> ManagedTask
191+
for FileInfoPollerServer<Message, State, Store, Parser>
183192
where
184193
Message: Send + Sync + 'static,
185194
State: FileInfoPollerState,
186195
Parser: FileInfoPollerParser<Message>,
196+
Store: FileInfoPollerStore,
187197
{
188198
fn start_task(
189199
self: Box<Self>,
@@ -339,7 +349,7 @@ where
339349
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
340350
{
341351
async fn parse(&self, byte_stream: ByteStream) -> Result<Vec<T>> {
342-
Ok(file_store::stream_source(byte_stream)
352+
Ok(crate::stream_source(byte_stream)
343353
.filter_map(|msg| async {
344354
msg.map_err(|err| {
345355
tracing::error!(
@@ -374,7 +384,7 @@ where
374384
T: helium_proto::Message + Default,
375385
{
376386
async fn parse(&self, byte_stream: ByteStream) -> Result<Vec<T>> {
377-
Ok(file_store::stream_source(byte_stream)
387+
Ok(crate::stream_source(byte_stream)
378388
.filter_map(|msg| async {
379389
msg.map_err(|err| {
380390
tracing::error!(
@@ -409,21 +419,35 @@ async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
409419
cache.insert(file_info.key.clone(), true, CACHE_TTL).await;
410420
}
411421

422+
pub struct FileStoreInfoPollerStore {
423+
client: crate::Client,
424+
bucket: String,
425+
}
426+
427+
impl FileStoreInfoPollerStore {
428+
fn new(client: crate::Client, bucket: impl Into<String>) -> Self {
429+
Self {
430+
client,
431+
bucket: bucket.into(),
432+
}
433+
}
434+
}
435+
412436
#[async_trait::async_trait]
413-
impl FileInfoPollerStore for FileStore {
437+
impl FileInfoPollerStore for FileStoreInfoPollerStore {
414438
async fn list_all<A, B>(&self, file_type: &str, after: A, before: B) -> Result<Vec<FileInfo>>
415439
where
416440
A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
417441
B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
418442
{
419-
self.list_all(file_type, after, before).await
443+
crate::list_all_files(&self.client, &self.bucket, file_type, after, before).await
420444
}
421445

422446
async fn get_raw<K>(&self, key: K) -> Result<ByteStream>
423447
where
424448
K: Into<String> + Send + Sync,
425449
{
426-
self.get_raw(key).await
450+
crate::get_raw_file(&self.client, &self.bucket, key).await
427451
}
428452
}
429453

0 commit comments

Comments
 (0)