Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: retry outbound connection establishment #1473

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
POC: retry outbound connection establishment
TBD:
* What telemetry do we produce when we retry?
* Should we retry all conditions? What about timeouts (retrying these
  increases total timeout time!). What about HBONE status codes?
* How do we pick the new host. Entirely random? Avoid the current host?
howardjohn committed Feb 27, 2025
commit 674ee10ef4ac63a7ac331b1bfb41e4363fd0f25c
10 changes: 5 additions & 5 deletions examples/localhost.yaml
Original file line number Diff line number Diff line change
@@ -14,20 +14,20 @@ workloads:
80: 8080
"default/example2.com":
80: 8080
# Define another local address, but this one uses TCP. This allows testing HBONE and TCP with one config.
- uid: cluster1//v1/Pod/default/local-tcp
name: local-tcp
- uid: cluster1//v1/Pod/default/local2
name: local2
namespace: default
serviceAccount: default
workloadIps: ["127.0.0.2"]
protocol: TCP
protocol: HBONE
node: local
network: ""
services:
"default/example.com":
80: 8080
80: 8081
"default/example2.com":
80: 8080

policies:
- action: Allow
rules:
6 changes: 3 additions & 3 deletions src/copy.rs
Original file line number Diff line number Diff line change
@@ -30,15 +30,15 @@ use tracing::trace;

// BufferedSplitter is a trait to expose splitting an IO object into a buffered reader and a writer
pub trait BufferedSplitter: Unpin {
type R: ResizeBufRead + Unpin;
type W: AsyncWriteBuf + Unpin;
type R: ResizeBufRead + Unpin + Send + Sync;
type W: AsyncWriteBuf + Unpin + Send + Sync;
fn split_into_buffered_reader(self) -> (Self::R, Self::W);
}

// Generic BufferedSplitter for anything that can Read/Write.
impl<I> BufferedSplitter for I
where
I: AsyncRead + AsyncWrite + Unpin,
I: AsyncRead + AsyncWrite + Unpin + Send + Sync,
{
type R = BufReader<io::ReadHalf<I>>;
type W = WriteAdapter<io::WriteHalf<I>>;
147 changes: 81 additions & 66 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ use crate::proxy::metrics::Reporter;
use crate::proxy::{BAGGAGE_HEADER, Error, ProxyInputs, TRACEPARENT_HEADER, TraceParent, util};
use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics, pool};

use crate::copy::BufferedSplitter;
use crate::drain::DrainWatcher;
use crate::drain::run_with_drain;
use crate::proxy::h2::H2Stream;
@@ -119,7 +120,7 @@ impl Outbound {
}
}
}
.in_current_span()
.in_current_span()
};

run_with_drain(
@@ -139,6 +140,11 @@ pub(super) struct OutboundConnection {
pub(super) hbone_port: u16,
}

enum Connection {
Tcp(TcpStream),
Hbone(H2Stream),
}

impl OutboundConnection {
async fn proxy(&mut self, source_stream: TcpStream) {
let source_addr =
@@ -161,60 +167,80 @@ impl OutboundConnection {
metrics::log_early_deny(source_addr, dest_addr, Reporter::source, Error::SelfCall);
return;
}
// First find the source workload of this traffic. If we don't know where the request is from
// we will reject it.
let build = self
.pi
.local_workload_information
.get_workload()
.and_then(|source| self.build_request(source, source_addr.ip(), dest_addr));
let req = match Box::pin(build).await {
Ok(req) => Box::new(req),
Err(err) => {
metrics::log_early_deny(source_addr, dest_addr, Reporter::source, err);
return;
}
};
// TODO: should we use the original address or the actual address? Both seems nice!
let _conn_guard = self.pi.connection_manager.track_outbound(
source_addr,
dest_addr,
req.actual_destination,
);
for attempt in 0..5 {
// First find the source workload of this traffic. If we don't know where the request is from
// we will reject it.
let build = self
.pi
.local_workload_information
.get_workload()
.and_then(|source| self.build_request(source, source_addr.ip(), dest_addr));
let req = match Box::pin(build).await {
Ok(req) => Box::new(req),
Err(err) => {
metrics::log_early_deny(source_addr, dest_addr, Reporter::source, err);
return;
}
};
// TODO: should we use the original address or the actual address? Both seems nice!
let _conn_guard = self.pi.connection_manager.track_outbound(
source_addr,
dest_addr,
req.actual_destination,
);

let metrics = self.pi.metrics.clone();
let hbone_target = req.hbone_target_destination;
let result_tracker = Box::new(ConnectionResult::new(
source_addr,
req.actual_destination,
hbone_target,
start,
Self::conn_metrics_from_request(&req),
metrics,
));
let metrics = self.pi.metrics.clone();
let hbone_target = req.hbone_target_destination;
let result_tracker = Box::new(ConnectionResult::new(
source_addr,
req.actual_destination,
hbone_target,
start,
Self::conn_metrics_from_request(&req),
metrics,
));

let res = match req.protocol {
Protocol::HBONE => {
self.proxy_to_hbone(source_stream, source_addr, &req, &result_tracker)
.await
}
Protocol::TCP => {
self.proxy_to_tcp(source_stream, &req, &result_tracker)
.await
let res = match req.protocol {
Protocol::HBONE => self.proxy_to_hbone(source_addr, &req).await,
Protocol::TCP => self.proxy_to_tcp(&req).await,
};

match res {
Err(e) => {
result_tracker.record(Err(e));
tracing::error!("howardjohn: RETRY");
}
Ok(upstream) => {
// Proxying data between downstream and upstream
let src = copy::TcpStreamSplitter(source_stream);
let res = match upstream {
Connection::Tcp(s) => {
copy::copy_bidirectional(
src,
copy::TcpStreamSplitter(s),
&result_tracker,
)
.await
}
Connection::Hbone(s) => {
copy::copy_bidirectional(src, s, &result_tracker).await
}
};
result_tracker.record(res);
return;
}
}
};
result_tracker.record(res)
}
}

async fn proxy_to_hbone(
&mut self,
stream: TcpStream,
remote_addr: SocketAddr,
req: &Request,
connection_stats: &ConnectionResult,
) -> Result<(), Error> {
let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?;
copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await
) -> Result<Connection, Error> {
self.send_hbone_request(remote_addr, req)
.await
.map(Connection::Hbone)
}

async fn send_hbone_request(
@@ -253,26 +279,15 @@ impl OutboundConnection {
Ok(upgraded)
}

async fn proxy_to_tcp(
&mut self,
stream: TcpStream,
req: &Request,
connection_stats: &ConnectionResult,
) -> Result<(), Error> {
let outbound = super::freebind_connect(
None, // No need to spoof source IP on outbound
req.actual_destination,
self.pi.socket_factory.as_ref(),
)
.await?;

// Proxying data between downstream and upstream
copy::copy_bidirectional(
copy::TcpStreamSplitter(stream),
copy::TcpStreamSplitter(outbound),
connection_stats,
)
.await
async fn proxy_to_tcp(&mut self, req: &Request) -> Result<Connection, Error> {
Ok(Connection::Tcp(
super::freebind_connect(
None, // No need to spoof source IP on outbound
req.actual_destination,
self.pi.socket_factory.as_ref(),
)
.await?,
))
}

fn conn_metrics_from_request(req: &Request) -> ConnectionOpen {