Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Feature/double hbone #1429

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
Stevenjin8 committed Mar 7, 2025

Verified

This commit was signed with the committer’s verified signature.
Stevenjin8 Steven Jin
commit b2a21bd2150cb607770e9de5cf63cf1679716f1f
2 changes: 1 addition & 1 deletion src/proxy/h2/client.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;
use tracing::{debug, error, trace, warn, Instrument};
use tracing::{Instrument, debug, error, trace, warn};

#[derive(Debug, Clone)]
// H2ConnectClient is a wrapper abstracting h2
2 changes: 1 addition & 1 deletion src/proxy/metrics.rs
Original file line number Diff line number Diff line change
@@ -458,7 +458,7 @@ impl ConnectionResult {
src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned),

dst.addr = %dst.0,
dst.hbone_addr = hbone_target.as_ref(),
dst.hbone_addr = hbone_target.as_ref().map(display),
dst.service = tl.destination_service.to_value(),
dst.workload = dst.1.as_deref().map(to_value),
dst.namespace = tl.destination_workload_namespace.to_value(),
9 changes: 5 additions & 4 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
@@ -198,7 +198,6 @@ impl OutboundConnection {
metrics,
));


let res = match (
req.protocol,
req.actual_destination_workload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The presence of a network gateway does not imply we need to reach it through the network gateway. For example, a local workload in a cluster will have a network gateway defined.

The WDS protocol is not contextual. A workload w/ a network gateway does not mean "To reach this workload, always go through this network gateway [and the control plane will dynamically set or unset this for you]", it means "if you need to traverse a network boundary, here is how to do it".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally, I wonder if the code might be simplified to have a new Protocol::DoubleHbone (maybe we make a new enum like RequestProtocol though)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

@@ -229,7 +228,7 @@ impl OutboundConnection {
connection_stats: &ConnectionResult,
) -> Result<(), Error> {
// Create the outer HBONE stream
let upgraded = Box::pin(self.send_hbone_request(remote_addr, &req)).await?;
let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?;
// Wrap upgraded to implement tokio's Async{Write,Read}
let upgraded = TokioH2Stream::new(upgraded);

@@ -703,8 +702,10 @@ mod tests {
Some(ExpectedRequest {
protocol: r.protocol,
hbone_destination: &r
.hbone_target_destination.as_ref()
.unwrap_or(&String::new()),
.hbone_target_destination
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
destination: &r.actual_destination.to_string(),
})
);
57 changes: 31 additions & 26 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -665,7 +665,11 @@ impl DemandProxyState {
) -> Result<IpAddr, Error> {
let workload_uid = workload.uid.clone();
// FIXME(stevenjin8) Throw a error if this a network gateway without a hostname?
let hostname = match workload.network_gateway.as_ref().map(|g| g.destination.clone()) {
let hostname = match workload
.network_gateway
.as_ref()
.map(|g| g.destination.clone())
{
Some(Destination::Hostname(hostname)) => hostname.hostname.clone(),
_ => workload.hostname.clone(),
};
@@ -802,31 +806,32 @@ impl DemandProxyState {
};
let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref()));
let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families);
let (selected_workload_ip, proxy_protocol_port_override) =
if let Some(network_gateway) = wl.network_gateway.as_ref() {
match &network_gateway.destination {
Destination::Address(network_address) => (
network_address.address,
Some(network_gateway.hbone_mtls_port),
),
Destination::Hostname(_) => (
self.resolve_workload_address(&wl, source_workload, original_target_address)
.await?,
Some(network_gateway.hbone_mtls_port),
),
}
} else {
(
self.pick_workload_destination_or_resolve(
&wl,
source_workload,
original_target_address,
ip_family_restriction,
)
.await?,
None,
) // if we can't load balance just return the error
};
let (selected_workload_ip, proxy_protocol_port_override) = if let Some(network_gateway) =
wl.network_gateway.as_ref()
{
match &network_gateway.destination {
Destination::Address(network_address) => (
network_address.address,
Some(network_gateway.hbone_mtls_port),
),
Destination::Hostname(_) => (
self.resolve_workload_address(&wl, source_workload, original_target_address)
.await?,
Some(network_gateway.hbone_mtls_port),
),
}
} else {
(
self.pick_workload_destination_or_resolve(
&wl,
source_workload,
original_target_address,
ip_family_restriction,
)
.await?,
None,
) // if we can't load balance just return the error
};

let res = Upstream {
workload: wl,
7 changes: 6 additions & 1 deletion tests/namespaced.rs
Original file line number Diff line number Diff line change
@@ -402,7 +402,12 @@ mod namespaced {
.mutate_workload(|w| w.hostname = "waypoint.example.com".into())
.register()
.await?;
run_hbone_server(waypoint, "waypoint", tcp::Mode::ReadWrite, WAYPOINT_MESSAGE.into())?;
run_hbone_server(
waypoint,
"waypoint",
tcp::Mode::ReadWrite,
WAYPOINT_MESSAGE.into(),
)?;

manager
.workload_builder("server", DEFAULT_NODE)