Skip to content

Commit 8496dce

Browse files
authored
Add region back in file_store::Settings and add tracking of new clien… (#1073)
* Add region back in file_store::Settings and add tracking of new clients to reduce aws credential caches * Update aws_local and clippy fixes * Re-add comment for force_path_style * Add debug statements * Add BucketClient to file_store and update mobile-verifier to use it * only set region if Option is Some * Updated aws_local::AwsLocal::new to take region as a parameter
1 parent de9a487 commit 8496dce

File tree

25 files changed

+266
-161
lines changed

25 files changed

+266
-161
lines changed

aws_local/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ prost = { workspace = true }
1616
anyhow = { workspace = true }
1717
uuid = { workspace = true }
1818
tempfile = { workspace = true }
19-
file-store = { path = "../file_store", features = ["local"] }
19+
file-store = { path = "../file_store"}

aws_local/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ impl AwsLocal {
5353
Client::from_conf(s3_config.build())
5454
}
5555

56-
pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal {
56+
pub async fn new(region: &str, endpoint: &str, bucket: &str) -> AwsLocal {
5757
let settings = Settings {
58+
region: Some(region.into()),
5859
endpoint: Some(endpoint.into()),
5960
access_key_id: Some("random".into()),
6061
secret_access_key: Some("random2".into()),

file_store/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,4 @@ tempfile = { workspace = true }
5252

5353
[features]
5454
default = ["sqlx-postgres"]
55-
local = []
5655
sqlx-postgres = ["sqlx/postgres"]

file_store/src/bucket_client.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::path::Path;
2+
3+
use aws_sdk_s3::primitives::ByteStream;
4+
use chrono::{DateTime, Utc};
5+
6+
use crate::{error::Result, BytesMutStream, FileInfo, FileInfoStream};
7+
8+
#[derive(Clone, Debug)]
9+
pub struct BucketClient {
10+
pub client: aws_sdk_s3::Client,
11+
pub bucket: String,
12+
}
13+
14+
impl BucketClient {
15+
pub async fn new(
16+
bucket: String,
17+
region: Option<String>,
18+
endpoint: Option<String>,
19+
access_key_id: Option<String>,
20+
secret_access_key: Option<String>,
21+
) -> Self {
22+
let client = crate::new_client(region, endpoint, access_key_id, secret_access_key).await;
23+
Self { client, bucket }
24+
}
25+
26+
pub fn list_files<A, B>(&self, prefix: impl Into<String>, after: A, before: B) -> FileInfoStream
27+
where
28+
A: Into<Option<DateTime<Utc>>> + Copy,
29+
B: Into<Option<DateTime<Utc>>> + Copy,
30+
{
31+
crate::list_files(&self.client, self.bucket.clone(), prefix, after, before)
32+
}
33+
34+
pub async fn list_all_files<A, B>(
35+
&self,
36+
prefix: impl Into<String>,
37+
after: A,
38+
before: B,
39+
) -> Result<Vec<FileInfo>>
40+
where
41+
A: Into<Option<DateTime<Utc>>> + Copy,
42+
B: Into<Option<DateTime<Utc>>> + Copy,
43+
{
44+
crate::list_all_files(&self.client, self.bucket.clone(), prefix, after, before).await
45+
}
46+
47+
pub async fn put_file(&self, file: &Path) -> Result {
48+
crate::put_file(&self.client, self.bucket.clone(), file).await
49+
}
50+
51+
pub async fn remove_file(&self, key: impl Into<String>) -> Result {
52+
crate::remove_file(&self.client, self.bucket.clone(), key).await
53+
}
54+
55+
pub async fn get_raw_file(&self, key: impl Into<String>) -> Result<ByteStream> {
56+
crate::get_raw_file(&self.client, self.bucket.clone(), key).await
57+
}
58+
59+
pub async fn get_file(&self, key: impl Into<String>) -> Result<BytesMutStream> {
60+
crate::get_file(&self.client, self.bucket.clone(), key).await
61+
}
62+
63+
pub fn source_files(&self, infos: FileInfoStream) -> BytesMutStream {
64+
crate::source_files(&self.client, self.bucket.clone(), infos)
65+
}
66+
67+
pub fn source_files_unordered(&self, workers: usize, infos: FileInfoStream) -> BytesMutStream {
68+
crate::source_files_unordered(&self.client, self.bucket.clone(), workers, infos)
69+
}
70+
71+
pub async fn stream_single_file(&self, file_info: FileInfo) -> Result<BytesMutStream> {
72+
crate::stream_single_file(&self.client, self.bucket.clone(), file_info).await
73+
}
74+
}

file_store/src/file_info_poller.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{traits::MsgDecode, Error, FileInfo, Result};
1+
use crate::{traits::MsgDecode, BucketClient, Error, FileInfo, Result};
22
use aws_sdk_s3::primitives::ByteStream;
33
use chrono::{DateTime, Utc};
44
use derive_builder::Builder;
@@ -173,6 +173,13 @@ impl<Message, State, Parser>
173173
pub fn file_store(self, client: crate::Client, bucket: impl Into<String>) -> Self {
174174
self.store(FileStoreInfoPollerStore::new(client, bucket))
175175
}
176+
177+
pub fn bucket_client(self, bucket_client: BucketClient) -> Self {
178+
self.store(FileStoreInfoPollerStore::new(
179+
bucket_client.client,
180+
bucket_client.bucket,
181+
))
182+
}
176183
}
177184

178185
#[derive(Clone)]

file_store/src/file_source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ mod test {
100100
//
101101

102102
let settings = Settings {
103+
region: None,
103104
endpoint: None,
104105
access_key_id: None,
105106
secret_access_key: None,

file_store/src/file_upload.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Error, Result};
1+
use crate::{BucketClient, Error, Result};
22
use futures::{StreamExt, TryFutureExt};
33
use std::{
44
path::{Path, PathBuf},
@@ -43,6 +43,10 @@ impl FileUpload {
4343
)
4444
}
4545

46+
pub async fn from_bucket_client(bucket_client: BucketClient) -> (Self, FileUploadServer) {
47+
Self::new(bucket_client.client.clone(), bucket_client.bucket.clone()).await
48+
}
49+
4650
pub async fn upload_file(&self, file: &Path) -> Result {
4751
self.sender
4852
.send(file.to_path_buf())

file_store/src/lib.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ extern crate tls_init;
33
mod error;
44

55
pub use error::{DecodeError, EncodeError, Error, Result};
6+
use tokio::sync::Mutex;
67

8+
pub mod bucket_client;
79
pub mod file_info;
810
pub mod file_info_poller;
911
pub mod file_sink;
@@ -12,7 +14,7 @@ pub mod file_upload;
1214
mod settings;
1315
pub mod traits;
1416

15-
use std::path::Path;
17+
use std::{collections::HashMap, path::Path, sync::OnceLock};
1618

1719
use aws_config::BehaviorVersion;
1820
use aws_sdk_s3::primitives::ByteStream;
@@ -22,52 +24,82 @@ use chrono::{DateTime, Utc};
2224
pub use file_info::FileInfo;
2325
pub use file_sink::{FileSink, FileSinkBuilder};
2426

27+
pub use bucket_client::BucketClient;
2528
use futures::{
2629
future,
2730
stream::{self, BoxStream},
2831
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
2932
};
30-
pub use settings::Settings;
33+
pub use settings::{BucketSettings, Settings};
3134

3235
pub type Client = aws_sdk_s3::Client;
3336
pub type Stream<T> = BoxStream<'static, Result<T>>;
3437
pub type FileInfoStream = Stream<FileInfo>;
3538
pub type BytesMutStream = Stream<BytesMut>;
3639

40+
static CLIENT_MAP: OnceLock<Mutex<HashMap<ClientKey, Client>>> = OnceLock::new();
41+
42+
#[derive(PartialEq, Eq, Hash, Debug)]
43+
struct ClientKey {
44+
region: Option<String>,
45+
endpoint: Option<String>,
46+
access_key_id: Option<String>,
47+
secret_access_key: Option<String>,
48+
}
49+
3750
pub async fn new_client(
51+
region: Option<String>,
3852
endpoint: Option<String>,
3953
_access_key_id: Option<String>,
4054
_secret_access_key: Option<String>,
41-
) -> Client {
55+
) -> aws_sdk_s3::Client {
56+
let mut client_map = CLIENT_MAP
57+
.get_or_init(|| Mutex::new(HashMap::new()))
58+
.lock()
59+
.await;
60+
61+
let key = ClientKey {
62+
region: region.clone(),
63+
endpoint: endpoint.clone(),
64+
access_key_id: _access_key_id.clone(),
65+
secret_access_key: _secret_access_key.clone(),
66+
};
67+
68+
if let Some(client) = client_map.get(&key) {
69+
tracing::debug!(params = ?key, "Using existing file-store s3 client");
70+
return client.clone();
71+
}
72+
4273
let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
4374

4475
let mut s3_config = aws_sdk_s3::config::Builder::from(&config);
45-
if let Some(endpoint) = endpoint {
46-
s3_config = s3_config.endpoint_url(endpoint);
76+
77+
if let Some(region_str) = region {
78+
s3_config = s3_config.region(aws_config::Region::new(region_str));
4779
}
4880

49-
#[cfg(feature = "local")]
50-
{
81+
if let Some(endpoint) = endpoint {
82+
s3_config = s3_config.endpoint_url(endpoint);
5183
// NOTE(mj): If you see something like a DNS error, this is probably
5284
// the culprit. Need to find a way to make this configurable. It
5385
// would be nice to allow the "local" feature to be active, but not
5486
// enforce path style.
5587
s3_config = s3_config.force_path_style(true);
88+
}
5689

57-
// Set a default region for local development (MinIO doesn't care about the region)
58-
s3_config = s3_config.region(aws_config::Region::new("us-east-1"));
59-
60-
if let Some((access_key_id, secret_access_key)) = _access_key_id.zip(_secret_access_key) {
61-
let creds = aws_sdk_s3::config::Credentials::builder()
62-
.provider_name("Static")
63-
.access_key_id(access_key_id)
64-
.secret_access_key(secret_access_key);
90+
if let Some((access_key_id, secret_access_key)) = _access_key_id.zip(_secret_access_key) {
91+
let creds = aws_sdk_s3::config::Credentials::builder()
92+
.provider_name("Static")
93+
.access_key_id(access_key_id)
94+
.secret_access_key(secret_access_key);
6595

66-
s3_config = s3_config.credentials_provider(creds.build());
67-
}
96+
s3_config = s3_config.credentials_provider(creds.build());
6897
}
6998

70-
Client::from_conf(s3_config.build())
99+
tracing::debug!(params = ?key, "Creating new file-store s3 client");
100+
let client = Client::from_conf(s3_config.build());
101+
client_map.insert(key, client.clone());
102+
client
71103
}
72104

73105
pub fn list_files<A, B>(

file_store/src/settings.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use config::{Config, File};
22
use serde::{Deserialize, Serialize};
33
use std::path::Path;
44

5+
use crate::bucket_client::BucketClient;
6+
57
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
68
pub struct Settings {
9+
pub region: Option<String>,
710
/// Optional api endpoint for the bucket. Default none
811
pub endpoint: Option<String>,
912
/// Should only be used for local testing
@@ -13,6 +16,13 @@ pub struct Settings {
1316
pub secret_access_key: Option<String>,
1417
}
1518

19+
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
20+
pub struct BucketSettings {
21+
bucket: String,
22+
#[serde(flatten)]
23+
settings: Settings,
24+
}
25+
1626
impl Settings {
1727
/// Load Settings from a given path.
1828
///
@@ -26,10 +36,31 @@ impl Settings {
2636

2737
pub async fn connect(&self) -> crate::Client {
2838
crate::new_client(
39+
self.region.clone(),
2940
self.endpoint.clone(),
3041
self.access_key_id.clone(),
3142
self.secret_access_key.clone(),
3243
)
3344
.await
3445
}
3546
}
47+
48+
impl BucketSettings {
49+
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, config::ConfigError> {
50+
Config::builder()
51+
.add_source(File::with_name(&path.as_ref().to_string_lossy()))
52+
.build()
53+
.and_then(|config| config.try_deserialize())
54+
}
55+
56+
pub async fn connect(&self) -> BucketClient {
57+
BucketClient::new(
58+
self.bucket.clone(),
59+
self.settings.region.clone(),
60+
self.settings.endpoint.clone(),
61+
self.settings.access_key_id.clone(),
62+
self.settings.secret_access_key.clone(),
63+
)
64+
.await
65+
}
66+
}

iot_verifier/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ impl Server {
101101
// *
102102
// setup the price tracker requirements
103103
// *
104-
let (price_tracker, price_daemon) =
105-
PriceTracker::new(&settings.price_tracker, file_store_client.clone()).await?;
104+
let (price_tracker, price_daemon) = PriceTracker::new(&settings.price_tracker).await?;
106105

107106
// *
108107
// setup the loader requirements

0 commit comments

Comments
 (0)