7 changes: 6 additions & 1 deletion Makefile
@@ -5,6 +5,7 @@ INTEG_API_INVOKE := RestApiUrl HttpApiUrl
INTEG_EXTENSIONS := extension-fn extension-trait logs-trait
# Using musl to run extensions on both AL1 and AL2
INTEG_ARCH := x86_64-unknown-linux-musl
RIE_MAX_CONCURRENCY ?= 4

define uppercase
$(shell sed -r 's/(^|-)(\w)/\U\2/g' <<< $(1))
@@ -111,4 +112,8 @@ fmt:
cargo +nightly fmt --all

test-rie:
./scripts/test-rie.sh $(EXAMPLE)
./scripts/test-rie.sh $(EXAMPLE)

# Run RIE in Lambda Managed Instance (LMI) mode with concurrent polling.
test-rie-lmi:
RIE_MAX_CONCURRENCY=$(RIE_MAX_CONCURRENCY) ./scripts/test-rie.sh $(EXAMPLE)
5 changes: 4 additions & 1 deletion examples/basic-lambda/src/main.rs
@@ -28,7 +28,10 @@ async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();

let func = service_fn(my_handler);
lambda_runtime::run(func).await?;
if let Err(err) = lambda_runtime::run(func).await {
eprintln!("run error: {:?}", err);
return Err(err);
}
Ok(())
}

32 changes: 31 additions & 1 deletion lambda-http/src/lib.rs
@@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::{run_with_streaming_response, StreamAdapter};
pub use streaming::{run_with_streaming_response, run_with_streaming_response_concurrent, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
@@ -151,6 +151,18 @@ pub struct Adapter<'a, R, S> {
_phantom_data: PhantomData<&'a R>,
}

impl<'a, R, S> Clone for Adapter<'a, R, S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_phantom_data: PhantomData,
}
}
}

impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
where
S: Service<Request, Response = R, Error = E>,
@@ -203,6 +215,24 @@ where
lambda_runtime::run(Adapter::from(handler)).await
}

/// Starts the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// will use a concurrent `/next` polling loop with a bounded number of
/// in-flight handler tasks. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
R: IntoResponse + Send + Sync + 'static,
E: std::fmt::Debug + Into<Diagnostic> + Send + 'static,
{
lambda_runtime::run_concurrent(Adapter::from(handler)).await
}
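
A minimal usage sketch (not part of the diff, and assuming `lambda_http`'s usual re-exports of `service_fn`, `Error`, `Body`, `Request`, and `Response`, plus a `tokio` runtime behind `#[tokio::main]`): the handler keeps the same shape it would have with [`run`], only the entrypoint changes.

```rust
use lambda_http::{run_concurrent, service_fn, Body, Error, Request, Response};

async fn handler(_event: Request) -> Result<Response<Body>, Error> {
    // An ordinary buffered response; the handler itself is unchanged.
    Ok(Response::builder()
        .status(200)
        .body(Body::from("hello from a managed instance"))?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // With AWS_LAMBDA_MAX_CONCURRENCY > 1 this polls `/next` concurrently;
    // otherwise it behaves like `run`.
    run_concurrent(service_fn(handler)).await
}
```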

#[cfg(test)]
mod test_adapter {
use std::task::{Context, Poll};
52 changes: 47 additions & 5 deletions lambda-http/src/streaming.rs
@@ -10,7 +10,7 @@ pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::{
tower::{
util::{MapRequest, MapResponse},
util::{BoxCloneService, MapRequest, MapResponse},
ServiceBuilder, ServiceExt,
},
Diagnostic,
@@ -93,14 +93,33 @@
B::Error: Into<Error> + Send + Debug,
{
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
.service(handler)
.map_response(into_stream_response)
}

/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
#[allow(clippy::type_complexity)]
fn into_stream_service_boxed<S, B, E>(
handler: S,
) -> BoxCloneService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
.service(handler)
.map_response(into_stream_response);

BoxCloneService::new(svc)
}
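
The boxing matters because the concrete `ServiceBuilder` stack has an unnameable type; `BoxCloneService` erases it while preserving `Clone`, so each in-flight invocation can take its own copy. A standalone sketch of that property (illustrative only, using plain `tower` types as a direct dependency rather than the `lambda_runtime::tower` re-export):

```rust
use std::convert::Infallible;
use tower::util::BoxCloneService;
use tower::{service_fn, ServiceExt};

async fn demo() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // A trivial service; once layered, its concrete type is unnameable.
    let svc = service_fn(|name: String| async move { Ok::<_, Infallible>(format!("hello {name}")) });

    // Type-erased but still cloneable.
    let boxed: BoxCloneService<String, String, Infallible> = BoxCloneService::new(svc);

    // Each task clones its own handle and drives it independently.
    let mut for_task = boxed.clone();
    let greeting = for_task.ready().await?.call("lambda".to_owned()).await?;
    assert_eq!(greeting, "hello lambda");
    Ok(())
}
```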

/// Converts an `http::Response<B>` into a streaming Lambda response.
fn into_stream_response<B>(res: Response<B>) -> StreamResponse<BodyStream<B>>
where
@@ -128,6 +147,11 @@ where
}
}

fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
@@ -147,6 +171,24 @@
lambda_runtime::run(into_stream_service(handler)).await
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses, in a mode that is compatible with Lambda Managed Instances.
///
/// This uses a cloneable, boxed service internally so it can be driven by the
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
E: Debug + Into<Diagnostic> + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
lambda_runtime::run_concurrent(into_stream_service_boxed(handler)).await
}
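
A minimal sketch of wiring a handler through the concurrent streaming entrypoint. It assumes `http`, `bytes`, and `http_body_util` as dependencies and uses a single-chunk `Full<Bytes>` body only to satisfy the `Body + Unpin + Send` bounds above; a real handler would usually return a chunked or channel-backed body.

```rust
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use lambda_http::{run_with_streaming_response_concurrent, service_fn, Error, Request};

async fn handler(_event: Request) -> Result<Response<Full<Bytes>>, Error> {
    // Single chunk just to keep the sketch self-contained; swap in a
    // streaming body type for real incremental responses.
    Ok(Response::builder()
        .status(200)
        .header("content-type", "text/plain")
        .body(Full::new(Bytes::from_static(b"streamed payload")))?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Same fallback rule as the buffered entrypoint: sequential when
    // AWS_LAMBDA_MAX_CONCURRENCY is unset or <= 1.
    run_with_streaming_response_concurrent(service_fn(handler)).await
}
```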

pin_project_lite::pin_project! {
#[non_exhaustive]
pub struct BodyStream<B> {
41 changes: 35 additions & 6 deletions lambda-runtime-api-client/src/lib.rs
@@ -41,6 +41,7 @@ impl Client {
ClientBuilder {
connector: HttpConnector::new(),
uri: None,
pool_size: None,
}
}
}
@@ -59,11 +60,16 @@ impl Client {
self.client.request(req).map_err(Into::into).boxed()
}

/// Create a new client with a given base URI and HTTP connector.
fn with(base: Uri, connector: HttpConnector) -> Self {
let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.http1_max_buf_size(1024 * 1024)
.build(connector);
/// Create a new client with a given base URI, HTTP connector, and optional pool size hint.
fn with(base: Uri, connector: HttpConnector, pool_size: Option<usize>) -> Self {
let mut builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
builder.http1_max_buf_size(1024 * 1024);

if let Some(size) = pool_size {
builder.pool_max_idle_per_host(size);
}

let client = builder.build(connector);
Self { base, client }
}

@@ -94,6 +100,7 @@ impl Client {
pub struct ClientBuilder {
connector: HttpConnector,
uri: Option<http::Uri>,
pool_size: Option<usize>,
}

impl ClientBuilder {
@@ -102,6 +109,7 @@ impl ClientBuilder {
ClientBuilder {
connector,
uri: self.uri,
pool_size: self.pool_size,
}
}

@@ -111,6 +119,14 @@
Self { uri: Some(uri), ..self }
}

/// Provide a pool size hint for the underlying Hyper client. When set, it is
/// applied as the maximum number of idle connections kept per host
/// (`pool_max_idle_per_host`).
pub fn with_pool_size(self, pool_size: usize) -> Self {
Self {
pool_size: Some(pool_size),
..self
}
}

/// Create the new client to interact with the Runtime API.
pub fn build(self) -> Result<Client, Error> {
let uri = match self.uri {
@@ -120,7 +136,7 @@
uri.try_into().expect("Unable to convert to URL")
}
};
Ok(Client::with(uri, self.connector))
Ok(Client::with(uri, self.connector, self.pool_size))
}
}

@@ -182,4 +198,17 @@ mod tests {
&req.uri().to_string()
);
}

#[test]
fn builder_accepts_pool_size() {
let base = "http://localhost:9001";
let expected: Uri = base.parse().unwrap();
let client = Client::builder()
.with_pool_size(4)
.with_endpoint(base.parse().unwrap())
.build()
.unwrap();

assert_eq!(client.base, expected);
}
}
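
Downstream, the new hint lets whoever constructs the Runtime API client size the connection pool to the configured concurrency. A sketch of that wiring (illustrative only: mapping `AWS_LAMBDA_MAX_CONCURRENCY` directly to the pool size, and the local `BoxError` alias standing in for the crate's boxed error type, are assumptions):

```rust
use lambda_runtime_api_client::Client;

// Stand-in for the crate's boxed error type.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

fn build_runtime_client() -> Result<Client, BoxError> {
    // Intuition: one idle connection per in-flight invocation. Whether the
    // runtime wires it up exactly like this is an assumption.
    let max_concurrency = std::env::var("AWS_LAMBDA_MAX_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(1);

    let mut builder = Client::builder();
    if max_concurrency > 1 {
        builder = builder.with_pool_size(max_concurrency);
    }

    // Falls back to the default Runtime API endpoint when none was supplied.
    builder.build()
}
```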
12 changes: 12 additions & 0 deletions lambda-runtime/src/layers/api_client.rs
@@ -44,6 +44,18 @@ where
}
}

impl<S> Clone for RuntimeApiClientService<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
client: self.client.clone(),
}
}
}

#[pin_project(project = RuntimeApiClientFutureProj)]
pub enum RuntimeApiClientFuture<F> {
First(#[pin] F, Arc<Client>),
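
These manual `Clone` impls (together with the ones below for `RuntimeApiResponseService` and `TracingService`) are what allow the fully layered runtime service to be duplicated once per in-flight invocation. From the handler's point of view nothing changes; a plain JSON handler can be started with the concurrent entrypoint like this (a sketch assuming the usual `lambda_runtime` exports plus `tokio` and `serde_json` as dependencies):

```rust
use lambda_runtime::{run_concurrent, service_fn, Error, LambdaEvent};
use serde_json::Value;

async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    // Echo the payload back; any handler usable with `run` should work here.
    Ok(event.payload)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    run_concurrent(service_fn(handler)).await
}
```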
21 changes: 21 additions & 0 deletions lambda-runtime/src/layers/api_response.rs
@@ -51,6 +51,27 @@ impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem,
}
}

impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Clone
for RuntimeApiResponseService<
S,
EventPayload,
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: PhantomData,
}
}
}

impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Service<LambdaInvocation>
for RuntimeApiResponseService<
S,
1 change: 1 addition & 0 deletions lambda-runtime/src/layers/trace.rs
@@ -25,6 +25,7 @@ impl<S> Layer<S> for TracingLayer {
}

/// Tower service returned by [TracingLayer].
#[derive(Clone)]
pub struct TracingService<S> {
inner: S,
}