Skip to content

Commit 7b68388

Browse files
feat: support Lambda Managed Instances
- Add Config.max_concurrency and size runtime HTTP client pool from AWS_LAMBDA_MAX_CONCURRENCY. - Introduce windowed concurrent /next polling with semaphore-limited handler tasks and shutdown coordination. - Require Clone + Send + 'static handlers in lambda-runtime and lambda-http, and make internal layers/HTTP adapters cloneable. - Adjust streaming HTTP to use BoxCloneService and align bounds for concurrent execution. - Add RIE LMI helper (Makefile + test-rie.sh) and minor robustness improvements (Context parsing, basic example error logging). Tests: cargo +stable fmt --all; cargo +stable clippy --all-targets --all-features; cargo +stable test --all (integration test requiring TEST_ENDPOINT not configured); ./scripts/test-rie.sh basic-lambda
1 parent 1bfa699 commit 7b68388

File tree

12 files changed

+427
-79
lines changed

12 files changed

+427
-79
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ INTEG_API_INVOKE := RestApiUrl HttpApiUrl
55
INTEG_EXTENSIONS := extension-fn extension-trait logs-trait
66
# Using musl to run extensions on both AL1 and AL2
77
INTEG_ARCH := x86_64-unknown-linux-musl
8+
RIE_MAX_CONCURRENCY ?= 4
89

910
define uppercase
1011
$(shell sed -r 's/(^|-)(\w)/\U\2/g' <<< $(1))
@@ -111,4 +112,8 @@ fmt:
111112
cargo +nightly fmt --all
112113

113114
test-rie:
114-
./scripts/test-rie.sh $(EXAMPLE)
115+
./scripts/test-rie.sh $(EXAMPLE)
116+
117+
# Run RIE in Lambda Managed Instance (LMI) mode with concurrent polling.
118+
test-rie-lmi:
119+
RIE_MAX_CONCURRENCY=$(RIE_MAX_CONCURRENCY) ./scripts/test-rie.sh $(EXAMPLE)

examples/basic-lambda/src/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ async fn main() -> Result<(), Error> {
2828
tracing::init_default_subscriber();
2929

3030
let func = service_fn(my_handler);
31-
lambda_runtime::run(func).await?;
31+
if let Err(err) = lambda_runtime::run(func).await {
32+
eprintln!("run error: {:?}", err);
33+
return Err(err);
34+
}
3235
Ok(())
3336
}
3437

lambda-http/src/lib.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,23 @@ pub struct Adapter<'a, R, S> {
151151
_phantom_data: PhantomData<&'a R>,
152152
}
153153

154+
impl<'a, R, S> Clone for Adapter<'a, R, S>
155+
where
156+
S: Clone,
157+
{
158+
fn clone(&self) -> Self {
159+
Self {
160+
service: self.service.clone(),
161+
_phantom_data: PhantomData,
162+
}
163+
}
164+
}
165+
154166
impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
155167
where
156-
S: Service<Request, Response = R, Error = E>,
157-
S::Future: Send + 'a,
158-
R: IntoResponse,
168+
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
169+
S::Future: Send + 'static,
170+
R: IntoResponse + Send + Sync + 'static,
159171
{
160172
fn from(service: S) -> Self {
161173
Adapter {
@@ -193,12 +205,12 @@ where
193205
///
194206
/// This takes care of transforming the LambdaEvent into a [`Request`] and then
195207
/// converting the result into a `LambdaResponse`.
196-
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
208+
pub async fn run<R, S, E>(handler: S) -> Result<(), Error>
197209
where
198-
S: Service<Request, Response = R, Error = E>,
199-
S::Future: Send + 'a,
200-
R: IntoResponse,
201-
E: std::fmt::Debug + Into<Diagnostic>,
210+
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
211+
S::Future: Send + 'static,
212+
R: IntoResponse + Send + Sync + 'static,
213+
E: std::fmt::Debug + Into<Diagnostic> + Send + 'static,
202214
{
203215
lambda_runtime::run(Adapter::from(handler)).await
204216
}

lambda-http/src/streaming.rs

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ use futures_util::{Stream, TryFutureExt};
99
pub use http::{self, Response};
1010
use http_body::Body;
1111
use lambda_runtime::{
12-
tower::{
13-
util::{MapRequest, MapResponse},
14-
ServiceBuilder, ServiceExt,
15-
},
12+
tower::{util::BoxCloneService, ServiceBuilder, ServiceExt},
1613
Diagnostic,
1714
};
1815
pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
@@ -27,10 +24,22 @@ pub struct StreamAdapter<'a, S, B> {
2724
_phantom_data: PhantomData<&'a B>,
2825
}
2926

27+
impl<'a, S, B> Clone for StreamAdapter<'a, S, B>
28+
where
29+
S: Clone,
30+
{
31+
fn clone(&self) -> Self {
32+
Self {
33+
service: self.service.clone(),
34+
_phantom_data: PhantomData,
35+
}
36+
}
37+
}
38+
3039
impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
3140
where
32-
S: Service<Request, Response = Response<B>, Error = E>,
33-
S::Future: Send + 'a,
41+
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
42+
S::Future: Send + 'static,
3443
B: Body + Unpin + Send + 'static,
3544
B::Data: Into<Bytes> + Send,
3645
B::Error: Into<Error> + Send + Debug,
@@ -45,15 +54,15 @@ where
4554

4655
impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
4756
where
48-
S: Service<Request, Response = Response<B>, Error = E>,
49-
S::Future: Send + 'a,
57+
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
58+
S::Future: Send + 'static,
5059
B: Body + Unpin + Send + 'static,
5160
B::Data: Into<Bytes> + Send,
5261
B::Error: Into<Error> + Send + Debug,
5362
{
5463
type Response = StreamResponse<BodyStream<B>>;
5564
type Error = E;
56-
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'a>>;
65+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
5766

5867
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
5968
self.service.poll_ready(cx)
@@ -78,27 +87,23 @@ where
7887
/// Used internally by [`run_with_streaming_response`]; not part of the public
7988
/// API.
8089
#[allow(clippy::type_complexity)]
81-
fn into_stream_service<'a, S, B, E>(
90+
fn into_stream_service<S, B, E>(
8291
handler: S,
83-
) -> MapResponse<
84-
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
85-
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
86-
>
92+
) -> BoxCloneService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
8793
where
88-
S: Service<Request, Response = Response<B>, Error = E>,
89-
S::Future: Send + 'a,
90-
E: Debug + Into<Diagnostic>,
94+
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
95+
S::Future: Send + 'static,
96+
E: Debug + Into<Diagnostic> + Send + 'static,
9197
B: Body + Unpin + Send + 'static,
9298
B::Data: Into<Bytes> + Send,
9399
B::Error: Into<Error> + Send + Debug,
94100
{
95-
ServiceBuilder::new()
96-
.map_request(|req: LambdaEvent<LambdaRequest>| {
97-
let event: Request = req.payload.into();
98-
event.with_lambda_context(req.context)
99-
})
101+
let svc = ServiceBuilder::new()
102+
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
100103
.service(handler)
101-
.map_response(into_stream_response)
104+
.map_response(into_stream_response);
105+
106+
BoxCloneService::new(svc)
102107
}
103108

104109
/// Converts an `http::Response<B>` into a streaming Lambda response.
@@ -128,18 +133,23 @@ where
128133
}
129134
}
130135

136+
fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
137+
let event: Request = req.payload.into();
138+
event.with_lambda_context(req.context)
139+
}
140+
131141
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
132142
/// responses.
133143
///
134144
/// See the [AWS docs for response streaming].
135145
///
136146
/// [AWS docs for response streaming]:
137147
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
138-
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
148+
pub async fn run_with_streaming_response<S, B, E>(handler: S) -> Result<(), Error>
139149
where
140-
S: Service<Request, Response = Response<B>, Error = E>,
141-
S::Future: Send + 'a,
142-
E: Debug + Into<Diagnostic>,
150+
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
151+
S::Future: Send + 'static,
152+
E: Debug + Into<Diagnostic> + Send + 'static,
143153
B: Body + Unpin + Send + 'static,
144154
B::Data: Into<Bytes> + Send,
145155
B::Error: Into<Error> + Send + Debug,

lambda-runtime-api-client/src/lib.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl Client {
4141
ClientBuilder {
4242
connector: HttpConnector::new(),
4343
uri: None,
44+
pool_size: None,
4445
}
4546
}
4647
}
@@ -59,11 +60,16 @@ impl Client {
5960
self.client.request(req).map_err(Into::into).boxed()
6061
}
6162

62-
/// Create a new client with a given base URI and HTTP connector.
63-
fn with(base: Uri, connector: HttpConnector) -> Self {
64-
let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
65-
.http1_max_buf_size(1024 * 1024)
66-
.build(connector);
63+
/// Create a new client with a given base URI, HTTP connector, and optional pool size hint.
64+
fn with(base: Uri, connector: HttpConnector, pool_size: Option<usize>) -> Self {
65+
let mut builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
66+
builder.http1_max_buf_size(1024 * 1024);
67+
68+
if let Some(size) = pool_size {
69+
builder.pool_max_idle_per_host(size);
70+
}
71+
72+
let client = builder.build(connector);
6773
Self { base, client }
6874
}
6975

@@ -94,6 +100,7 @@ impl Client {
94100
pub struct ClientBuilder {
95101
connector: HttpConnector,
96102
uri: Option<http::Uri>,
103+
pool_size: Option<usize>,
97104
}
98105

99106
impl ClientBuilder {
@@ -102,13 +109,26 @@ impl ClientBuilder {
102109
ClientBuilder {
103110
connector,
104111
uri: self.uri,
112+
pool_size: self.pool_size,
105113
}
106114
}
107115

108116
/// Create a new builder with a given base URI.
109117
/// Inherits all other attributes from the existent builder.
110118
pub fn with_endpoint(self, uri: http::Uri) -> Self {
111-
Self { uri: Some(uri), ..self }
119+
Self {
120+
uri: Some(uri),
121+
pool_size: self.pool_size,
122+
connector: self.connector,
123+
}
124+
}
125+
126+
/// Provide a pool size hint for the underlying Hyper client.
127+
pub fn with_pool_size(self, pool_size: usize) -> Self {
128+
Self {
129+
pool_size: Some(pool_size),
130+
..self
131+
}
112132
}
113133

114134
/// Create the new client to interact with the Runtime API.
@@ -120,7 +140,7 @@ impl ClientBuilder {
120140
uri.try_into().expect("Unable to convert to URL")
121141
}
122142
};
123-
Ok(Client::with(uri, self.connector))
143+
Ok(Client::with(uri, self.connector, self.pool_size))
124144
}
125145
}
126146

@@ -182,4 +202,17 @@ mod tests {
182202
&req.uri().to_string()
183203
);
184204
}
205+
206+
#[test]
207+
fn builder_accepts_pool_size() {
208+
let base = "http://localhost:9001";
209+
let expected: Uri = base.parse().unwrap();
210+
let client = Client::builder()
211+
.with_pool_size(4)
212+
.with_endpoint(base.parse().unwrap())
213+
.build()
214+
.unwrap();
215+
216+
assert_eq!(client.base, expected);
217+
}
185218
}

lambda-runtime/src/layers/api_client.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ where
4444
}
4545
}
4646

47+
impl<S> Clone for RuntimeApiClientService<S>
48+
where
49+
S: Clone,
50+
{
51+
fn clone(&self) -> Self {
52+
Self {
53+
inner: self.inner.clone(),
54+
client: self.client.clone(),
55+
}
56+
}
57+
}
58+
4759
#[pin_project(project = RuntimeApiClientFutureProj)]
4860
pub enum RuntimeApiClientFuture<F> {
4961
First(#[pin] F, Arc<Client>),

lambda-runtime/src/layers/api_response.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,27 @@ impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem,
5151
}
5252
}
5353

54+
impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Clone
55+
for RuntimeApiResponseService<
56+
S,
57+
EventPayload,
58+
Response,
59+
BufferedResponse,
60+
StreamingResponse,
61+
StreamItem,
62+
StreamError,
63+
>
64+
where
65+
S: Clone,
66+
{
67+
fn clone(&self) -> Self {
68+
Self {
69+
inner: self.inner.clone(),
70+
_phantom: PhantomData,
71+
}
72+
}
73+
}
74+
5475
impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Service<LambdaInvocation>
5576
for RuntimeApiResponseService<
5677
S,

lambda-runtime/src/layers/trace.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ impl<S> Layer<S> for TracingLayer {
2525
}
2626

2727
/// Tower service returned by [TracingLayer].
28+
#[derive(Clone)]
2829
pub struct TracingService<S> {
2930
inner: S,
3031
}

0 commit comments

Comments
 (0)