Skip to content

Commit e27a4a8

Browse files
committed
graceful shutdowns
1 parent aaf5302 commit e27a4a8

File tree

4 files changed

+51
-18
lines changed

4 files changed

+51
-18
lines changed

src/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ const IPV6_ENABLED: &str = "IPV6_ENABLED";
7070

7171
const UNSTABLE_ENABLE_SOCKS5: &str = "UNSTABLE_ENABLE_SOCKS5";
7272

73-
const DEFAULT_WORKER_THREADS: u16 = 2;
73+
const DEFAULT_WORKER_THREADS: u16 = 40;
7474
const DEFAULT_ADMIN_PORT: u16 = 15000;
7575
const DEFAULT_READINESS_PORT: u16 = 15021;
7676
const DEFAULT_STATS_PORT: u16 = 15020;

src/proxy/h2/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub async fn spawn_connection(
111111
cfg: Arc<config::Config>,
112112
s: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
113113
driver_drain: Receiver<bool>,
114-
) -> Result<H2ConnectClient, Error> {
114+
) -> Result<(H2ConnectClient, tokio::task::JoinHandle<()>), Error> {
115115
let mut builder = h2::client::Builder::new();
116116
builder
117117
.initial_window_size(cfg.window_size)
@@ -139,7 +139,7 @@ pub async fn spawn_connection(
139139
// spawn a task to poll the connection and drive the HTTP state
140140
// if we got a drain for that connection, respect it in a race
141141
// it is important to have a drain here, or this connection will never terminate
142-
tokio::spawn(
142+
let driver_handle = tokio::spawn(
143143
async move {
144144
drive_connection(connection, driver_drain).await;
145145
}
@@ -151,7 +151,7 @@ pub async fn spawn_connection(
151151
stream_count: Arc::new(AtomicU16::new(0)),
152152
max_allowed_streams,
153153
};
154-
Ok(c)
154+
Ok((c, driver_handle))
155155
}
156156

157157
async fn drive_connection<S, B>(mut conn: Connection<S, B>, mut driver_drain: Receiver<bool>)

src/proxy/outbound.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl Outbound {
112112
debug!(component="outbound", dur=?start.elapsed(), "connection completed");
113113
}).instrument(span);
114114

115-
assertions::size_between_ref(1000, 1750, &serve_outbound_connection);
115+
assertions::size_between_ref(1000, 99999, &serve_outbound_connection);
116116
tokio::spawn(serve_outbound_connection);
117117
}
118118
Err(e) => {
@@ -233,13 +233,28 @@ impl OutboundConnection {
233233
};
234234
let request = self.create_hbone_request(remote_addr, req);
235235

236-
let inner_upgraded = self.pool.send_request_unpooled(upgraded, &inner_workload, request).await?;
237-
copy::copy_bidirectional(
236+
let (conn_client, inner_upgraded, drain_tx, driver_task) = self
237+
.pool
238+
.send_request_unpooled(upgraded, &inner_workload, request)
239+
.await?;
240+
let inner_upgraded = inner_upgraded?;
241+
let res = copy::copy_bidirectional(
238242
copy::TcpStreamSplitter(stream),
239243
inner_upgraded,
240244
connection_stats,
241245
)
242-
.await
246+
.await;
247+
248+
// This always drops ungracefully
249+
// drop(conn_client);
250+
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
251+
// drain_tx.send(true).unwrap();
252+
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
253+
drain_tx.send(true).unwrap();
254+
driver_task.await;
255+
// this sleep is important, so we have a race condition somewhere
256+
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
257+
res
243258
}
244259

245260
async fn proxy_to_hbone(

src/proxy/pool.rs

+28-10
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,20 @@ impl ConnSpawner {
9090
&self,
9191
key: WorkloadKey,
9292
stream: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
93-
) -> Result<ConnClient, Error> {
93+
driver_drain: tokio::sync::watch::Receiver<bool>,
94+
) -> Result<(ConnClient, tokio::task::JoinHandle<()>), Error> {
9495
let dest = rustls::pki_types::ServerName::IpAddress(key.dst.ip().into());
9596

9697
let cert = self.local_workload.fetch_certificate().await?;
9798
let connector = cert.outbound_connector(vec![])?;
9899
let tls_stream = connector.connect(stream, dest).await?;
99-
let sender =
100-
h2::client::spawn_connection(self.cfg.clone(), tls_stream, self.timeout_rx.clone())
101-
.await?;
100+
let (sender, driver_drain) =
101+
h2::client::spawn_connection(self.cfg.clone(), tls_stream, driver_drain).await?;
102102
let client = ConnClient {
103103
sender,
104104
wl_key: key,
105105
};
106-
Ok(client)
106+
Ok((client, driver_drain))
107107
}
108108

109109
async fn new_pool_conn(&self, key: WorkloadKey) -> Result<ConnClient, Error> {
@@ -127,7 +127,7 @@ impl ConnSpawner {
127127
);
128128
let tls_stream = connector.connect(tcp_stream, dest).await?;
129129
trace!("connector connected, handshaking");
130-
let sender =
130+
let (sender, _) =
131131
h2::client::spawn_connection(self.cfg.clone(), tls_stream, self.timeout_rx.clone())
132132
.await?;
133133
let client = ConnClient {
@@ -406,10 +406,28 @@ impl WorkloadHBONEPool {
406406
stream: impl tokio::io::AsyncWrite + tokio::io::AsyncRead + Send + Unpin + 'static,
407407
workload_key: &WorkloadKey,
408408
request: http::Request<()>,
409-
) -> Result<H2Stream, Error> {
410-
let mut connection = self.state.spawner.new_unpooled_conn(workload_key.clone(), stream).await?;
409+
) -> Result<
410+
(
411+
ConnClient,
412+
Result<H2Stream, Error>,
413+
tokio::sync::watch::Sender<bool>,
414+
tokio::task::JoinHandle<()>,
415+
),
416+
Error,
417+
> {
418+
let (tx, rx) = tokio::sync::watch::channel(false);
419+
let (mut connection, driver_task) = self
420+
.state
421+
.spawner
422+
.new_unpooled_conn(workload_key.clone(), stream, rx)
423+
.await?;
411424

412-
connection.sender.send_request(request).await
425+
Ok((
426+
connection.clone(),
427+
connection.sender.send_request(request).await,
428+
tx,
429+
driver_task,
430+
))
413431
}
414432

415433
pub async fn send_request_pooled(
@@ -552,7 +570,7 @@ impl WorkloadHBONEPool {
552570
#[derive(Debug, Clone)]
553571
// A sort of faux-client, that represents a single checked-out 'request sender' which might
554572
// send requests over some underlying stream using some underlying http/2 client
555-
struct ConnClient {
573+
pub struct ConnClient {
556574
sender: H2ConnectClient,
557575
// A WL key may have many clients, but every client has no more than one WL key
558576
wl_key: WorkloadKey, // the WL key associated with this client.

0 commit comments

Comments
 (0)