diff --git a/Cargo.lock b/Cargo.lock index e8b689e3f7..8a5f5c5250 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,6 +1506,7 @@ dependencies = [ "quickcheck", "thiserror", "tonic", + "tower", ] [[package]] @@ -1921,8 +1922,7 @@ dependencies = [ [[package]] name = "linkerd2-proxy-api" version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2348745f909668e6de2dbd175eeeac374887ffb33989a0e09766f1807b27cdfe" +source = "git+https://github.com/linkerd/linkerd2-proxy-api?branch=eliza/retry-policy#7790fa05aab1c3c11793852a6d571845b1b3ead9" dependencies = [ "h2", "http", diff --git a/Cargo.toml b/Cargo.toml index dd2bdac671..0fd386ec68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,3 +83,8 @@ lto = true [patch.crates-io] webpki = { git = "https://github.com/linkerd/webpki", branch = "cert-dns-names-0.22" } + +[patch.crates-io.linkerd2-proxy-api] +git = "https://github.com/linkerd/linkerd2-proxy-api" +branch = "eliza/retry-policy" + diff --git a/linkerd/app/integration/src/policy.rs b/linkerd/app/integration/src/policy.rs index 004baa2efc..6e2090ad61 100644 --- a/linkerd/app/integration/src/policy.rs +++ b/linkerd/app/integration/src/policy.rs @@ -120,10 +120,12 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy { http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], failure_accrual: None, + retry_budget: None, }), http2: Some(proxy_protocol::Http2 { routes: vec![route], failure_accrual: None, + retry_budget: None, }), opaque: Some(proxy_protocol::Opaque { routes: vec![outbound_default_opaque_route(dst)], @@ -134,28 +136,33 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy { } pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute { - use api::http_route; outbound::HttpRoute { metadata: Some(api::meta::Metadata { kind: Some(api::meta::metadata::Kind::Default("default".to_string())), }), hosts: Vec::new(), rules: vec![outbound::http_route::Rule { - matches: vec![http_route::HttpRouteMatch { - path: Some(http_route::PathMatch { - kind: Some(http_route::path_match::Kind::Prefix("/".to_string())), - }), - headers: Vec::new(), - query_params: Vec::new(), - method: None, - }], + matches: vec![match_path_prefix("/")], filters: Vec::new(), backends: Some(http_first_available(std::iter::once(backend(dst)))), request_timeout: None, + retry_policy: None, }], } } +pub fn match_path_prefix(path: impl ToString) -> api::http_route::HttpRouteMatch { + use api::http_route; + http_route::HttpRouteMatch { + path: Some(http_route::PathMatch { + kind: Some(http_route::path_match::Kind::Prefix(path.to_string())), + }), + headers: Vec::new(), + query_params: Vec::new(), + method: None, + } +} + pub fn outbound_default_opaque_route(dst: impl ToString) -> outbound::OpaqueRoute { use outbound::opaque_route::{self, distribution}; outbound::OpaqueRoute { diff --git a/linkerd/app/integration/src/tests/client_policy.rs b/linkerd/app/integration/src/tests/client_policy.rs index 50de7fa640..2b2ba6b8f6 100644 --- a/linkerd/app/integration/src/tests/client_policy.rs +++ b/linkerd/app/integration/src/tests/client_policy.rs @@ -2,6 +2,8 @@ use crate::*; use linkerd2_proxy_api::{self as api}; use policy::outbound::{self, proxy_protocol}; +mod retries; + #[tokio::test] async fn default_http1_route() { let _trace = trace_init(); @@ -64,10 +66,12 @@ async fn empty_http1_route() { rules: Vec::new(), }], failure_accrual: None, + retry_budget: None, }), http2: Some(proxy_protocol::Http2 { routes: 
vec![policy::outbound_default_http_route(&dst)], failure_accrual: None, + retry_budget: None, }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst)], @@ -149,6 +153,7 @@ async fn empty_http2_route() { http1: Some(proxy_protocol::Http1 { routes: vec![policy::outbound_default_http_route(&dst)], failure_accrual: None, + retry_budget: None, }), http2: Some(proxy_protocol::Http2 { routes: vec![outbound::HttpRoute { @@ -157,6 +162,7 @@ async fn empty_http2_route() { rules: Vec::new(), }], failure_accrual: None, + retry_budget: None, }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst)], @@ -224,6 +230,7 @@ async fn header_based_routing() { policy::backend(dst), ))), request_timeout: None, + retry_policy: None, }; let route = outbound::HttpRoute { @@ -238,6 +245,7 @@ async fn header_based_routing() { policy::backend(&dst_world), ))), request_timeout: None, + retry_policy: None, }, // x-hello-city: sf | x-hello-city: san francisco mk_header_rule( @@ -267,10 +275,12 @@ async fn header_based_routing() { http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], failure_accrual: None, + retry_budget: None, }), http2: Some(proxy_protocol::Http2 { routes: vec![route], failure_accrual: None, + retry_budget: None, }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst_world)], @@ -400,6 +410,7 @@ async fn path_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(dst), ))), request_timeout: None, + retry_policy: None, }; let route = outbound::HttpRoute { @@ -416,6 +426,7 @@ async fn path_based_routing() { policy::backend(&dst_world), ))), request_timeout: None, + retry_policy: None, }, // /goodbye/* mk_path_rule( @@ -450,10 +461,12 @@ async fn path_based_routing() { http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], failure_accrual: None, + retry_budget: None, }), http2: Some(proxy_protocol::Http2 { routes: vec![route], failure_accrual: None, + retry_budget: None, }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst_world)], diff --git a/linkerd/app/integration/src/tests/client_policy/retries.rs b/linkerd/app/integration/src/tests/client_policy/retries.rs new file mode 100644 index 0000000000..6d34e54156 --- /dev/null +++ b/linkerd/app/integration/src/tests/client_policy/retries.rs @@ -0,0 +1,461 @@ +use super::*; +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + +/// Tests that a failed HTTP/1.1 response is retried. +#[tokio::test] +async fn http1_retries() { + test_basic_retry(client::http1, server::http1).await +} + +/// Tests that a failed HTTP/2 response is retried. +#[tokio::test] +async fn http2_retries() { + test_basic_retry(client::http2, server::http2).await +} + +/// Tests that a failed HTTP/1.1 response that matches a route rule *without* a +/// retry policy is not retried, even if the `OutboundPolicy` contains a retry +/// budget. +#[tokio::test] +async fn http1_doesnt_retry_non_retryable_rule() { + test_non_retryable(client::http1, server::http1).await +} + +/// Tests that a failed HTTP/2 response that matches a route rule *without* a +/// retry policy is not retried, even if the `OutboundPolicy` contains a retry +/// budget.
+#[tokio::test] +async fn http2_doesnt_retry_non_retryable_rule() { + test_non_retryable(client::http2, server::http2).await +} + +/// Tests that a failed HTTP/1.1 response is retried until the per-request retry +/// limit is reached, and that a subsequent request has its own separate +/// per-request retry limit. +#[tokio::test] +async fn http1_doesnt_retry_at_per_request_limit() { + test_retry_limit(client::http1, server::http1).await +} + +/// Tests that a failed HTTP/2 response is retried until the per-request retry +/// limit is reached, and that a subsequent request has its own separate +/// per-request retry limit. +#[tokio::test] +async fn http2_doesnt_retry_at_per_request_limit() { + test_retry_limit(client::http2, server::http2).await +} + +/// Tests that an HTTP/1.1 POST request with a body is retried. +#[tokio::test] +async fn http1_retries_post_body() { + test_retry_body(client::http1, server::http1, http::Method::POST).await +} + +/// Tests that an HTTP/2 POST request with a body is retried. +#[tokio::test] +async fn http2_retries_post_body() { + test_retry_body(client::http2, server::http2, http::Method::POST).await +} + +/// Tests that an HTTP/1.1 PUT request with a body is retried. +#[tokio::test] +async fn http1_retries_put_body() { + test_retry_body(client::http1, server::http1, http::Method::PUT).await +} + +/// Tests that an HTTP/2 PUT request with a body is retried. +#[tokio::test] +async fn http2_retries_put_body() { + test_retry_body(client::http2, server::http2, http::Method::PUT).await +} + +/// Tests that an HTTP/1.1 request with a body is not retried if the body's +/// length exceeds the maximum buffered body limit. +#[tokio::test] +async fn http1_doesnt_retry_long_body() { + test_too_long_retry_body(client::http1, server::http1).await +} + +/// Tests that an HTTP/2 request with a body is not retried if the body's +/// length exceeds the maximum buffered body limit. +#[tokio::test] +async fn http2_doesnt_retry_long_body() { + test_too_long_retry_body(client::http2, server::http2).await +} + +/// Tests that the `timeouts.backend_request` timeout is applied separately for +/// each individual time a request is retried, rather than starting when the +/// request is received and including the time spent on all retries of that +/// request. Also, tests that requests which hit the `backend_request` timeout +/// are retried as though they were HTTP 504 responses. +#[tokio::test] +// ignore this until we implement a way of marking timeouts as retryable... +#[ignore] +async fn http1_backend_request_timeout_is_per_try() { + test_backend_timeout_is_per_try(client::http1, server::http1).await +} + +/// Tests that the `timeouts.backend_request` timeout is applied separately for +/// each individual time a request is retried, rather than starting when the +/// request is received and including the time spent on all retries of that +/// request. Also, tests that requests which hit the `backend_request` timeout +/// are retried as though they were HTTP 504 responses. +#[tokio::test] +// ignore this until we implement a way of marking timeouts as retryable... +#[ignore] +async fn http2_backend_request_timeout_is_per_try() { + test_backend_timeout_is_per_try(client::http2, server::http2).await +} + +/// Tests that the `timeouts.request` timeout applies to *all* retries of a +/// request, and that a request will fail if the total time spent retrying it +/// exceeds the `request` timeout, even if each individual retry request is +/// below that timeout.
+#[tokio::test] +async fn http1_request_timeout_across_retries() { + test_request_timeout_across_retries(client::http1, server::http1).await +} + +/// Tests that the `timeouts.request` timeout applies to *all* retries of a +/// request, and that a request will fail if the total time spent retrying it +/// exceeds the `request` timeout, even if each individual retry request is +/// below that timeout. +#[tokio::test] +async fn http2_request_timeout_across_retries() { + test_request_timeout_across_retries(client::http2, server::http2).await +} + +async fn test_basic_retry( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, +) { + let srv = retry_server(mk_server, 1).run().await; + run_retry_test(mk_client, srv, |client| async move { + let rsp = client + .request(client.request_builder("/retry")) + .await + .unwrap(); + assert_eq!( + rsp.status(), + http::StatusCode::OK, + "retry route should succeed" + ); + }) + .await } + +async fn test_non_retryable( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, +) { + let srv = retry_server(mk_server, 1).run().await; + run_retry_test(mk_client, srv, |client| async move { + let rsp = client + .request(client.request_builder("/no-retry")) + .await + .unwrap(); + assert_eq!( + rsp.status(), + http::StatusCode::INTERNAL_SERVER_ERROR, + "non-retryable route should not be retried" + ); + }) + .await; +} + +async fn test_retry_limit( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, +) { + // The `/retry` route fails the first five attempts, so the first request + // (its initial attempt plus 3 retries) exhausts the per-request limit of + // 3 retries, and the second request succeeds on its own first retry. + let srv = retry_server(mk_server, 4).run().await; + run_retry_test(mk_client, srv, |client| async move { + let rsp = client + .request(client.request_builder("/retry")) + .await + .unwrap(); + assert_eq!( + rsp.status(), + http::StatusCode::INTERNAL_SERVER_ERROR, + "retries should stop at the per-request limit" + ); + + let rsp = client + .request(client.request_builder("/retry")) + .await + .unwrap(); + assert_eq!( + rsp.status(), + http::StatusCode::OK, + "retry limit should be tracked per-request" + ); + }) + .await +} + +async fn test_retry_body( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, + method: http::Method, +) { + let srv = retry_server(mk_server, 1).run().await; + run_retry_test(mk_client, srv, move |client| async move { + let req = client + .request_builder("/retry") + .method(method.clone()) + .body("i'm a request body".into()) + .unwrap(); + let rsp = client.request_body(req).await; + + assert_eq!( + rsp.status(), + http::StatusCode::OK, + "{method:?} request with body should be retried" + ); + }) + .await +} + +async fn test_too_long_retry_body( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, +) { + let srv = retry_server(mk_server, 1).run().await; + run_retry_test(mk_client, srv, move |client| async move { + let req = client + .request_builder("/retry") + .method(http::Method::POST) + .body(hyper::Body::from(&[1u8; 64 * 1024 + 1][..])) + .unwrap(); + let rsp = client.request_body(req).await; + + assert_eq!( + rsp.status(), + http::StatusCode::INTERNAL_SERVER_ERROR, + "request with too long body should not be retried" + ); + }) + .await +} + +async fn test_backend_timeout_is_per_try( + mk_client: fn(addr: SocketAddr, auth: &'static str) ->
client::Client, + mk_server: fn() -> server::Server, +) { + let srv = retry_server(mk_server, 1).run().await; + run_retry_timeout_test( + mk_client, + srv, + None, + Some(Duration::from_millis(400)), + move |client| async move { + let req = client + .request_builder("/retry/timeout") + .method(http::Method::GET) + .body("".into()) + .unwrap(); + let rsp = client.request_body(req).await; + tracing::info!(?rsp); + assert_eq!( + rsp.status(), + http::StatusCode::OK, + "backend request timeouts should be retried" + ); + }, + ) + .await +} + +async fn test_request_timeout_across_retries( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + mk_server: fn() -> server::Server, +) { + let srv = retry_server(mk_server, 2).run().await; + run_retry_timeout_test( + mk_client, + srv, + Some(Duration::from_millis(800)), + Some(Duration::from_millis(400)), + move |client| async move { + let req = client + .request_builder("/retry/timeout") + .method(http::Method::GET) + .body("".into()) + .unwrap(); + let rsp = client.request_body(req).await; + + assert_eq!( + rsp.status(), + http::StatusCode::GATEWAY_TIMEOUT, + "the request timeout should be tracked across retries" + ); + }, + ) + .await +} + +async fn run_retry_test<F: Future<Output = ()>>( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + srv: server::Listening, + test: impl FnOnce(client::Client) -> F, +) { + run_retry_timeout_test(mk_client, srv, None, None, test).await +} + +async fn run_retry_timeout_test<F: Future<Output = ()>>( + mk_client: fn(addr: SocketAddr, auth: &'static str) -> client::Client, + srv: server::Listening, + request_timeout: Option<Duration>, + backend_timeout: Option<Duration>, + test: impl FnOnce(client::Client) -> F, +) { + let _trace = trace_init(); + + const AUTHORITY: &str = "policy.test.svc.cluster.local"; + let ctrl = controller::new(); + + let dst = format!("{AUTHORITY}:{}", srv.addr.port()); + let dst_tx = ctrl.destination_tx(&dst); + dst_tx.send_addr(srv.addr); + let _profile_tx = ctrl.profile_tx_default(srv.addr, AUTHORITY); + let policy = controller::policy() + // stop the admin server from entering an infinite retry loop + .with_inbound_default(policy::all_unauthenticated()) + .outbound( + srv.addr, + retry_policy(dst, request_timeout, backend_timeout), + ); + + let proxy = proxy::new() + .controller(ctrl.run().await) + .policy(policy.run().await) + .outbound(srv) + .run() + .await; + let client = mk_client(proxy.outbound, AUTHORITY); + test(client).await; + + // ensure panics from the server are propagated + proxy.join_servers().await; +} + +fn retry_server(srv: fn() -> server::Server, fails: usize) -> server::Server { + srv() + .route_fn("/retry", { + let retries = AtomicUsize::new(0); + move |_| { + let attempt = retries.fetch_add(1, Ordering::SeqCst); + tracing::info!(attempt, "/retry"); + let status = if attempt > fails { + http::StatusCode::OK + } else { + http::StatusCode::INTERNAL_SERVER_ERROR + }; + http::Response::builder() + .status(status) + .body("".into()) + .unwrap() + } + }) + .route_async("/retry/timeout", { + let retries = Arc::new(AtomicUsize::new(0)); + move |_| { + let retries = retries.clone(); + async move { + let attempt = retries.fetch_add(1, Ordering::SeqCst); + tracing::info!(attempt, "/retry/timeout"); + if attempt < fails { + tokio::time::sleep(Duration::from_millis(500)).await; + }; + http::Response::builder() + .status(http::StatusCode::OK) + .body("".into()) + } + } + }) + .route_fn("/no-retry", move |_| { + tracing::info!("/no-retry"); + http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body("".into()) + .unwrap() + }) +} + +fn retry_policy( + dst: impl ToString, + request_timeout: Option<Duration>, + backend_timeout: Option<Duration>, +) -> outbound::OutboundPolicy { + use outbound::http_route::{self, distribution}; + let dst = dst.to_string(); + + let dist = http_route::Distribution { + kind: Some(distribution::Kind::FirstAvailable( + distribution::FirstAvailable { + backends: vec![http_route::RouteBackend { + backend: Some(policy::backend(dst.clone())), + filters: Vec::new(), + request_timeout: backend_timeout.map(|t| t.try_into().unwrap()), + }], + }, + )), + }; + + let routes = vec![outbound::HttpRoute { + metadata: Some(httproute_meta("retry")), + hosts: Vec::new(), + rules: vec![ + // this rule is retryable + outbound::http_route::Rule { + matches: vec![policy::match_path_prefix("/retry")], + filters: Vec::new(), + backends: Some(dist.clone()), + request_timeout: request_timeout.map(|t| t.try_into().unwrap()), + retry_policy: Some(outbound::http_route::RetryPolicy { + retry_statuses: vec![controller::pb::HttpStatusRange { min: 500, max: 599 }], + max_per_request: 3, + }), + }, + // this rule is not retryable + outbound::http_route::Rule { + matches: vec![policy::match_path_prefix("/no-retry")], + filters: Vec::new(), + backends: Some(dist), + request_timeout: None, + retry_policy: None, + }, + ], + }]; + + let retry_budget = Some(controller::retry_budget(Duration::from_secs(10), 0.5, 10)); + + outbound::OutboundPolicy { + metadata: Some(api::meta::Metadata { + kind: Some(api::meta::metadata::Kind::Default("retry-test".to_string())), + }), + protocol: Some(outbound::ProxyProtocol { + kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect { + timeout: Some(Duration::from_secs(10).try_into().unwrap()), + http1: Some(proxy_protocol::Http1 { + routes: routes.clone(), + failure_accrual: None, + retry_budget: retry_budget.clone(), + }), + http2: Some(proxy_protocol::Http2 { + routes, + failure_accrual: None, + retry_budget, + }), + opaque: Some(proxy_protocol::Opaque { + routes: vec![policy::outbound_default_opaque_route(&dst)], + }), + })), + }), + } +} diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index 7d99e442b0..fa97ae0f74 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -247,6 +247,7 @@ fn policy_for_backend( request_timeout: None, }, ])), + retry_policy: None, }), }; @@ -266,6 +267,7 @@ fn policy_for_backend( request_timeout: None, }, ])), + retry_policy: None, }, }], }]); @@ -275,10 +277,12 @@ fn policy_for_backend( http1: policy::http::Http1 { routes: routes.clone(), failure_accrual: Default::default(), + retry_budget: None, }, http2: policy::http::Http2 { routes, failure_accrual: Default::default(), + retry_budget: None, }, opaque, }; diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index 24bf493280..67e5543741 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -18,7 +18,7 @@ mod endpoint; mod handle_proxy_error_headers; pub mod logical; mod require_id_header; -mod retry; +pub(crate) mod retry; mod server; pub use self::logical::{policy, profile, LogicalAddr, Routes}; @@ -26,7 +26,9 @@ pub(crate) use self::require_id_header::IdentityRequired; pub use linkerd_app_core::proxy::http::{self as http, *}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Http<T>(T); +pub struct Http<T> { + target: T, +} pub fn spawn_routes<T>( mut route_rx: watch::Receiver<T>, @@ -115,7 +117,7
@@ impl Outbound { .push_http_logical() .map_stack(move |config, _, stk| { stk.push_new_idle_cached(config.discovery_idle_timeout) - .push_map_target(Http) + .push_map_target(|target| Http { target }) .push(svc::ArcNewService::layer()) }) } @@ -128,7 +130,7 @@ where T: svc::Param<http::Version>, { fn param(&self) -> http::Version { - self.0.param() + self.target.param() } } @@ -137,6 +139,6 @@ where T: svc::Param<watch::Receiver<Routes>>, { fn param(&self) -> watch::Receiver<Routes> { - self.0.param() + self.target.param() } } diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index fd35ced06b..735490932c 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -22,7 +22,7 @@ mod tests; pub struct LogicalAddr(pub Addr); /// Configures the flavor of HTTP routing. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub enum Routes { /// Policy routes. Policy(policy::Params), diff --git a/linkerd/app/outbound/src/http/logical/policy.rs b/linkerd/app/outbound/src/http/logical/policy.rs index 7a79dc6985..904356361a 100644 --- a/linkerd/app/outbound/src/http/logical/policy.rs +++ b/linkerd/app/outbound/src/http/logical/policy.rs @@ -14,7 +14,7 @@ pub use self::{ pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual}; /// HTTP or gRPC policy route parameters. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub enum Params { Http(router::HttpParams), Grpc(router::GrpcParams), diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index 864e2b3661..184d0cc826 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -1,10 +1,10 @@ use super::super::Concrete; -use crate::RouteRef; +use crate::{http::retry, RouteRef}; use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result}; use linkerd_distribute as distribute; use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; -use std::{fmt::Debug, hash::Hash, sync::Arc}; +use std::{fmt::Debug, hash::Hash, num::NonZeroU32, sync::Arc}; pub(crate) mod backend; pub(crate) mod filters; @@ -31,6 +31,14 @@ pub(crate) struct Route<E> { pub(super) distribution: BackendDistribution, pub(super) failure_policy: E, pub(super) request_timeout: Option<std::time::Duration>, + pub(super) retry_policy: Option<RouteRetryPolicy<E>>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(super) struct RouteRetryPolicy<E> { + pub(super) budget: policy::retry::Budget, + pub(super) max_per_request: Option<NonZeroU32>, + pub(super) retryable: E, +} pub(crate) type MatchedRoute<E> = Matched<Route<E>>; @@ -76,6 +84,7 @@ where // Assert that filters can be applied. Self: filters::Apply, Self: svc::Param<classify::Request>, + Self: svc::Param<Option<retry::Params>>, MatchedBackend: filters::Apply, backend::ExtractMetrics: svc::ExtractParam>, { @@ -115,10 +124,22 @@ where .push(MatchedBackend::layer(backend_metrics.clone())) .lift_new_with_target() .push(NewDistribute::layer()) - // The router does not take the backend's availability into - // consideration, so we must eagerly fail requests to prevent - // leaking tasks onto the runtime. - .push_on_service(svc::LoadShed::layer()) + .check_new_service::>() + .push_on_service( + svc::layers() + // The router does not take the backend's availability into + // consideration, so we must eagerly fail requests to prevent + // leaking tasks onto the runtime. + .push(svc::LoadShed::layer()) + // Depending on whether or not the request can be + // retried, it may have one of two `Body` types.
This + // layer unifies any `Body` type into `BoxBody`. + .push(http::BoxRequest::erased()), + ) + // Sets an optional retry policy. + // TODO(eliza): currently, HTTPRoute retries don't have metrics + // the way profile retries do... + .push(retry::layer(None)) // TODO(ver) attach the `E` typed failure policy to requests. .push(filters::NewApplyFilters::<Self, _, _>::layer()) // Sets an optional request timeout. @@ -171,6 +192,24 @@ impl svc::Param for Http { } } +impl svc::Param<Option<retry::Params>> for Http { + fn param(&self) -> Option<retry::Params> { + let &RouteRetryPolicy { + ref budget, + max_per_request, + ref retryable, + } = self.params.retry_policy.as_ref()?; + Some(retry::Params { + budget: budget.clone().into(), + max_per_request, + profile_labels: None, + response_classes: classify::Request::ClientPolicy(classify::ClientPolicy::Http( + retryable.clone(), + )), + }) + } +} + impl filters::Apply for Grpc { #[inline] fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> { @@ -190,3 +229,31 @@ impl svc::Param for Grpc { )) } } + +impl svc::Param<Option<retry::Params>> for Grpc { + fn param(&self) -> Option<retry::Params> { + // gRPC client policies cannot currently configure retries. + // TODO: in the future, this should be implemented. + None + } +} + +// === impl RouteRetryPolicy === + +impl<E> RouteRetryPolicy<E> { + pub(super) fn new( + budget: Option<policy::retry::Budget>, + policy: Option<policy::retry::RoutePolicy<E>>, + ) -> Option<Self> { + let budget = budget?; + let policy::retry::RoutePolicy { + retryable, + max_per_request, + } = policy?; + Some(Self { + budget, + max_per_request, + retryable, + }) + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index f790f14b85..abae4c9e16 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -3,7 +3,7 @@ use crate::{BackendRef, RouteRef}; use linkerd_app_core::{proxy::http, svc, Error, Result}; use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; -use std::{fmt::Debug, hash::Hash, sync::Arc}; +use std::{fmt::Debug, future::Future, hash::Hash, sync::Arc}; mod count_reqs; mod metrics; @@ -93,7 +93,7 @@ where http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>, Error = Error, - Future = impl Send, + Future = impl Future<Output = Result<http::Response<http::BoxBody>, Error>> + Send, > + Clone, >, > + Clone @@ -131,6 +131,7 @@ where }) } })) + .push_on_service(http::BoxResponse::<_>::layer()) .push(svc::ArcNewService::layer()) .into_inner() }) diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index e2d39d8a6c..71db236640 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -2,7 +2,7 @@ use super::{ super::{concrete, Concrete, LogicalAddr, NoRoute}, route, RouteBackendMetrics, }; -use crate::{BackendRef, EndpointRef, ParentRef, RouteRef}; +use crate::{http::retry, BackendRef, EndpointRef, ParentRef, RouteRef}; use linkerd_app_core::{ classify, proxy::http, svc, transport::addrs::*, Addr, Error, NameAddr, Result, }; @@ -11,13 +11,14 @@ use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; use std::{fmt::Debug, hash::Hash, sync::Arc}; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Params { pub addr: Addr, pub meta: ParentRef, pub routes: Arc<[http_route::Route>]>, pub backends: Arc<[policy::Backend]>, pub failure_accrual: policy::FailureAccrual, + pub retry_budget: Option<policy::retry::Budget>, } pub type
HttpParams = @@ -62,7 +63,8 @@ where Key = route::MatchedRoute, Error = NoRoute, >, - route::MatchedRoute: route::filters::Apply + svc::Param<classify::Request>, + route::MatchedRoute: + route::filters::Apply + svc::Param<classify::Request> + svc::Param<Option<retry::Params>>, route::MatchedBackend: route::filters::Apply, route::backend::ExtractMetrics: svc::ExtractParam>, @@ -125,6 +127,7 @@ where routes, backends, failure_accrual, + retry_budget, } = rts; let mk_concrete = { @@ -206,6 +209,7 @@ where distribution, failure_policy, request_timeout, + retry_policy, }| { let route_ref = RouteRef(meta); let distribution = mk_distribution(&route_ref, &distribution); @@ -217,6 +221,7 @@ where failure_policy, distribution, request_timeout, + retry_policy: route::RouteRetryPolicy::new(retry_budget.clone(), retry_policy), } }; diff --git a/linkerd/app/outbound/src/http/logical/policy/tests.rs b/linkerd/app/outbound/src/http/logical/policy/tests.rs index 866a6ee2f4..7365abf953 100644 --- a/linkerd/app/outbound/src/http/logical/policy/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/tests.rs @@ -54,6 +54,7 @@ async fn header_based_route() { backend, request_timeout: None, }])), + retry_policy: None, }; // Stack that produces mock services. @@ -112,6 +113,7 @@ async fn header_based_route() { }]), backends: std::iter::once(default).chain(Some(special)).collect(), failure_accrual: Default::default(), + retry_budget: None, } }); @@ -218,11 +220,13 @@ async fn http_filter_request_headers() { request_timeout: None, }, ])), + retry_policy: None, }, }], }]), backends: std::iter::once(backend).collect(), failure_accrual: Default::default(), + retry_budget: None, } }); diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs index 8bdfa6db96..512aa7670f 100644 --- a/linkerd/app/outbound/src/http/logical/profile.rs +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -304,7 +304,7 @@ impl RouteParams { // layer unifies any `Body` type into `BoxBody`. .push_on_service(http::BoxRequest::erased()) // Sets an optional retry policy. - .push(retry::layer(metrics.http_profile_route_retry.clone())) + .push(retry::layer(Some(metrics.http_profile_route_retry.clone()))) // Sets an optional request timeout. .push(http::NewTimeout::layer()) // Records per-route metrics.
@@ -353,6 +353,22 @@ impl svc::Param for RouteParams { } } +impl svc::Param<Option<retry::Params>> for RouteParams { + fn param(&self) -> Option<retry::Params> { + let retries = self.profile.retries()?; + Some(retry::Params { + budget: retries.budget().clone(), + // Per-request retry limits are not configured by ServiceProfiles. + max_per_request: None, + profile_labels: Some(metrics::ProfileRouteLabels::outbound( + self.addr.clone(), + &self.profile, + )), + response_classes: self.profile.response_classes().clone().into(), + }) + } +} + impl svc::Param<metrics::ProfileRouteLabels> for RouteParams { fn param(&self) -> metrics::ProfileRouteLabels { metrics::ProfileRouteLabels::outbound(self.addr.clone(), &self.profile) diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 429b9e8390..b5a7d3faf5 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -41,6 +41,7 @@ async fn routes() { backends: Arc::new([backend.clone()]), routes: Arc::new([default_route(backend)]), failure_accrual: client_policy::FailureAccrual::None, + retry_budget: None, }))); let target = Target { num: 1, @@ -102,6 +103,7 @@ async fn consecutive_failures_accrue() { max_failures: 3, backoff, }, + retry_budget: None, }))); let target = Target { num: 1, @@ -238,6 +240,7 @@ async fn balancer_doesnt_select_tripped_breakers() { max_failures: 3, backoff, }, + retry_budget: None, }))); let target = Target { num: 1, @@ -315,6 +318,7 @@ async fn route_request_timeout() { backends: Arc::new([backend]), routes: Arc::new([route]), failure_accrual: client_policy::FailureAccrual::None, + retry_budget: None, }))) }; let target = Target { @@ -381,6 +385,7 @@ async fn backend_request_timeout() { backends: Arc::new([backend]), routes: Arc::new([route]), failure_accrual: client_policy::FailureAccrual::None, + retry_budget: None, }))) }; let target = Target { @@ -596,6 +601,7 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route backend, request_timeout: None, }])), + retry_policy: None, }, }], } @@ -626,6 +632,7 @@ fn timeout_route( backend, request_timeout: backend_timeout, }])), + retry_policy: None, }, }], } diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index b61ddf3722..896da72b91 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -3,7 +3,6 @@ use linkerd_app_core::{ classify, http_metrics::retries::Handle, metrics::{self, ProfileRouteLabels}, - profiles::{self, http::Route}, proxy::http::{ClientHandle, EraseResponse, HttpBody}, svc::{layer, Either, Param}, Error, @@ -14,10 +13,11 @@ use linkerd_http_retry::{ ReplayBody, }; use linkerd_retry as retry; -use std::sync::Arc; +pub use retry::Budget; +use std::{num::NonZeroU32, sync::Arc}; pub fn layer( - metrics: metrics::HttpProfileRouteRetry, + metrics: Option<metrics::HttpProfileRouteRetry>, ) -> impl layer::Layer>> + Clone { retry::layer(NewRetryPolicy::new(metrics)) // Because we wrap the response body type on retries, we must include a @@ -26,42 +26,63 @@ .with_proxy(EraseResponse::new(())) } +#[derive(Clone, Debug)] +pub struct Params { + pub budget: Arc<Budget>, + pub max_per_request: Option<NonZeroU32>, + pub profile_labels: Option<ProfileRouteLabels>, + pub response_classes: classify::Request, +} + #[derive(Clone, Debug)] pub struct NewRetryPolicy { - metrics: metrics::HttpProfileRouteRetry, + metrics: Option<metrics::HttpProfileRouteRetry>, } #[derive(Clone, Debug)] pub struct RetryPolicy { - metrics: Handle, + metrics: Option<Handle>, budget: Arc<Budget>, - response_classes: profiles::http::ResponseClasses,
+ response_classes: classify::Request, + max_per_request: Option<NonZeroU32>, } /// Allow buffering requests up to 64 kb const MAX_BUFFERED_BYTES: usize = 64 * 1024; +#[derive(Copy, Clone, Debug)] +pub struct RetryCount(u32); + // === impl NewRetryPolicy === impl NewRetryPolicy { - pub fn new(metrics: metrics::HttpProfileRouteRetry) -> Self { + pub fn new(metrics: Option<metrics::HttpProfileRouteRetry>) -> Self { Self { metrics } } } impl<T> retry::NewPolicy<T> for NewRetryPolicy where - T: Param<Route> + Param<ProfileRouteLabels>, + T: Param<Option<Params>>, { type Policy = RetryPolicy; fn new_policy(&self, target: &T) -> Option<Self::Policy> { - let route: Route = target.param(); - let labels: ProfileRouteLabels = target.param(); + let Params { + budget, + max_per_request, + profile_labels, + response_classes, + } = Param::<Option<Params>>::param(target)?; + let metrics = self + .metrics + .as_ref() + .and_then(|metrics| Some(metrics.get_handle(profile_labels?))); Some(RetryPolicy { - metrics: self.metrics.get_handle(labels), - budget: route.retries()?.budget().clone(), - response_classes: route.response_classes().clone(), + metrics, + budget, + response_classes, + max_per_request, }) } } @@ -86,15 +107,36 @@ where Err(_) => false, Ok(rsp) => { // is the request a failure? - let is_failure = classify::Request::from(self.response_classes.clone()) + let is_failure = self + .response_classes .classify(req) .start(rsp) .eos(rsp.body().trailers()) .is_failure(); + // did the body exceed the maximum length limit? let exceeded_max_len = req.body().is_capped(); - let retryable = is_failure && !exceeded_max_len; - tracing::trace!(is_failure, exceeded_max_len, retryable); + + // was the per-request retry limit exceeded? + let exceeded_max_retries = + match (self.max_per_request, req.extensions().get::<RetryCount>()) { + (Some(max_retries), Some(RetryCount(retries))) => { + retries >= &max_retries.get() + } + // if `max_per_request` is `None`, we don't have + // a per-request retry limit. if the request's + // `RetryCount` is `None`, then it was the initial request. + _ => false, + }; + + let retryable = is_failure && !exceeded_max_len && !exceeded_max_retries; + + tracing::trace!( + is_failure, + exceeded_max_len, + exceeded_max_retries, + retryable + ); retryable } }; @@ -105,7 +147,9 @@ } let withdrew = self.budget.withdraw().is_ok(); - self.metrics.incr_retryable(withdrew); + if let Some(ref metrics) = self.metrics { + metrics.incr_retryable(withdrew); + } if !withdrew { return None; } @@ -131,6 +175,18 @@ where clone.extensions_mut().insert(client_handle); } + // Increment the retry count, if we care about tracking retry counts. + if self.max_per_request.is_some() { + let prev = req + .extensions() + .get::<RetryCount>() + .map(|&RetryCount(i)| i) + // If there's no `RetryCount` extension, then this is the initial + // request, and its clone is the first retry. + .unwrap_or(0); + clone.extensions_mut().insert(RetryCount(prev + 1)); + } + Some(clone) } } diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 0c2453f337..f3f8d7abb0 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -530,9 +530,17 @@ fn policy_routes( ref http2, ..
} => { - let (routes, failure_accrual) = match version { - http::Version::Http1 => (http1.routes.clone(), http1.failure_accrual), - http::Version::H2 => (http2.routes.clone(), http2.failure_accrual), + let (routes, failure_accrual, retry_budget) = match version { + http::Version::Http1 => ( + http1.routes.clone(), + http1.failure_accrual, + http1.retry_budget.clone(), + ), + http::Version::H2 => ( + http2.routes.clone(), + http2.failure_accrual, + http2.retry_budget.clone(), + ), }; Some(http::Routes::Policy(http::policy::Params::Http( http::policy::HttpParams { @@ -541,6 +549,7 @@ fn policy_routes( backends: policy.backends.clone(), routes, failure_accrual, + retry_budget, }, ))) } @@ -550,6 +559,7 @@ fn policy_routes( policy::Protocol::Http1(policy::http::Http1 { ref routes, failure_accrual, + ref retry_budget, }) => Some(http::Routes::Policy(http::policy::Params::Http( http::policy::HttpParams { addr, @@ -557,11 +567,13 @@ fn policy_routes( backends: policy.backends.clone(), routes: routes.clone(), failure_accrual, + retry_budget: retry_budget.clone(), }, ))), policy::Protocol::Http2(policy::http::Http2 { ref routes, failure_accrual, + ref retry_budget, }) => Some(http::Routes::Policy(http::policy::Params::Http( http::policy::HttpParams { addr, @@ -569,6 +581,7 @@ fn policy_routes( backends: policy.backends.clone(), routes: routes.clone(), failure_accrual, + retry_budget: retry_budget.clone(), }, ))), policy::Protocol::Grpc(policy::grpc::Grpc { @@ -581,6 +594,9 @@ fn policy_routes( backends: policy.backends.clone(), routes: routes.clone(), failure_accrual, + // Retry configuration is not currently implemented for gRPC + // policy routes... + retry_budget: None, }, ))), _ => None, diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index 387389fbf9..e99f7fd4d2 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -219,23 +219,33 @@ impl HttpSidecar { // protocol changes but remains HTTP-ish, we propagate those // changes. If the protocol flips to an opaque protocol, we ignore // the protocol update. - let (routes, failure_accrual) = match policy.protocol { + let (routes, failure_accrual, retry_budget) = match policy.protocol { policy::Protocol::Detect { ref http1, ref http2, .. 
} => match version { - http::Version::Http1 => (http1.routes.clone(), http1.failure_accrual), - http::Version::H2 => (http2.routes.clone(), http2.failure_accrual), + http::Version::Http1 => ( + http1.routes.clone(), + http1.failure_accrual, + http1.retry_budget.clone(), + ), + http::Version::H2 => ( + http2.routes.clone(), + http2.failure_accrual, + http2.retry_budget.clone(), + ), }, policy::Protocol::Http1(policy::http::Http1 { ref routes, failure_accrual, - }) => (routes.clone(), failure_accrual), + ref retry_budget, + }) => (routes.clone(), failure_accrual, retry_budget.clone()), policy::Protocol::Http2(policy::http::Http2 { ref routes, failure_accrual, - }) => (routes.clone(), failure_accrual), + ref retry_budget, + }) => (routes.clone(), failure_accrual, retry_budget.clone()), policy::Protocol::Grpc(policy::grpc::Grpc { ref routes, failure_accrual, @@ -247,6 +257,7 @@ impl HttpSidecar { backends: policy.backends.clone(), routes: routes.clone(), failure_accrual, + retry_budget: None, }, ))) } @@ -265,6 +276,7 @@ impl HttpSidecar { routes, backends: policy.backends.clone(), failure_accrual, + retry_budget, }, ))) } diff --git a/linkerd/app/test/src/resolver/client_policy.rs b/linkerd/app/test/src/resolver/client_policy.rs index c8860425cf..d4d820af96 100644 --- a/linkerd/app/test/src/resolver/client_policy.rs +++ b/linkerd/app/test/src/resolver/client_policy.rs @@ -79,6 +79,7 @@ impl ClientPolicies { backend: backend.clone(), request_timeout: None, }])), + retry_policy: None, }, }], }]); @@ -88,10 +89,12 @@ impl ClientPolicies { http1: http::Http1 { routes: http_routes.clone(), failure_accrual: Default::default(), + retry_budget: None, }, http2: http::Http2 { routes: http_routes, failure_accrual: Default::default(), + retry_budget: None, }, opaque: opaq::Opaque { policy: Some(opaq::Policy { @@ -104,6 +107,7 @@ impl ClientPolicies { backend: backend.clone(), request_timeout: None, }])), + retry_policy: None, }), }, }; diff --git a/linkerd/proxy/client-policy/Cargo.toml b/linkerd/proxy/client-policy/Cargo.toml index 28b4a95bf8..8eae533553 100644 --- a/linkerd/proxy/client-policy/Cargo.toml +++ b/linkerd/proxy/client-policy/Cargo.toml @@ -29,6 +29,7 @@ linkerd-proxy-core = { path = "../core" } once_cell = { version = "1" } prost-types = { version = "0.11", optional = true } tonic = { version = "0.8", default-features = false } +tower = { version = "0.4", default-features = false, features = ["retry"] } thiserror = { version = "1", optional = true } [dev-dependencies] diff --git a/linkerd/proxy/client-policy/src/grpc.rs b/linkerd/proxy/client-policy/src/grpc.rs index 7f6a05fc2c..f547c3830b 100644 --- a/linkerd/proxy/client-policy/src/grpc.rs +++ b/linkerd/proxy/client-policy/src/grpc.rs @@ -38,6 +38,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { distribution, failure_policy: Codes::default(), request_timeout: None, + retry_policy: None, }, }], } @@ -229,6 +230,7 @@ pub mod proto { filters, distribution, failure_policy: Codes::default(), + retry_policy: None, request_timeout, }, }) diff --git a/linkerd/proxy/client-policy/src/http.rs b/linkerd/proxy/client-policy/src/http.rs index 58664c8e64..fcfa70128c 100644 --- a/linkerd/proxy/client-policy/src/http.rs +++ b/linkerd/proxy/client-policy/src/http.rs @@ -1,4 +1,4 @@ -use crate::FailureAccrual; +use crate::{retry, FailureAccrual}; use linkerd_http_route::http; use std::{ops::RangeInclusive, sync::Arc}; @@ -15,6 +15,9 @@ pub struct Http1 { /// Configures how endpoints accrue observed failures. 
pub failure_accrual: FailureAccrual, + + /// Configures retry budgets. + pub retry_budget: Option<retry::Budget>, } // TODO: window sizes, etc @@ -24,6 +27,9 @@ pub struct Http2 { /// Configures how endpoints accrue observed failures. pub failure_accrual: FailureAccrual, + + /// Configures retry budgets. + pub retry_budget: Option<retry::Budget>, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -49,6 +55,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { distribution, failure_policy: StatusRanges::default(), request_timeout: None, + retry_policy: None, }, }], } } @@ -61,6 +68,7 @@ impl Default for Http1 { Self { routes: Arc::new([]), failure_accrual: Default::default(), + retry_budget: None, } } } @@ -72,6 +80,7 @@ impl Default for Http2 { Self { routes: Arc::new([]), failure_accrual: Default::default(), + retry_budget: None, } } } @@ -92,16 +101,26 @@ impl Default for StatusRanges { } } +impl FromIterator<RangeInclusive<u16>> for StatusRanges { + fn from_iter<T: IntoIterator<Item = RangeInclusive<u16>>>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + #[cfg(feature = "proto")] pub mod proto { use super::*; use crate::{ proto::{ BackendSet, InvalidBackend, InvalidDistribution, InvalidFailureAccrual, InvalidMeta, + InvalidRetryBudget, }, Meta, RouteBackend, RouteDistribution, }; - use linkerd2_proxy_api::outbound::{self, http_route}; + use linkerd2_proxy_api::{ + destination, + outbound::{self, http_route}, + }; use linkerd_http_route::http::{ filter::{ inject_failure::proto::InvalidFailureResponse, }, r#match::{host::proto::InvalidHostMatch, proto::InvalidRouteMatch}, }; + use std::num::NonZeroU32; #[derive(Debug, thiserror::Error)] pub enum InvalidHttpRoute { @@ -135,6 +155,12 @@ #[error("invalid request timeout: {0}")] Timeout(#[from] prost_types::DurationError), + + #[error("invalid retry budget: {0}")] + RetryBudget(#[from] InvalidRetryBudget), + + #[error("invalid retry policy: {0}")] + RetryPolicy(#[from] InvalidStatusRange), } #[derive(Debug, thiserror::Error)] @@ -152,6 +178,14 @@ Redirect(#[from] InvalidRequestRedirect), } + #[derive(Debug, thiserror::Error)] + pub enum InvalidStatusRange { + #[error("min ({min}) greater than max ({max})")] + MinGreaterThanMax { min: u16, max: u16 }, + #[error("{which} not a u16: {value}")] + NotAU16 { which: &'static str, value: u32 }, + } + pub(crate) fn fill_route_backends(rts: &[Route], set: &mut BackendSet) { for Route { ref rules, .. } in rts { for Rule { ref policy, ..
} in rules { @@ -168,9 +202,14 @@ .into_iter() .map(try_route) .collect::<Result<Arc<[_]>, _>>()?; + let retry_budget = proto + .retry_budget + .map(retry::Budget::try_from) + .transpose()?; Ok(Self { routes, failure_accrual: proto.failure_accrual.try_into()?, + retry_budget, }) } } @@ -183,9 +222,14 @@ .into_iter() .map(try_route) .collect::<Result<Arc<[_]>, _>>()?; + let retry_budget = proto + .retry_budget + .map(retry::Budget::try_from) + .transpose()?; Ok(Self { routes, failure_accrual: proto.failure_accrual.try_into()?, + retry_budget, }) } } @@ -223,6 +267,7 @@ backends, filters, request_timeout, + retry_policy, } = proto; let matches = matches @@ -243,6 +288,8 @@ .map(std::time::Duration::try_from) .transpose()?; + let retry_policy = retry_policy.map(retry::RoutePolicy::try_from).transpose()?; + Ok(Rule { matches, policy: Policy { @@ -251,6 +298,7 @@ distribution, failure_policy: StatusRanges::default(), request_timeout, + retry_policy, }, }) } @@ -329,4 +377,37 @@ } } } + + impl TryFrom<http_route::RetryPolicy> for retry::RoutePolicy<StatusRanges> { + type Error = InvalidStatusRange; + fn try_from( + http_route::RetryPolicy { + max_per_request, + retry_statuses, + }: http_route::RetryPolicy, + ) -> Result<Self, Self::Error> { + let retryable = retry_statuses + .into_iter() + .map(|destination::HttpStatusRange { min, max }| { + let min = u16::try_from(min).map_err(|_| InvalidStatusRange::NotAU16 { + value: min, + which: "min", + })?; + let max = u16::try_from(max).map_err(|_| InvalidStatusRange::NotAU16 { + value: max, + which: "max", + })?; + if min > max { + return Err(InvalidStatusRange::MinGreaterThanMax { min, max }); + } + Ok(min..=max) + }) + .collect::<Result<_, _>>()?; + let max_per_request = NonZeroU32::new(max_per_request); + Ok(retry::RoutePolicy { + retryable, + max_per_request, + }) + } + } } diff --git a/linkerd/proxy/client-policy/src/lib.rs b/linkerd/proxy/client-policy/src/lib.rs index ccbfa27be1..9b16b25839 100644 --- a/linkerd/proxy/client-policy/src/lib.rs +++ b/linkerd/proxy/client-policy/src/lib.rs @@ -7,6 +7,7 @@ use std::{borrow::Cow, fmt, hash::Hash, net::SocketAddr, num::NonZeroU16, sync:: pub mod grpc; pub mod http; pub mod opaq; +pub mod retry; pub use linkerd_http_route as route; pub use linkerd_proxy_api_resolve::Metadata as EndpointMetadata; @@ -73,6 +74,9 @@ pub struct RoutePolicy { /// Configures what responses are classified as failures. pub failure_policy: F, + + /// Configures retries for this route.
+ pub retry_policy: Option<retry::RoutePolicy<F>>, } // TODO(ver) Weighted random WITHOUT availability awareness, as required by @@ -145,7 +149,6 @@ pub enum FailureAccrual { backoff: linkerd_exp_backoff::ExponentialBackoff, }, } - // === impl ClientPolicy === impl ClientPolicy { @@ -169,6 +172,7 @@ impl ClientPolicy { distribution: RouteDistribution::Empty, failure_policy: http::StatusRanges::default(), request_timeout: None, + retry_policy: None, }, }], }]) @@ -182,10 +186,12 @@ impl ClientPolicy { http1: http::Http1 { routes: HTTP_ROUTES.clone(), failure_accrual: Default::default(), + retry_budget: None, }, http2: http::Http2 { routes: HTTP_ROUTES.clone(), failure_accrual: Default::default(), + retry_budget: None, }, opaque: opaq::Opaque { // TODO(eliza): eventually, can we configure the opaque @@ -213,10 +219,12 @@ impl ClientPolicy { http1: http::Http1 { routes: NO_HTTP_ROUTES.clone(), failure_accrual: Default::default(), + retry_budget: None, }, http2: http::Http2 { routes: NO_HTTP_ROUTES.clone(), failure_accrual: Default::default(), + retry_budget: None, }, opaque: opaq::Opaque { // TODO(eliza): eventually, can we configure the opaque @@ -327,6 +335,7 @@ impl Default for FailureAccrual { #[cfg(feature = "proto")] pub mod proto { use super::*; + pub use crate::retry::proto::InvalidRetryBudget; use linkerd2_proxy_api::{ meta, outbound::{self, backend::BalanceP2c}, diff --git a/linkerd/proxy/client-policy/src/opaq.rs b/linkerd/proxy/client-policy/src/opaq.rs index f708b8a2c0..4ae8e43cf4 100644 --- a/linkerd/proxy/client-policy/src/opaq.rs +++ b/linkerd/proxy/client-policy/src/opaq.rs @@ -129,6 +129,8 @@ pub(crate) mod proto { distribution, // Request timeouts are ignored on opaque routes. request_timeout: None, + // Retry policies are ignored on opaque routes. + retry_policy: None, }) } diff --git a/linkerd/proxy/client-policy/src/retry.rs b/linkerd/proxy/client-policy/src/retry.rs new file mode 100644 index 0000000000..c409a4357e --- /dev/null +++ b/linkerd/proxy/client-policy/src/retry.rs @@ -0,0 +1,96 @@ +use std::{num::NonZeroU32, sync::Arc}; + +#[derive(Clone, Debug)] +pub struct Budget(Arc<tower::retry::budget::Budget>); + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub struct RoutePolicy<F> { + /// Configures how responses are classified as retryable or non-retryable. + pub retryable: F, + /// Configures a per-request retry limit. + pub max_per_request: Option<NonZeroU32>, +} + +// === impl Budget === + +impl PartialEq for Budget { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +// It's okay for `Budget` to be `Eq` because we assert that the +// ratio field (a float) is finite when constructing the budget.
+impl Eq for Budget {} + +impl std::hash::Hash for Budget { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + state.write_usize(Arc::as_ref(&self.0) as *const _ as usize); + } +} + +impl From<Budget> for Arc<tower::retry::budget::Budget> { + fn from(Budget(this): Budget) -> Self { + this + } +} + +#[cfg(feature = "proto")] +pub mod proto { + use super::*; + use linkerd2_proxy_api::destination; + use std::{ops::RangeInclusive, time::Duration}; + + #[derive(Debug, thiserror::Error)] + pub enum InvalidRetryBudget { + #[error("missing `ttl` field")] + NoTtl, + + #[error("invalid `ttl` field: {0}")] + BadDuration(#[from] prost_types::DurationError), + + #[error("retry ratio must be finite")] + InfiniteRatio, + + #[error("`ttl` must be within {VALID_TTLS:?} (was {0:?})")] + TtlOutOfRange(Duration), + + #[error("`retry_ratio` must be within {VALID_RATIOS:?} (was {0})")] + PercentOutOfRange(f32), + } + + const VALID_RATIOS: RangeInclusive<f32> = 0.0..=1000.0; + const VALID_TTLS: RangeInclusive<Duration> = Duration::from_secs(1)..=Duration::from_secs(60); + + impl TryFrom<destination::RetryBudget> for Budget { + type Error = InvalidRetryBudget; + fn try_from( + destination::RetryBudget { + ttl, + retry_ratio, + min_retries_per_second, + }: destination::RetryBudget, + ) -> Result<Self, Self::Error> { + let ttl = ttl.ok_or(InvalidRetryBudget::NoTtl)?; + let ttl = ttl.try_into()?; + + if retry_ratio.is_infinite() { + return Err(InvalidRetryBudget::InfiniteRatio); + } + + if !VALID_RATIOS.contains(&retry_ratio) { + return Err(InvalidRetryBudget::PercentOutOfRange(retry_ratio)); + } + + if !VALID_TTLS.contains(&ttl) { + return Err(InvalidRetryBudget::TtlOutOfRange(ttl)); + } + + Ok(Self(Arc::new(tower::retry::budget::Budget::new( + ttl, + min_retries_per_second, + retry_ratio, + )))) + } + } +}
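
For context on the semantics the new `Budget` wrapper inherits: it delegates all accounting to `tower::retry::budget::Budget`, which the outbound `RetryPolicy` draws on via `budget.withdraw()` in `linkerd/app/outbound/src/http/retry.rs`. Below is a minimal standalone sketch of that deposit/withdraw flow under tower 0.4; the parameter values are illustrative (chosen to make exhaustion visible), not taken from the patch.

    use std::time::Duration;
    use tower::retry::budget::Budget;

    fn main() {
        // Same parameter order as the conversion above:
        // (ttl, min_retries_per_second, retry_ratio). A ratio of 0.5
        // allows roughly one retry for every two successful deposits.
        // A zero minimum makes the budget exhaustible in this sketch.
        let budget = Budget::new(Duration::from_secs(10), 0, 0.5);

        // Initial (non-retry) requests deposit into the budget...
        budget.deposit();
        budget.deposit();

        // ...and every retry attempt must withdraw from it. Two deposits
        // at a 0.5 ratio fund exactly one retry.
        assert!(budget.withdraw().is_ok());

        // Once the budget is overdrawn, `withdraw()` fails; the retry
        // policy treats that as "do not retry" and passes the failed
        // response through instead of cloning the request again.
        assert!(budget.withdraw().is_err());
    }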