Skip to content

Commit

Permalink
feat: add basic outbound protocol detection
Browse files Browse the repository at this point in the history
This looks for the `appProtocol` field on the ports in a service, and limits the set of outbound routes to that protocol if found.

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Feb 11, 2025
1 parent f50a7a7 commit c519ced
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 60 deletions.
27 changes: 26 additions & 1 deletion policy-controller/core/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ahash::AHashMap as HashMap;
use anyhow::Result;
use chrono::{offset::Utc, DateTime};
use futures::prelude::*;
use std::{net::IpAddr, num::NonZeroU16, pin::Pin, time};
use std::{net::IpAddr, num::NonZeroU16, pin::Pin, sync::Arc, time};

mod policy;
mod target;
Expand Down Expand Up @@ -42,6 +42,31 @@ pub type GrpcRoute = OutboundRoute<GrpcRouteMatch, GrpcRetryCondition>;

pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;

#[derive(Debug, Clone, PartialEq)]
pub enum AppProtocol {
Http1,
Http2,
Grpc,
Tls,
Tcp,
Opaque,
Unknown(Arc<str>),
}

impl From<&str> for AppProtocol {
fn from(value: &str) -> Self {
match value.to_ascii_lowercase().as_str() {
"http" => AppProtocol::Http1,
"kubernetes.io/h2c" => AppProtocol::Http2,
"https" | "linkerd.io/tls" => AppProtocol::Tls,
"linkerd.io/grpc" => AppProtocol::Grpc,
"linkerd.io/tcp" => AppProtocol::Tcp,
"linkerd.io/opaque" => AppProtocol::Opaque,
_ => AppProtocol::Unknown(Arc::from(value.to_string().into_boxed_str())),
}
}
}

#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum TrafficPolicy {
Allow,
Expand Down
5 changes: 3 additions & 2 deletions policy-controller/core/src/outbound/policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, HttpRoute, RouteRetry,
RouteSet, RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy,
AppProtocol, FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, HttpRoute,
RouteRetry, RouteSet, RouteTimeouts, TcpRoute, TlsRoute, TrafficPolicy,
};

use std::num::NonZeroU16;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct OutboundPolicy {
pub tcp_routes: RouteSet<TcpRoute>,
pub port: NonZeroU16,
pub opaque: bool,
pub app_protocol: Option<AppProtocol>,
pub accrual: Option<FailureAccrual>,
pub http_retry: Option<RouteRetry<HttpRetryCondition>>,
pub grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
Expand Down
166 changes: 117 additions & 49 deletions policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ use linkerd2_proxy_api::{
};
use linkerd_policy_controller_core::{
outbound::{
DiscoverOutboundPolicy, ExternalPolicyStream, Kind, OutboundDiscoverTarget, OutboundPolicy,
OutboundPolicyStream, ParentInfo, ResourceTarget, Route, WeightedEgressNetwork,
WeightedService,
AppProtocol, DiscoverOutboundPolicy, ExternalPolicyStream, Kind, OutboundDiscoverTarget,
OutboundPolicy, OutboundPolicyStream, ParentInfo, ResourceTarget, Route,
WeightedEgressNetwork, WeightedService,
},
routes::GroupKindNamespaceName,
};
use std::{net::SocketAddr, num::NonZeroU16, str::FromStr, sync::Arc, time};
use tracing::info;

mod grpc;
mod http;
Expand Down Expand Up @@ -372,49 +373,51 @@ fn to_proto(
) -> outbound::OutboundPolicy {
let backend: outbound::Backend = default_backend(&policy, original_dst);

let kind = if policy.opaque {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
} else {
let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});

let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();

if !grpc_routes.is_empty() {
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(
let kind = match (policy.opaque, &policy.app_protocol) {
(true, _) | (_, Some(AppProtocol::Opaque)) => {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
}
(_, Some(AppProtocol::Http1)) => {
http_routes.sort_by(timestamp_then_name);
http::http1_only_protocol(
backend,
grpc_routes.into_iter(),
http_routes.into_iter(),
accrual,
policy.grpc_retry.clone(),
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
} else if !http_routes.is_empty() {
}
(_, Some(AppProtocol::Http2)) => {
http_routes.sort_by(timestamp_then_name);
http::protocol(
http::http2_only_protocol(
backend,
http_routes.into_iter(),
accrual,
Expand All @@ -424,35 +427,94 @@ fn to_proto(
&policy.parent_info,
original_dst,
)
} else if !tls_routes.is_empty() {
tls_routes.sort_by(timestamp_then_name);
tls::protocol(
backend,
tls_routes.into_iter(),
&policy.parent_info,
original_dst,
)
} else if !tcp_routes.is_empty() {
}
(_, Some(AppProtocol::Tcp)) => {
tcp_routes.sort_by(timestamp_then_name);
tcp::protocol(
backend,
tcp_routes.into_iter(),
&policy.parent_info,
original_dst,
)
} else {
http_routes.sort_by(timestamp_then_name);
http::protocol(
}
(_, Some(AppProtocol::Grpc)) => {
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(
backend,
http_routes.into_iter(),
grpc_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.grpc_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
(_, Some(AppProtocol::Tls)) => {
tls_routes.sort_by(timestamp_then_name);
tls::protocol(
backend,
tls_routes.into_iter(),
&policy.parent_info,
original_dst,
)
}
_ => {
if !grpc_routes.is_empty() {
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(
backend,
grpc_routes.into_iter(),
accrual,
policy.grpc_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
} else if !http_routes.is_empty() {
http_routes.sort_by(timestamp_then_name);
http::protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
} else if !tls_routes.is_empty() {
tls_routes.sort_by(timestamp_then_name);
tls::protocol(
backend,
tls_routes.into_iter(),
&policy.parent_info,
original_dst,
)
} else if !tcp_routes.is_empty() {
tcp_routes.sort_by(timestamp_then_name);
tcp::protocol(
backend,
tcp_routes.into_iter(),
&policy.parent_info,
original_dst,
)
} else {
http_routes.sort_by(timestamp_then_name);
http::protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
}
};

let (parent_group, parent_kind, namespace, name) = match policy.parent_info {
Expand All @@ -475,6 +537,12 @@ fn to_proto(
})),
};

info!(
?metadata,
?policy.app_protocol,
"created outbound policy"
);

outbound::OutboundPolicy {
metadata: Some(metadata),
protocol: Some(outbound::ProxyProtocol { kind: Some(kind) }),
Expand Down
104 changes: 104 additions & 0 deletions policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,110 @@ pub(crate) fn protocol(
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http1_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
let mut routes = routes
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
parent_info,
original_dst,
)
})
.collect::<Vec<_>>();

match parent_info {
ParentInfo::Service { .. } => {
if routes.is_empty() {
routes.push(default_outbound_service_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
));
}
}
ParentInfo::EgressNetwork { traffic_policy, .. } => {
routes.push(default_outbound_egress_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
traffic_policy,
));
}
}

outbound::proxy_protocol::Kind::Http1(outbound::proxy_protocol::Http1 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http2_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
let mut routes = routes
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
parent_info,
original_dst,
)
})
.collect::<Vec<_>>();

match parent_info {
ParentInfo::Service { .. } => {
if routes.is_empty() {
routes.push(default_outbound_service_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
));
}
}
ParentInfo::EgressNetwork { traffic_policy, .. } => {
routes.push(default_outbound_egress_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
traffic_policy,
));
}
}

outbound::proxy_protocol::Kind::Http2(outbound::proxy_protocol::Http2 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
})
}

#[allow(clippy::too_many_arguments)]
fn convert_outbound_route(
gknn: GroupKindNamespaceName,
Expand Down
Loading

0 comments on commit c519ced

Please sign in to comment.