diff --git a/policy-controller/core/src/outbound.rs b/policy-controller/core/src/outbound.rs index dcde37fbf3ee4..ca68fa40f1300 100644 --- a/policy-controller/core/src/outbound.rs +++ b/policy-controller/core/src/outbound.rs @@ -6,7 +6,7 @@ use ahash::AHashMap as HashMap; use anyhow::Result; use chrono::{offset::Utc, DateTime}; use futures::prelude::*; -use std::{net::IpAddr, num::NonZeroU16, pin::Pin, time}; +use std::{net::IpAddr, num::NonZeroU16, pin::Pin, sync::Arc, time}; mod policy; mod target; @@ -42,6 +42,31 @@ pub type GrpcRoute = OutboundRoute; pub type RouteSet = HashMap; +#[derive(Debug, Clone, PartialEq)] +pub enum AppProtocol { + Http1, + Http2, + Grpc, + Tls, + Tcp, + Opaque, + Unknown(Arc), +} + +impl From<&str> for AppProtocol { + fn from(value: &str) -> Self { + match value.to_ascii_lowercase().as_str() { + "http" => AppProtocol::Http1, + "kubernetes.io/h2c" => AppProtocol::Http2, + "https" | "linkerd.io/tls" => AppProtocol::Tls, + "linkerd.io/grpc" => AppProtocol::Grpc, + "linkerd.io/tcp" => AppProtocol::Tcp, + "linkerd.io/opaque" => AppProtocol::Opaque, + _ => AppProtocol::Unknown(Arc::from(value.to_string().into_boxed_str())), + } + } +} + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum TrafficPolicy { Allow, diff --git a/policy-controller/core/src/outbound/policy.rs b/policy-controller/core/src/outbound/policy.rs index 37c4cbc9e7f6d..35f9e7a6fd0f9 100644 --- a/policy-controller/core/src/outbound/policy.rs +++ b/policy-controller/core/src/outbound/policy.rs @@ -1,6 +1,6 @@ use super::{ - FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, HttpRoute, RouteRetry, - RouteSet, RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy, + AppProtocol, FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, HttpRoute, + RouteRetry, RouteSet, RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy, }; use std::num::NonZeroU16; @@ -30,6 +30,7 @@ pub struct OutboundPolicy { pub tcp_routes: RouteSet, pub port: NonZeroU16, pub opaque: bool, + pub app_protocol: Option, pub accrual: Option, pub http_retry: Option>, pub grpc_retry: Option>, diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index 404856e20c9e0..2b411afb70c07 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -13,13 +13,14 @@ use linkerd2_proxy_api::{ }; use linkerd_policy_controller_core::{ outbound::{ - DiscoverOutboundPolicy, ExternalPolicyStream, Kind, OutboundDiscoverTarget, OutboundPolicy, - OutboundPolicyStream, ParentInfo, ResourceTarget, Route, WeightedEgressNetwork, - WeightedService, + AppProtocol, DiscoverOutboundPolicy, ExternalPolicyStream, Kind, OutboundDiscoverTarget, + OutboundPolicy, OutboundPolicyStream, ParentInfo, ResourceTarget, Route, + WeightedEgressNetwork, WeightedService, }, routes::GroupKindNamespaceName, }; use std::{net::SocketAddr, num::NonZeroU16, str::FromStr, sync::Arc, time}; +use tracing::info; mod grpc; mod http; @@ -372,49 +373,51 @@ fn to_proto( ) -> outbound::OutboundPolicy { let backend: outbound::Backend = default_backend(&policy, original_dst); - let kind = if policy.opaque { - outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque { - routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)], - }) - } else { - let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual { - kind: Some(match accrual { - linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive { + let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual { + kind: Some(match accrual { + linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive { + max_failures, + backoff, + } => outbound::failure_accrual::Kind::ConsecutiveFailures( + outbound::failure_accrual::ConsecutiveFailures { max_failures, - backoff, - } => outbound::failure_accrual::Kind::ConsecutiveFailures( - outbound::failure_accrual::ConsecutiveFailures { - max_failures, - backoff: Some(outbound::ExponentialBackoff { - min_backoff: convert_duration("min_backoff", backoff.min_penalty), - max_backoff: convert_duration("max_backoff", backoff.max_penalty), - jitter_ratio: backoff.jitter, - }), - }, - ), - }), - }); + backoff: Some(outbound::ExponentialBackoff { + min_backoff: convert_duration("min_backoff", backoff.min_penalty), + max_backoff: convert_duration("max_backoff", backoff.max_penalty), + jitter_ratio: backoff.jitter, + }), + }, + ), + }), + }); - let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::>(); - let mut http_routes = policy.http_routes.clone().into_iter().collect::>(); - let mut tls_routes = policy.tls_routes.clone().into_iter().collect::>(); - let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::>(); + let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::>(); + let mut http_routes = policy.http_routes.clone().into_iter().collect::>(); + let mut tls_routes = policy.tls_routes.clone().into_iter().collect::>(); + let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::>(); - if !grpc_routes.is_empty() { - grpc_routes.sort_by(timestamp_then_name); - grpc::protocol( + let kind = match (policy.opaque, &policy.app_protocol) { + (true, _) | (_, Some(AppProtocol::Opaque)) => { + outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque { + routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)], + }) + } + (_, Some(AppProtocol::Http1)) => { + http_routes.sort_by(timestamp_then_name); + http::http1_only_protocol( backend, - grpc_routes.into_iter(), + http_routes.into_iter(), accrual, - policy.grpc_retry.clone(), + policy.http_retry.clone(), policy.timeouts.clone(), allow_l5d_request_headers, &policy.parent_info, original_dst, ) - } else if !http_routes.is_empty() { + } + (_, Some(AppProtocol::Http2)) => { http_routes.sort_by(timestamp_then_name); - http::protocol( + http::http2_only_protocol( backend, http_routes.into_iter(), accrual, @@ -424,15 +427,8 @@ fn to_proto( &policy.parent_info, original_dst, ) - } else if !tls_routes.is_empty() { - tls_routes.sort_by(timestamp_then_name); - tls::protocol( - backend, - tls_routes.into_iter(), - &policy.parent_info, - original_dst, - ) - } else if !tcp_routes.is_empty() { + } + (_, Some(AppProtocol::Tcp)) => { tcp_routes.sort_by(timestamp_then_name); tcp::protocol( backend, @@ -440,19 +436,85 @@ fn to_proto( &policy.parent_info, original_dst, ) - } else { - http_routes.sort_by(timestamp_then_name); - http::protocol( + } + (_, Some(AppProtocol::Grpc)) => { + let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::>(); + grpc_routes.sort_by(timestamp_then_name); + grpc::protocol( backend, - http_routes.into_iter(), + grpc_routes.into_iter(), accrual, - policy.http_retry.clone(), + policy.grpc_retry.clone(), policy.timeouts.clone(), allow_l5d_request_headers, &policy.parent_info, original_dst, ) } + (_, Some(AppProtocol::Tls)) => { + tls_routes.sort_by(timestamp_then_name); + tls::protocol( + backend, + tls_routes.into_iter(), + &policy.parent_info, + original_dst, + ) + } + _ => { + if !grpc_routes.is_empty() { + grpc_routes.sort_by(timestamp_then_name); + grpc::protocol( + backend, + grpc_routes.into_iter(), + accrual, + policy.grpc_retry.clone(), + policy.timeouts.clone(), + allow_l5d_request_headers, + &policy.parent_info, + original_dst, + ) + } else if !http_routes.is_empty() { + http_routes.sort_by(timestamp_then_name); + http::protocol( + backend, + http_routes.into_iter(), + accrual, + policy.http_retry.clone(), + policy.timeouts.clone(), + allow_l5d_request_headers, + &policy.parent_info, + original_dst, + ) + } else if !tls_routes.is_empty() { + tls_routes.sort_by(timestamp_then_name); + tls::protocol( + backend, + tls_routes.into_iter(), + &policy.parent_info, + original_dst, + ) + } else if !tcp_routes.is_empty() { + tcp_routes.sort_by(timestamp_then_name); + tcp::protocol( + backend, + tcp_routes.into_iter(), + &policy.parent_info, + original_dst, + ) + } else { + http_routes.sort_by(timestamp_then_name); + http::protocol( + backend, + http_routes.into_iter(), + accrual, + policy.http_retry.clone(), + policy.timeouts.clone(), + allow_l5d_request_headers, + &policy.parent_info, + original_dst, + ) + } + } }; let (parent_group, parent_kind, namespace, name) = match policy.parent_info { @@ -475,6 +537,12 @@ fn to_proto( })), }; + info!( + ?metadata, + ?policy.app_protocol, + "created outbound policy" + ); + outbound::OutboundPolicy { metadata: Some(metadata), protocol: Some(outbound::ProxyProtocol { kind: Some(kind) }), diff --git a/policy-controller/grpc/src/outbound/http.rs b/policy-controller/grpc/src/outbound/http.rs index 8f8e9d63d48dc..e35fa9e236d58 100644 --- a/policy-controller/grpc/src/outbound/http.rs +++ b/policy-controller/grpc/src/outbound/http.rs @@ -84,6 +84,110 @@ pub(crate) fn protocol( }) } +#[allow(clippy::too_many_arguments)] +pub(crate) fn http1_only_protocol( + default_backend: outbound::Backend, + routes: impl Iterator, + accrual: Option, + service_retry: Option>, + service_timeouts: RouteTimeouts, + allow_l5d_request_headers: bool, + parent_info: &ParentInfo, + original_dst: Option, +) -> outbound::proxy_protocol::Kind { + let mut routes = routes + .map(|(gknn, route)| { + convert_outbound_route( + gknn, + route, + default_backend.clone(), + service_retry.clone(), + service_timeouts.clone(), + allow_l5d_request_headers, + parent_info, + original_dst, + ) + }) + .collect::>(); + + match parent_info { + ParentInfo::Service { .. } => { + if routes.is_empty() { + routes.push(default_outbound_service_route( + default_backend, + service_retry.clone(), + service_timeouts.clone(), + )); + } + } + ParentInfo::EgressNetwork { traffic_policy, .. } => { + routes.push(default_outbound_egress_route( + default_backend, + service_retry.clone(), + service_timeouts.clone(), + traffic_policy, + )); + } + } + + outbound::proxy_protocol::Kind::Http1(outbound::proxy_protocol::Http1 { + routes: routes.clone(), + failure_accrual: accrual.clone(), + }) +} + +#[allow(clippy::too_many_arguments)] +pub(crate) fn http2_only_protocol( + default_backend: outbound::Backend, + routes: impl Iterator, + accrual: Option, + service_retry: Option>, + service_timeouts: RouteTimeouts, + allow_l5d_request_headers: bool, + parent_info: &ParentInfo, + original_dst: Option, +) -> outbound::proxy_protocol::Kind { + let mut routes = routes + .map(|(gknn, route)| { + convert_outbound_route( + gknn, + route, + default_backend.clone(), + service_retry.clone(), + service_timeouts.clone(), + allow_l5d_request_headers, + parent_info, + original_dst, + ) + }) + .collect::>(); + + match parent_info { + ParentInfo::Service { .. } => { + if routes.is_empty() { + routes.push(default_outbound_service_route( + default_backend, + service_retry.clone(), + service_timeouts.clone(), + )); + } + } + ParentInfo::EgressNetwork { traffic_policy, .. } => { + routes.push(default_outbound_egress_route( + default_backend, + service_retry.clone(), + service_timeouts.clone(), + traffic_policy, + )); + } + } + + outbound::proxy_protocol::Kind::Http2(outbound::proxy_protocol::Http2 { + routes: routes.clone(), + failure_accrual: accrual.clone(), + }) +} + #[allow(clippy::too_many_arguments)] fn convert_outbound_route( gknn: GroupKindNamespaceName, diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index eb1281be5ba3b..4dc9934138590 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -1,5 +1,5 @@ use crate::{ - ports::{ports_annotation, PortSet}, + ports::{ports_annotation, PortMap, PortSet}, routes::{ExplicitGKN, HttpRouteResource, ImpliedGKN}, ClusterInfo, }; @@ -9,9 +9,9 @@ use egress_network::EgressNetwork; use kube::Resource; use linkerd_policy_controller_core::{ outbound::{ - Backend, Backoff, FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, - HttpRoute, Kind, OutboundPolicy, ParentInfo, ResourceTarget, RouteRetry, RouteSet, - RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy, + AppProtocol, Backend, Backoff, FailureAccrual, GrpcRetryCondition, GrpcRoute, + HttpRetryCondition, HttpRoute, Kind, OutboundPolicy, ParentInfo, ResourceTarget, + RouteRetry, RouteSet, RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy, }, routes::GroupKindNamespaceName, }; @@ -92,6 +92,7 @@ struct Namespace { #[derive(Debug)] struct ResourceInfo { opaque_ports: PortSet, + app_protocols: PortMap, accrual: Option, http_retry: Option>, grpc_retry: Option>, @@ -111,6 +112,7 @@ struct ResourceRoutes { parent_info: ParentInfo, namespace: Arc, port: NonZeroU16, + app_protocol: Option, watches_by_ns: HashMap, opaque: bool, accrual: Option, @@ -123,6 +125,7 @@ struct ResourceRoutes { struct RoutesWatch { parent_info: ParentInfo, opaque: bool, + app_protocol: Option, accrual: Option, http_retry: Option>, grpc_retry: Option>, @@ -218,9 +221,36 @@ impl kubert::index::IndexNamespacedResource for Index { let accrual = parse_accrual_config(service.annotations()) .map_err(|error| tracing::warn!(%error, service=name, namespace=ns, "Failed to parse accrual config")) .unwrap_or_default(); - let opaque_ports = - ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports") - .unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone()); + let app_protocols = service + .spec + .as_ref() + .and_then(|spec| { + spec.ports.as_ref().map(|ports| { + ports + .iter() + .filter_map(|port| { + port.app_protocol.as_ref().and_then(|p| { + Some(( + NonZeroU16::new(port.port as u16)?, + AppProtocol::from(p.as_str()), + )) + }) + }) + .collect::>() + }) + }) + .unwrap_or_default(); + let opaque_ports = { + let mut opaque_ports = + ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports") + .unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone()); + for (&port, app_protocol) in &app_protocols { + if app_protocol == &AppProtocol::Opaque { + opaque_ports.insert(port); + } + } + opaque_ports + }; let timeouts = parse_timeouts(service.annotations()) .map_err(|error| tracing::warn!(%error, service=name, namespace=ns, "Failed to parse timeouts")) @@ -260,6 +290,7 @@ impl kubert::index::IndexNamespacedResource for Index { let service_info = ResourceInfo { opaque_ports, + app_protocols, accrual, http_retry, grpc_retry, @@ -361,6 +392,7 @@ impl kubert::index::IndexNamespacedResource for grpc_retry, timeouts, traffic_policy, + app_protocols: PortMap::default(), }; let ns = Arc::new(ns); @@ -1151,9 +1183,11 @@ impl Namespace { } let opaque = resource.opaque_ports.contains(&resource_port.port); + let app_protocol = resource.app_protocols.get(&resource_port.port).cloned(); resource_routes.update_resource( opaque, + app_protocol, resource.accrual, resource.http_retry.clone(), resource.grpc_retry.clone(), @@ -1239,12 +1273,14 @@ impl Namespace { } }; let mut opaque = false; + let mut app_protocol = None; let mut accrual = None; let mut http_retry = None; let mut grpc_retry = None; let mut timeouts = Default::default(); if let Some(resource) = resource_info.get(&resource_ref) { opaque = resource.opaque_ports.contains(&rp.port); + app_protocol = resource.app_protocols.get(&rp.port).cloned(); accrual = resource.accrual; http_retry = resource.http_retry.clone(); grpc_retry = resource.grpc_retry.clone(); @@ -1292,6 +1328,7 @@ impl Namespace { port: rp.port, namespace: self.namespace.clone(), watches_by_ns: Default::default(), + app_protocol, }; // Producer routes are routes in the same namespace as @@ -1578,6 +1615,7 @@ impl ResourceRoutes { parent_info: self.parent_info.clone(), port: self.port, opaque: self.opaque, + app_protocol: self.app_protocol.clone(), accrual: self.accrual, http_retry: self.http_retry.clone(), grpc_retry: self.grpc_retry.clone(), @@ -1596,6 +1634,7 @@ impl ResourceRoutes { tcp_routes, watch: sender, opaque: self.opaque, + app_protocol: self.app_protocol.clone(), accrual: self.accrual, http_retry: self.http_retry.clone(), grpc_retry: self.grpc_retry.clone(), @@ -1692,9 +1731,11 @@ impl ResourceRoutes { } } + #[allow(clippy::too_many_arguments)] fn update_resource( &mut self, opaque: bool, + app_protocol: Option, accrual: Option, http_retry: Option>, grpc_retry: Option>, @@ -1702,6 +1743,7 @@ impl ResourceRoutes { traffic_policy: Option, ) { self.opaque = opaque; + self.app_protocol = app_protocol.clone(); self.accrual = accrual; self.http_retry = http_retry.clone(); self.grpc_retry = grpc_retry.clone(); @@ -1709,6 +1751,7 @@ impl ResourceRoutes { self.update_traffic_policy(traffic_policy); for watch in self.watches_by_ns.values_mut() { watch.opaque = opaque; + watch.app_protocol = app_protocol.clone(); watch.accrual = accrual; watch.http_retry = http_retry.clone(); watch.grpc_retry = grpc_retry.clone(); @@ -1804,6 +1847,11 @@ impl RoutesWatch { modified = true; } + if self.app_protocol != policy.app_protocol { + policy.app_protocol = self.app_protocol.clone(); + modified = true; + } + if self.accrual != policy.accrual { policy.accrual = self.accrual; modified = true; diff --git a/policy-controller/k8s/index/src/outbound/index/http.rs b/policy-controller/k8s/index/src/outbound/index/http.rs index b394e7b413246..030ae82f1cb92 100644 --- a/policy-controller/k8s/index/src/outbound/index/http.rs +++ b/policy-controller/k8s/index/src/outbound/index/http.rs @@ -196,7 +196,7 @@ pub(super) fn convert_backend>( cluster: &ClusterInfo, resources: &HashMap, ) -> Option { - let backend = backend.into(); + let backend: gateway::HttpBackendRef = backend.into(); let filters = backend.filters; let backend = backend.backend_ref?;