Skip to content

Commit 778d49d

Browse files
committed
pools wip
1 parent 6c3f61d commit 778d49d

File tree

7 files changed

+391
-28
lines changed

7 files changed

+391
-28
lines changed

Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4343
http-body-util = "0.1.0"
4444
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4545
tokio-test = "0.4"
46+
tower = { version = "0.5", features = ["util"] }
4647
tower-test = "0.4"
4748
pretty_env_logger = "0.5"
4849

@@ -77,7 +78,7 @@ full = [
7778

7879
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7980
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
80-
client-pool = ["dep:futures-util", "dep:tower-layer"]
81+
client-pool = ["tokio/sync", "dep:futures-util", "dep:tower-layer"]
8182
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
8283
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
8384

@@ -99,6 +100,10 @@ __internal_happy_eyeballs_tests = []
99100

100101
[[example]]
101102
name = "client"
103+
required-features = ["client-legacy", "client-pool", "http1", "tokio"]
104+
105+
[[example]]
106+
name = "client_legacy"
102107
required-features = ["client-legacy", "http1", "tokio"]
103108

104109
[[example]]

examples/client.rs

Lines changed: 146 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,157 @@
1-
use std::env;
1+
use tower::ServiceExt;
2+
use tower_service::Service;
23

3-
use http_body_util::Empty;
4-
use hyper::Request;
5-
use hyper_util::client::legacy::{connect::HttpConnector, Client};
4+
use hyper_util::client::pool;
65

76
#[tokio::main(flavor = "current_thread")]
8-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9-
let url = match env::args().nth(1) {
10-
Some(url) => url,
11-
None => {
12-
eprintln!("Usage: client <url>");
13-
return Ok(());
14-
}
15-
};
16-
17-
// HTTPS requires picking a TLS implementation, so give a better
18-
// warning if the user tries to request an 'https' URL.
19-
let url = url.parse::<hyper::Uri>()?;
20-
if url.scheme_str() != Some("http") {
21-
eprintln!("This example only works with 'http' URLs.");
22-
return Ok(());
7+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8+
send_nego().await
9+
}
10+
11+
async fn send_h1() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
13+
14+
let http1 = tcp.and_then(|conn| {
15+
Box::pin(async move {
16+
let (mut tx, c) = hyper::client::conn::http1::handshake::<
17+
_,
18+
http_body_util::Empty<hyper::body::Bytes>,
19+
>(conn)
20+
.await?;
21+
tokio::spawn(async move {
22+
if let Err(e) = c.await {
23+
eprintln!("connection error: {:?}", e);
24+
}
25+
});
26+
let svc = tower::service_fn(move |req| tx.send_request(req));
27+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
28+
})
29+
});
30+
31+
let mut p = pool::Cache::new(http1).build();
32+
33+
let mut c = p.call(http::Uri::from_static("http://hyper.rs")).await?;
34+
eprintln!("{:?}", c);
35+
36+
let req = http::Request::builder()
37+
.header("host", "hyper.rs")
38+
.body(http_body_util::Empty::new())
39+
.unwrap();
40+
41+
c.ready().await?;
42+
let resp = c.call(req).await?;
43+
eprintln!("{:?}", resp);
44+
45+
Ok(())
46+
}
47+
48+
async fn send_h2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
49+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
50+
51+
let http2 = tcp.and_then(|conn| {
52+
Box::pin(async move {
53+
let (mut tx, c) = hyper::client::conn::http2::handshake::<
54+
_,
55+
_,
56+
http_body_util::Empty<hyper::body::Bytes>,
57+
>(hyper_util::rt::TokioExecutor::new(), conn)
58+
.await?;
59+
println!("connected");
60+
tokio::spawn(async move {
61+
if let Err(e) = c.await {
62+
eprintln!("connection error: {:?}", e);
63+
}
64+
});
65+
let svc = tower::service_fn(move |req| tx.send_request(req));
66+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
67+
})
68+
});
69+
70+
let mut p = pool::Singleton::new(http2);
71+
72+
for _ in 0..5 {
73+
let mut c = p
74+
.call(http::Uri::from_static("http://localhost:3000"))
75+
.await?;
76+
eprintln!("{:?}", c);
77+
78+
let req = http::Request::builder()
79+
.header("host", "hyper.rs")
80+
.body(http_body_util::Empty::new())
81+
.unwrap();
82+
83+
c.ready().await?;
84+
let resp = c.call(req).await?;
85+
eprintln!("{:?}", resp);
2386
}
2487

25-
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());
88+
Ok(())
89+
}
2690

27-
let req = Request::builder()
28-
.uri(url)
29-
.body(Empty::<bytes::Bytes>::new())?;
91+
async fn send_nego() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
92+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
3093

31-
let resp = client.request(req).await?;
94+
let http1 = tower::layer::layer_fn(|tcp| {
95+
tower::service_fn(move |dst| {
96+
let inner = tcp.call(dst);
97+
async move {
98+
let conn = inner.await?;
99+
let (mut tx, c) = hyper::client::conn::http1::handshake::<
100+
_,
101+
http_body_util::Empty<hyper::body::Bytes>,
102+
>(conn)
103+
.await?;
104+
tokio::spawn(async move {
105+
if let Err(e) = c.await {
106+
eprintln!("connection error: {:?}", e);
107+
}
108+
});
109+
let svc = tower::service_fn(move |req| tx.send_request(req));
110+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
111+
}
112+
})
113+
});
32114

33-
eprintln!("{:?} {:?}", resp.version(), resp.status());
34-
eprintln!("{:#?}", resp.headers());
115+
let http2 = tower::layer::layer_fn(|tcp| {
116+
tower::service_fn(move |dst| {
117+
let inner = tcp.call(dst);
118+
async move {
119+
let conn = inner.await?;
120+
let (mut tx, c) = hyper::client::conn::http2::handshake::<
121+
_,
122+
_,
123+
http_body_util::Empty<hyper::body::Bytes>,
124+
>(hyper_util::rt::TokioExecutor::new(), conn)
125+
.await?;
126+
println!("connected");
127+
tokio::spawn(async move {
128+
if let Err(e) = c.await {
129+
eprintln!("connection error: {:?}", e);
130+
}
131+
});
132+
let svc = tower::service_fn(move |req| tx.send_request(req));
133+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
134+
}
135+
})
136+
});
137+
138+
let mut svc = pool::negotiate(tcp, |_| false, http1, http2);
139+
140+
for _ in 0..5 {
141+
let mut c = svc
142+
.call(http::Uri::from_static("http://localhost:3000"))
143+
.await?;
144+
eprintln!("{:?}", c);
145+
146+
let req = http::Request::builder()
147+
.header("host", "hyper.rs")
148+
.body(http_body_util::Empty::new())
149+
.unwrap();
150+
151+
c.ready().await?;
152+
let resp = c.call(req).await?;
153+
eprintln!("{:?}", resp);
154+
}
35155

36156
Ok(())
37157
}

examples/client_legacy.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::env;
2+
3+
use http_body_util::Empty;
4+
use hyper::Request;
5+
use hyper_util::client::legacy::{connect::HttpConnector, Client};
6+
7+
#[tokio::main(flavor = "current_thread")]
8+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9+
let url = match env::args().nth(1) {
10+
Some(url) => url,
11+
None => {
12+
eprintln!("Usage: client <url>");
13+
return Ok(());
14+
}
15+
};
16+
17+
// HTTPS requires picking a TLS implementation, so give a better
18+
// warning if the user tries to request an 'https' URL.
19+
let url = url.parse::<hyper::Uri>()?;
20+
if url.scheme_str() != Some("http") {
21+
eprintln!("This example only works with 'http' URLs.");
22+
return Ok(());
23+
}
24+
25+
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());
26+
27+
let req = Request::builder()
28+
.uri(url)
29+
.body(Empty::<bytes::Bytes>::new())?;
30+
31+
let resp = client.request(req).await?;
32+
33+
eprintln!("{:?} {:?}", resp.version(), resp.status());
34+
eprintln!("{:#?}", resp.headers());
35+
36+
Ok(())
37+
}

src/client/pool/expire.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub struct Expire {
2+
3+
}

0 commit comments

Comments
 (0)