Skip to content

feat(tracing, lambda-runtime): add support for custom writer with default tracing subscriber, add turnkey graceful shutdown helper behind 'graceful-shutdown' feature flag #982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,52 @@ async fn handler(_event: LambdaEvent<Request>) -> Result<(), Diagnostic> {

You can see more examples on how to use these error crates in our [example repository](https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples/basic-error-error-crates-integration).

### Graceful shutdown

`lambda_runtime` offers a helper to simplify configuring graceful shutdown signal handling, `spawn_graceful_shutdown_handler()`. This requires the `graceful-shutdown` feature flag and only supports Unix systems.

You can use it by passing a `FnOnce` closure that returns an async block. That async block will be executed
when the function receives a `SIGTERM` or `SIGKILL`.

Note that this helper is opinionated in a number of ways. Most notably:
1. It spawns a task to drive your signal handlers
2. It registers a 'no-op' extension in order to enable graceful shutdown signals
3. It panics on unrecoverable errors

If you prefer to fine-tune the behavior, refer to the implementation of `spawn_graceful_shutdown_handler()` as a starting point for your own.

For more information on graceful shutdown handling in AWS Lambda, see: [aws-samples/graceful-shutdown-with-aws-lambda](https://github.com/aws-samples/graceful-shutdown-with-aws-lambda).

Complete example (cleaning up a non-blocking tracing writer):

```rust,no_run
use lambda_runtime::{service_fn, LambdaEvent, Error};
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Error> {
let func = service_fn(func);

let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout());
lambda_runtime::tracing::init_default_subscriber_with_writer(writer);

let shutdown_hook = || async move {
std::mem::drop(log_guard);
};
lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook);

lambda_runtime::run(func).await?;
Ok(())
}

async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
let (event, _context) = event.into_parts();
let first_name = event["firstName"].as_str().unwrap_or("world");

Ok(json!({ "message": format!("Hello, {}!", first_name) }))
}
```

## Building and deploying your Lambda functions

If you already have Cargo Lambda installed in your machine, run the next command to build your function:
Expand Down
1 change: 1 addition & 0 deletions lambda-integration-tests/src/helloworld.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();
let func = service_fn(func);
lambda_runtime::spawn_graceful_shutdown_handler(|| async move {});
lambda_runtime::run(func).await?;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions lambda-runtime-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use error::*;
pub mod body;

#[cfg(feature = "tracing")]
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
pub mod tracing;

/// API client to interact with the AWS Lambda Runtime API.
Expand Down
35 changes: 33 additions & 2 deletions lambda-runtime-api-client/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ pub use tracing::*;

/// Re-export the `tracing-subscriber` crate to build your own subscribers.
pub use tracing_subscriber as subscriber;
use tracing_subscriber::fmt::MakeWriter;

const DEFAULT_LOG_LEVEL: &str = "INFO";

/// Initialize `tracing-subscriber` with default logging options.
///
/// This function uses environment variables set with [Lambda's advance logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/)
/// The default subscriber writes logs to STDOUT in the current context.
/// If you want to customize the writer, see [`init_default_subscriber_with_writer()`].
///
/// This function uses environment variables set with [Lambda's advanced logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/)
/// if they're configured for your function.
///
/// This subscriber sets the logging level based on environment variables:
Expand All @@ -31,6 +35,32 @@ const DEFAULT_LOG_LEVEL: &str = "INFO";
/// If the `AWS_LAMBDA_LOG_FORMAT` environment variable is set to `JSON`, the log lines will be formatted as json objects,
/// otherwise they will be formatted with the default tracing format.
pub fn init_default_subscriber() {
init_default_subscriber_with_writer(std::io::stdout);
}

/// Initialize `tracing-subscriber` with default logging options, and a custom writer.
///
/// You might want to avoid writing to STDOUT in the local context via [`init_default_subscriber()`], if you have a high-throughput Lambdas that involve
/// a lot of async concurrency. Since, writing to STDOUT can briefly block your tokio runtime - ref [tracing #2653](https://github.com/tokio-rs/tracing/issues/2653).
/// In that case, you might prefer to use [tracing_appender::NonBlocking] instead - particularly if your Lambda is fairly long-running and stays warm.
/// Though, note that you are then responsible
/// for ensuring gracefuls shutdown. See [`examples/graceful-shutdown`] for a complete example.
///
/// This function uses environment variables set with [Lambda's advanced logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/)
/// if they're configured for your function.
///
/// This subscriber sets the logging level based on environment variables:
/// - if `AWS_LAMBDA_LOG_LEVEL` is set, it takes precedence over any other environment variables.
/// - if `AWS_LAMBDA_LOG_LEVEL` is not set, check if `RUST_LOG` is set.
/// - if none of those two variables are set, use `INFO` as the logging level.
///
/// The logging format can also be changed based on Lambda's advanced logging controls.
/// If the `AWS_LAMBDA_LOG_FORMAT` environment variable is set to `JSON`, the log lines will be formatted as json objects,
/// otherwise they will be formatted with the default tracing format.
pub fn init_default_subscriber_with_writer<Writer>(writer: Writer)
where
Writer: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
{
let log_format = env::var("AWS_LAMBDA_LOG_FORMAT").unwrap_or_default();
let log_level_str = env::var("AWS_LAMBDA_LOG_LEVEL").or_else(|_| env::var("RUST_LOG"));
let log_level =
Expand All @@ -43,7 +73,8 @@ pub fn init_default_subscriber() {
EnvFilter::builder()
.with_default_directive(log_level.into())
.from_env_lossy(),
);
)
.with_writer(writer);

if log_format.eq_ignore_ascii_case("json") {
collector.json().init()
Expand Down
8 changes: 8 additions & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ opentelemetry = ["opentelemetry-semantic-conventions"] # enables access to the O
anyhow = ["dep:anyhow"] # enables From<T> for Diagnostic for anyhow error types, see README.md for more info
eyre = ["dep:eyre"] # enables From<T> for Diagnostic for eyre error types, see README.md for more info
miette = ["dep:miette"] # enables From<T> for Diagnostic for miette error types, see README.md for more info
# TODO: remove tokio/rt and rt-multi-thread from non-feature-flagged dependencies in new breaking version, since they are unused:
# as well as default features
# https://github.com/awslabs/aws-lambda-rust-runtime/issues/984
graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"]

[dependencies]
anyhow = { version = "1.0.86", optional = true }
Expand All @@ -39,6 +43,7 @@ hyper-util = { workspace = true, features = [
"http1",
"tokio",
] }
lambda-extension = { version = "0.11.0", path = "../lambda-extension", default-features = false, optional = true }
lambda_runtime_api_client = { version = "0.11.1", path = "../lambda-runtime-api-client", default-features = false }
miette = { version = "7.2.0", optional = true }
opentelemetry-semantic-conventions = { version = "0.29", optional = true, features = ["semconv_experimental"] }
Expand Down Expand Up @@ -67,4 +72,7 @@ hyper-util = { workspace = true, features = [
"server-auto",
"tokio",
] }
# Self dependency to enable the graceful-shutdown feature for tests
lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] }
pin-project-lite = { workspace = true }
tracing-appender = "0.2"
108 changes: 108 additions & 0 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod streaming;

/// Utilities to initialize and use `tracing` and `tracing-subscriber` in Lambda Functions.
#[cfg(feature = "tracing")]
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
pub use lambda_runtime_api_client::tracing;

/// Types available to a Lambda function.
Expand Down Expand Up @@ -123,3 +124,110 @@ where
let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
runtime.run().await
}

/// Spawns a task that will be execute a provided async closure when the process
/// receives unix graceful shutdown signals. If the closure takes longer than 500ms
/// to execute, an unhandled `SIGKILL` signal might be received.
///
/// You can use this future to execute cleanup or flush related logic prior to runtime shutdown.
///
/// This function must be called prior to [lambda_runtime::run()].
///
/// Note that this implicitly also registers and drives a no-op internal extension that subscribes to no events.
/// This extension will be named `_lambda-rust-runtime-no-op-graceful-shutdown-helper`. This extension name
/// can not be reused by other registered extensions. This is necessary in order to receive graceful shutdown signals.
///
/// This extension is cheap to run because it receives no events, but is not zero cost. If you have another extension
/// registered already, you might prefer to manually construct your own graceful shutdown handling without the dummy extension.
///
/// For more information on general AWS Lambda graceful shutdown handling, see:
/// https://github.com/aws-samples/graceful-shutdown-with-aws-lambda
///
/// # Panics
///
/// This function panics if:
/// - this function is called after [lambda_runtime::run()]
/// - this function is called outside of a context that has access to the tokio i/o
/// - the no-op extension cannot be registered
/// - either signal listener panics [tokio::signal::unix](https://docs.rs/tokio/latest/tokio/signal/unix/fn.signal.html#errors)
///
/// # Example
/// ```no_run
/// use lambda_runtime::{Error, service_fn, LambdaEvent};
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// let func = service_fn(func);
///
/// let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout());
/// lambda_runtime::tracing::init_default_subscriber_with_writer(writer);
///
/// let shutdown_hook = || async move {
/// std::mem::drop(log_guard);
/// };
/// lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook);
///
/// lambda_runtime::run(func).await?;
/// Ok(())
/// }
///
/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
/// Ok(event.payload)
/// }
/// ```
#[cfg(all(unix, feature = "graceful-shutdown"))]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "tokio-rt"))))]
pub fn spawn_graceful_shutdown_handler<Fut>(shutdown_hook: impl FnOnce() -> Fut + Send + 'static)
where
Fut: Future<Output = ()> + Send + 'static,
{
tokio::task::spawn(async move {
// You need an extension registered with the Lambda orchestrator in order for your process
// to receive a SIGTERM for graceful shutdown.
//
// We accomplish this here by registering a no-op internal extension, which does not subscribe to any events.
//
// This extension is cheap to run since after it connects to the lambda orchestration, the connection
// will just wait forever for data to come, which never comes, so it won't cause wakes.
let extension = lambda_extension::Extension::new()
// Don't subscribe to any event types
.with_events(&[])
// Internal extension names MUST be unique within a given Lambda function.
.with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper")
// Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init
// phase and begins the Invoke phase.
.register()
.await
.expect("could not register no-op extension for graceful shutdown");

let graceful_shutdown_future = async move {
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
tokio::select! {
_sigint = sigint.recv() => {
eprintln!("[runtime] SIGINT received");
eprintln!("[runtime] Graceful shutdown in progress ...");
shutdown_hook().await;
eprintln!("[runtime] Graceful shutdown completed");
std::process::exit(0);
},
_sigterm = sigterm.recv()=> {
eprintln!("[runtime] SIGTERM received");
eprintln!("[runtime] Graceful shutdown in progress ...");
shutdown_hook().await;
eprintln!("[runtime] Graceful shutdown completed");
std::process::exit(0);
},
}
};

// TODO: add biased! to always poll the signal handling future first, once supported:
// https://github.com/tokio-rs/tokio/issues/7304
let _: (_, ()) = tokio::join!(graceful_shutdown_future, async {
// we suppress extension errors because we don't actually mind if it crashes,
// all we need to do is kick off the run so that lambda exits the init phase
let _ = extension.run().await;
});
});
}
Loading