diff --git a/README.md b/README.md index 40fa7a13..981633c9 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,52 @@ async fn handler(_event: LambdaEvent) -> Result<(), Diagnostic> { You can see more examples on how to use these error crates in our [example repository](https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples/basic-error-error-crates-integration). +### Graceful shutdown + +`lambda_runtime` offers a helper to simplify configuring graceful shutdown signal handling, `spawn_graceful_shutdown_handler()`. This requires the `graceful-shutdown` feature flag and only supports Unix systems. + +You can use it by passing a `FnOnce` closure that returns an async block. That async block will be executed +when the function receives a `SIGTERM` or `SIGKILL`. + +Note that this helper is opinionated in a number of ways. Most notably: +1. It spawns a task to drive your signal handlers +2. It registers a 'no-op' extension in order to enable graceful shutdown signals +3. It panics on unrecoverable errors + +If you prefer to fine-tune the behavior, refer to the implementation of `spawn_graceful_shutdown_handler()` as a starting point for your own. + +For more information on graceful shutdown handling in AWS Lambda, see: [aws-samples/graceful-shutdown-with-aws-lambda](https://github.com/aws-samples/graceful-shutdown-with-aws-lambda). + +Complete example (cleaning up a non-blocking tracing writer): + +```rust,no_run +use lambda_runtime::{service_fn, LambdaEvent, Error}; +use serde_json::{json, Value}; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let func = service_fn(func); + + let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout()); + lambda_runtime::tracing::init_default_subscriber_with_writer(writer); + + let shutdown_hook = || async move { + std::mem::drop(log_guard); + }; + lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook); + + lambda_runtime::run(func).await?; + Ok(()) +} + +async fn func(event: LambdaEvent) -> Result { + let (event, _context) = event.into_parts(); + let first_name = event["firstName"].as_str().unwrap_or("world"); + + Ok(json!({ "message": format!("Hello, {}!", first_name) })) +} +``` + ## Building and deploying your Lambda functions If you already have Cargo Lambda installed in your machine, run the next command to build your function: diff --git a/lambda-integration-tests/src/helloworld.rs b/lambda-integration-tests/src/helloworld.rs index 9989da43..92fc7d80 100644 --- a/lambda-integration-tests/src/helloworld.rs +++ b/lambda-integration-tests/src/helloworld.rs @@ -8,6 +8,7 @@ use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; async fn main() -> Result<(), Error> { tracing::init_default_subscriber(); let func = service_fn(func); + lambda_runtime::spawn_graceful_shutdown_handler(|| async move {}); lambda_runtime::run(func).await?; Ok(()) } diff --git a/lambda-runtime-api-client/src/lib.rs b/lambda-runtime-api-client/src/lib.rs index 00c00e4c..78b51db1 100644 --- a/lambda-runtime-api-client/src/lib.rs +++ b/lambda-runtime-api-client/src/lib.rs @@ -22,6 +22,7 @@ pub use error::*; pub mod body; #[cfg(feature = "tracing")] +#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] pub mod tracing; /// API client to interact with the AWS Lambda Runtime API. diff --git a/lambda-runtime-api-client/src/tracing.rs b/lambda-runtime-api-client/src/tracing.rs index 09f41991..79216cf7 100644 --- a/lambda-runtime-api-client/src/tracing.rs +++ b/lambda-runtime-api-client/src/tracing.rs @@ -14,12 +14,16 @@ pub use tracing::*; /// Re-export the `tracing-subscriber` crate to build your own subscribers. pub use tracing_subscriber as subscriber; +use tracing_subscriber::fmt::MakeWriter; const DEFAULT_LOG_LEVEL: &str = "INFO"; /// Initialize `tracing-subscriber` with default logging options. /// -/// This function uses environment variables set with [Lambda's advance logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/) +/// The default subscriber writes logs to STDOUT in the current context. +/// If you want to customize the writer, see [`init_default_subscriber_with_writer()`]. +/// +/// This function uses environment variables set with [Lambda's advanced logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/) /// if they're configured for your function. /// /// This subscriber sets the logging level based on environment variables: @@ -31,6 +35,32 @@ const DEFAULT_LOG_LEVEL: &str = "INFO"; /// If the `AWS_LAMBDA_LOG_FORMAT` environment variable is set to `JSON`, the log lines will be formatted as json objects, /// otherwise they will be formatted with the default tracing format. pub fn init_default_subscriber() { + init_default_subscriber_with_writer(std::io::stdout); +} + +/// Initialize `tracing-subscriber` with default logging options, and a custom writer. +/// +/// You might want to avoid writing to STDOUT in the local context via [`init_default_subscriber()`], if you have a high-throughput Lambdas that involve +/// a lot of async concurrency. Since, writing to STDOUT can briefly block your tokio runtime - ref [tracing #2653](https://github.com/tokio-rs/tracing/issues/2653). +/// In that case, you might prefer to use [tracing_appender::NonBlocking] instead - particularly if your Lambda is fairly long-running and stays warm. +/// Though, note that you are then responsible +/// for ensuring gracefuls shutdown. See [`examples/graceful-shutdown`] for a complete example. +/// +/// This function uses environment variables set with [Lambda's advanced logging controls](https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/) +/// if they're configured for your function. +/// +/// This subscriber sets the logging level based on environment variables: +/// - if `AWS_LAMBDA_LOG_LEVEL` is set, it takes precedence over any other environment variables. +/// - if `AWS_LAMBDA_LOG_LEVEL` is not set, check if `RUST_LOG` is set. +/// - if none of those two variables are set, use `INFO` as the logging level. +/// +/// The logging format can also be changed based on Lambda's advanced logging controls. +/// If the `AWS_LAMBDA_LOG_FORMAT` environment variable is set to `JSON`, the log lines will be formatted as json objects, +/// otherwise they will be formatted with the default tracing format. +pub fn init_default_subscriber_with_writer(writer: Writer) +where + Writer: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, +{ let log_format = env::var("AWS_LAMBDA_LOG_FORMAT").unwrap_or_default(); let log_level_str = env::var("AWS_LAMBDA_LOG_LEVEL").or_else(|_| env::var("RUST_LOG")); let log_level = @@ -43,7 +73,8 @@ pub fn init_default_subscriber() { EnvFilter::builder() .with_default_directive(log_level.into()) .from_env_lossy(), - ); + ) + .with_writer(writer); if log_format.eq_ignore_ascii_case("json") { collector.json().init() diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 5cf67ae2..fb68aa76 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -20,6 +20,10 @@ opentelemetry = ["opentelemetry-semantic-conventions"] # enables access to the O anyhow = ["dep:anyhow"] # enables From for Diagnostic for anyhow error types, see README.md for more info eyre = ["dep:eyre"] # enables From for Diagnostic for eyre error types, see README.md for more info miette = ["dep:miette"] # enables From for Diagnostic for miette error types, see README.md for more info +# TODO: remove tokio/rt and rt-multi-thread from non-feature-flagged dependencies in new breaking version, since they are unused: +# as well as default features +# https://github.com/awslabs/aws-lambda-rust-runtime/issues/984 +graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"] [dependencies] anyhow = { version = "1.0.86", optional = true } @@ -39,6 +43,7 @@ hyper-util = { workspace = true, features = [ "http1", "tokio", ] } +lambda-extension = { version = "0.11.0", path = "../lambda-extension", default-features = false, optional = true } lambda_runtime_api_client = { version = "0.11.1", path = "../lambda-runtime-api-client", default-features = false } miette = { version = "7.2.0", optional = true } opentelemetry-semantic-conventions = { version = "0.29", optional = true, features = ["semconv_experimental"] } @@ -67,4 +72,7 @@ hyper-util = { workspace = true, features = [ "server-auto", "tokio", ] } +# Self dependency to enable the graceful-shutdown feature for tests +lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] } pin-project-lite = { workspace = true } +tracing-appender = "0.2" diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 76d0562a..e1b15a25 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -32,6 +32,7 @@ pub mod streaming; /// Utilities to initialize and use `tracing` and `tracing-subscriber` in Lambda Functions. #[cfg(feature = "tracing")] +#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] pub use lambda_runtime_api_client::tracing; /// Types available to a Lambda function. @@ -123,3 +124,110 @@ where let runtime = Runtime::new(handler).layer(layers::TracingLayer::new()); runtime.run().await } + +/// Spawns a task that will be execute a provided async closure when the process +/// receives unix graceful shutdown signals. If the closure takes longer than 500ms +/// to execute, an unhandled `SIGKILL` signal might be received. +/// +/// You can use this future to execute cleanup or flush related logic prior to runtime shutdown. +/// +/// This function must be called prior to [lambda_runtime::run()]. +/// +/// Note that this implicitly also registers and drives a no-op internal extension that subscribes to no events. +/// This extension will be named `_lambda-rust-runtime-no-op-graceful-shutdown-helper`. This extension name +/// can not be reused by other registered extensions. This is necessary in order to receive graceful shutdown signals. +/// +/// This extension is cheap to run because it receives no events, but is not zero cost. If you have another extension +/// registered already, you might prefer to manually construct your own graceful shutdown handling without the dummy extension. +/// +/// For more information on general AWS Lambda graceful shutdown handling, see: +/// https://github.com/aws-samples/graceful-shutdown-with-aws-lambda +/// +/// # Panics +/// +/// This function panics if: +/// - this function is called after [lambda_runtime::run()] +/// - this function is called outside of a context that has access to the tokio i/o +/// - the no-op extension cannot be registered +/// - either signal listener panics [tokio::signal::unix](https://docs.rs/tokio/latest/tokio/signal/unix/fn.signal.html#errors) +/// +/// # Example +/// ```no_run +/// use lambda_runtime::{Error, service_fn, LambdaEvent}; +/// use serde_json::Value; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Error> { +/// let func = service_fn(func); +/// +/// let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout()); +/// lambda_runtime::tracing::init_default_subscriber_with_writer(writer); +/// +/// let shutdown_hook = || async move { +/// std::mem::drop(log_guard); +/// }; +/// lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook); +/// +/// lambda_runtime::run(func).await?; +/// Ok(()) +/// } +/// +/// async fn func(event: LambdaEvent) -> Result { +/// Ok(event.payload) +/// } +/// ``` +#[cfg(all(unix, feature = "graceful-shutdown"))] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "tokio-rt"))))] +pub fn spawn_graceful_shutdown_handler(shutdown_hook: impl FnOnce() -> Fut + Send + 'static) +where + Fut: Future + Send + 'static, +{ + tokio::task::spawn(async move { + // You need an extension registered with the Lambda orchestrator in order for your process + // to receive a SIGTERM for graceful shutdown. + // + // We accomplish this here by registering a no-op internal extension, which does not subscribe to any events. + // + // This extension is cheap to run since after it connects to the lambda orchestration, the connection + // will just wait forever for data to come, which never comes, so it won't cause wakes. + let extension = lambda_extension::Extension::new() + // Don't subscribe to any event types + .with_events(&[]) + // Internal extension names MUST be unique within a given Lambda function. + .with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper") + // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init + // phase and begins the Invoke phase. + .register() + .await + .expect("could not register no-op extension for graceful shutdown"); + + let graceful_shutdown_future = async move { + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap(); + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap(); + tokio::select! { + _sigint = sigint.recv() => { + eprintln!("[runtime] SIGINT received"); + eprintln!("[runtime] Graceful shutdown in progress ..."); + shutdown_hook().await; + eprintln!("[runtime] Graceful shutdown completed"); + std::process::exit(0); + }, + _sigterm = sigterm.recv()=> { + eprintln!("[runtime] SIGTERM received"); + eprintln!("[runtime] Graceful shutdown in progress ..."); + shutdown_hook().await; + eprintln!("[runtime] Graceful shutdown completed"); + std::process::exit(0); + }, + } + }; + + // TODO: add biased! to always poll the signal handling future first, once supported: + // https://github.com/tokio-rs/tokio/issues/7304 + let _: (_, ()) = tokio::join!(graceful_shutdown_future, async { + // we suppress extension errors because we don't actually mind if it crashes, + // all we need to do is kick off the run so that lambda exits the init phase + let _ = extension.run().await; + }); + }); +}