Skip to content

Commit 02ae98b

Browse files
authored
Remove blocking calls (#74)
There are currently two blocking calls that will prevent the adapter to utilize the Tokio scheduler correctly. This change removes those calls by using Arc and AtomicBoolwhen necessary. Signed-off-by: David Calavera <[email protected]> Signed-off-by: David Calavera <[email protected]>
1 parent 3d4b821 commit 02ae98b

File tree

3 files changed

+95
-57
lines changed

3 files changed

+95
-57
lines changed

Cargo.lock

Lines changed: 0 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ env_logger = "0.8.3"
2121
http = "0.2.4"
2222
lambda-extension = "0.6.0"
2323
lambda_http = "0.6.1"
24-
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json"] }
24+
reqwest = { version = "0.11", default-features = false, features = ["json"] }
2525
tokio = { version = "1.20.0", features = ["macros", "io-util", "sync", "rt-multi-thread", "time"] }
2626
tokio-retry = "0.3"
2727
tracing = { version = "0.1", features = ["log"] }

src/lib.rs

Lines changed: 94 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,18 @@
44
use lambda_extension::Extension;
55
use lambda_http::{Body, Request, RequestExt, Response};
66
use reqwest::{redirect, Client, Url};
7-
use std::{env, future, future::Future, mem, pin::Pin, time::Duration};
8-
use tokio::{runtime::Handle, time::timeout};
7+
use std::{
8+
env,
9+
future::Future,
10+
mem,
11+
pin::Pin,
12+
sync::{
13+
atomic::{AtomicBool, Ordering},
14+
Arc,
15+
},
16+
time::Duration,
17+
};
18+
use tokio::time::timeout;
919
use tokio_retry::{strategy::FixedInterval, Retry};
1020
use tower::Service;
1121

@@ -39,10 +49,10 @@ impl AdapterOptions {
3949
}
4050

4151
pub struct Adapter {
42-
client: Client,
52+
client: Arc<Client>,
4353
healthcheck_url: String,
4454
async_init: bool,
45-
ready_at_init: bool,
55+
ready_at_init: Arc<AtomicBool>,
4656
domain: Url,
4757
base_path: Option<String>,
4858
}
@@ -66,42 +76,40 @@ impl Adapter {
6676
let domain = format!("http://{}:{}", options.host, options.port).parse().unwrap();
6777

6878
Adapter {
69-
client,
79+
client: Arc::new(client),
7080
healthcheck_url,
7181
domain,
7282
base_path: options.base_path.clone(),
7383
async_init: options.async_init,
74-
ready_at_init: false,
84+
ready_at_init: Arc::new(AtomicBool::new(false)),
7585
}
7686
}
7787

7888
/// Switch the default HTTP client with a different one.
7989
pub fn with_client(self, client: Client) -> Self {
80-
Adapter { client, ..self }
90+
Adapter {
91+
client: Arc::new(client),
92+
..self
93+
}
8194
}
8295

8396
/// Check if the web server has been initialized.
8497
/// If `Adapter.async_init` is true, cancel this check before
8598
/// Lambda's init 10s timeout, and let the server boot in the background.
8699
pub async fn check_init_health(&mut self) {
87-
self.ready_at_init = if self.async_init {
100+
let ready_at_init = if self.async_init {
88101
timeout(Duration::from_secs_f32(9.8), self.check_readiness())
89102
.await
90103
.unwrap_or_default()
91104
} else {
92105
self.check_readiness().await
93106
};
107+
self.ready_at_init.store(ready_at_init, Ordering::SeqCst);
94108
}
95109

96110
async fn check_readiness(&self) -> bool {
97-
Retry::spawn(FixedInterval::from_millis(10), || {
98-
match reqwest::blocking::get(&self.healthcheck_url) {
99-
Ok(response) if { response.status().is_success() } => future::ready(Ok(())),
100-
_ => future::ready(Err::<(), i32>(-1)),
101-
}
102-
})
103-
.await
104-
.is_ok()
111+
let url = self.healthcheck_url.clone();
112+
is_web_ready(&url).await
105113
}
106114
}
107115

@@ -117,44 +125,83 @@ impl Service<Request> for Adapter {
117125
}
118126

119127
fn call(&mut self, event: Request) -> Self::Future {
120-
if self.async_init && !self.ready_at_init {
121-
let handle = Handle::current();
122-
handle.block_on(self.check_readiness());
123-
self.ready_at_init = true;
124-
}
128+
let async_init = self.async_init;
129+
let client = self.client.clone();
130+
let ready_at_init = self.ready_at_init.clone();
131+
let healthcheck_url = self.healthcheck_url.clone();
132+
let domain = self.domain.clone();
133+
let base_path = self.base_path.clone();
125134

126-
let path = event.raw_http_path();
127-
let mut path = path.as_str();
128-
let (parts, body) = event.into_parts();
135+
Box::pin(async move {
136+
fetch_response(
137+
async_init,
138+
ready_at_init,
139+
client,
140+
base_path,
141+
domain,
142+
healthcheck_url,
143+
event,
144+
)
145+
.await
146+
})
147+
}
148+
}
129149

130-
// strip away Base Path if environment variable REMOVE_BASE_PATH is set.
131-
if let Some(base_path) = self.base_path.as_deref() {
132-
path = path.trim_start_matches(base_path);
133-
}
150+
async fn fetch_response(
151+
async_init: bool,
152+
ready_at_init: Arc<AtomicBool>,
153+
client: Arc<Client>,
154+
base_path: Option<String>,
155+
domain: Url,
156+
healthcheck_url: String,
157+
event: Request,
158+
) -> Result<http::Response<Body>, Error> {
159+
if async_init && !ready_at_init.load(Ordering::SeqCst) {
160+
is_web_ready(&healthcheck_url).await;
161+
ready_at_init.store(true, Ordering::SeqCst);
162+
}
134163

135-
let mut app_url = self.domain.clone();
136-
app_url.set_path(path);
137-
app_url.set_query(parts.uri.query());
138-
tracing::debug!(app_url = %app_url, "sending request to server");
164+
let path = event.raw_http_path();
165+
let mut path = path.as_str();
166+
let (parts, body) = event.into_parts();
139167

140-
let app_response = self
141-
.client
142-
.request(parts.method, app_url.to_string())
143-
.headers(parts.headers)
144-
.body(body.to_vec())
145-
.send();
168+
// strip away Base Path if environment variable REMOVE_BASE_PATH is set.
169+
if let Some(base_path) = base_path.as_deref() {
170+
path = path.trim_start_matches(base_path);
171+
}
146172

147-
Box::pin(async move {
148-
let app_response = app_response.await?;
149-
let mut lambda_response = Response::builder();
150-
let _ = mem::replace(lambda_response.headers_mut().unwrap(), app_response.headers().clone());
173+
let mut app_url = domain;
174+
app_url.set_path(path);
175+
app_url.set_query(parts.uri.query());
176+
tracing::debug!(app_url = %app_url, "sending request to server");
151177

152-
let status = app_response.status();
153-
let body = convert_body(app_response).await?;
154-
let resp = lambda_response.status(status).body(body).map_err(Box::new)?;
178+
let app_response = client
179+
.request(parts.method, app_url.to_string())
180+
.headers(parts.headers)
181+
.body(body.to_vec())
182+
.send()
183+
.await?;
155184

156-
Ok(resp)
157-
})
185+
let mut lambda_response = Response::builder();
186+
let _ = mem::replace(lambda_response.headers_mut().unwrap(), app_response.headers().clone());
187+
188+
let status = app_response.status();
189+
let body = convert_body(app_response).await?;
190+
let resp = lambda_response.status(status).body(body).map_err(Box::new)?;
191+
192+
Ok(resp)
193+
}
194+
195+
async fn is_web_ready(url: &str) -> bool {
196+
Retry::spawn(FixedInterval::from_millis(10), || check_web_readiness(url))
197+
.await
198+
.is_ok()
199+
}
200+
201+
async fn check_web_readiness(url: &str) -> Result<(), i8> {
202+
match reqwest::get(url).await {
203+
Ok(response) if { response.status().is_success() } => Ok(()),
204+
_ => Err(-1),
158205
}
159206
}
160207

0 commit comments

Comments
 (0)