Skip to content

Commit 99c075e

Browse files
committed
feat(client): add Layer types for client::conn
1 parent b9dc3d2 commit 99c075e

File tree

2 files changed

+264
-0
lines changed

2 files changed

+264
-0
lines changed

src/client/conn.rs

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
//! todo
2+
3+
use std::future::Future;
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use http::{Request, Response};
9+
use tower_service::Service;
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
12+
13+
/// todo
14+
pub struct Http1Layer<B> {
15+
builder: hyper::client::conn::http1::Builder,
16+
_body: PhantomData<fn(B)>,
17+
}
18+
19+
/// todo
20+
pub fn http1<B>() -> Http1Layer<B> {
21+
Http1Layer {
22+
builder: hyper::client::conn::http1::Builder::new(),
23+
_body: PhantomData,
24+
}
25+
}
26+
27+
impl<M, B> tower_layer::Layer<M> for Http1Layer<B> {
28+
type Service = Http1Connect<M, B>;
29+
fn layer(&self, inner: M) -> Self::Service {
30+
Http1Connect {
31+
inner,
32+
builder: self.builder.clone(),
33+
_body: self._body,
34+
}
35+
}
36+
}
37+
38+
impl<B> Clone for Http1Layer<B> {
39+
fn clone(&self) -> Self {
40+
Self {
41+
builder: self.builder.clone(),
42+
_body: self._body.clone(),
43+
}
44+
}
45+
}
46+
47+
impl<B> From<hyper::client::conn::http1::Builder> for Http1Layer<B> {
48+
fn from(builder: hyper::client::conn::http1::Builder) -> Self {
49+
Self {
50+
builder,
51+
_body: PhantomData,
52+
}
53+
}
54+
}
55+
56+
/// todo
57+
pub struct Http1Connect<M, B> {
58+
inner: M,
59+
builder: hyper::client::conn::http1::Builder,
60+
_body: PhantomData<fn(B)>,
61+
}
62+
63+
impl<M, Dst, B> Service<Dst> for Http1Connect<M, B>
64+
where
65+
M: Service<Dst>,
66+
M::Future: Send + 'static,
67+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
68+
M::Error: Into<BoxError>,
69+
B: hyper::body::Body + Send + 'static,
70+
B::Data: Send + 'static,
71+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
72+
{
73+
type Response = Http1ClientService<B>;
74+
type Error = BoxError;
75+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
76+
77+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78+
// Minimal, explicit contract: delegate readiness to the response future.
79+
// If you want strict backpressure, use the "permit" pattern (see notes).
80+
Poll::Ready(Ok(()))
81+
}
82+
83+
fn call(&mut self, dst: Dst) -> Self::Future {
84+
let fut = self.inner.call(dst);
85+
let builder = self.builder.clone();
86+
Box::pin(async move {
87+
let io = fut.await.map_err(Into::into)?;
88+
let (tx, conn) = builder.handshake(io).await?;
89+
tokio::spawn(async move {
90+
if let Err(e) = conn.await {
91+
eprintln!("connection error: {:?}", e);
92+
}
93+
});
94+
Ok(Http1ClientService::new(tx))
95+
})
96+
}
97+
}
98+
99+
impl<M: Clone, B> Clone for Http1Connect<M, B> {
100+
fn clone(&self) -> Self {
101+
Self {
102+
inner: self.inner.clone(),
103+
builder: self.builder.clone(),
104+
_body: self._body.clone(),
105+
}
106+
}
107+
}
108+
109+
/// todo
110+
pub struct Http2Layer<B> {
111+
_body: PhantomData<fn(B)>,
112+
}
113+
114+
/// todo
115+
pub fn http2<B>() -> Http2Layer<B> {
116+
Http2Layer { _body: PhantomData }
117+
}
118+
119+
impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
120+
type Service = Http2Connect<M, B>;
121+
fn layer(&self, inner: M) -> Self::Service {
122+
Http2Connect {
123+
inner,
124+
builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
125+
_body: self._body,
126+
}
127+
}
128+
}
129+
130+
impl<B> Clone for Http2Layer<B> {
131+
fn clone(&self) -> Self {
132+
Self {
133+
_body: self._body.clone(),
134+
}
135+
}
136+
}
137+
138+
/// todo
139+
#[derive(Debug)]
140+
pub struct Http2Connect<M, B> {
141+
inner: M,
142+
builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
143+
_body: PhantomData<fn(B)>,
144+
}
145+
146+
impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
147+
where
148+
M: Service<Dst>,
149+
M::Future: Send + 'static,
150+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
151+
M::Error: Into<BoxError>,
152+
B: hyper::body::Body + Unpin + Send + 'static,
153+
B::Data: Send + 'static,
154+
B::Error: Into<BoxError>,
155+
{
156+
type Response = Http2ClientService<B>;
157+
type Error = BoxError;
158+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
159+
160+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161+
// Minimal, explicit contract: delegate readiness to the response future.
162+
// If you want strict backpressure, use the "permit" pattern (see notes).
163+
Poll::Ready(Ok(()))
164+
}
165+
166+
fn call(&mut self, dst: Dst) -> Self::Future {
167+
let fut = self.inner.call(dst);
168+
let builder = self.builder.clone();
169+
Box::pin(async move {
170+
let io = fut.await.map_err(Into::into)?;
171+
let (tx, conn) = builder.handshake(io).await?;
172+
tokio::spawn(async move {
173+
if let Err(e) = conn.await {
174+
eprintln!("connection error: {:?}", e);
175+
}
176+
});
177+
Ok(Http2ClientService::new(tx))
178+
})
179+
}
180+
}
181+
182+
impl<M: Clone, B> Clone for Http2Connect<M, B> {
183+
fn clone(&self) -> Self {
184+
Self {
185+
inner: self.inner.clone(),
186+
builder: self.builder.clone(),
187+
_body: self._body.clone(),
188+
}
189+
}
190+
}
191+
192+
/// A thin adapter over hyper HTTP/1 client SendRequest.
193+
#[derive(Debug)]
194+
pub struct Http1ClientService<B> {
195+
tx: hyper::client::conn::http1::SendRequest<B>,
196+
}
197+
198+
impl<B> Http1ClientService<B> {
199+
/// todo
200+
pub fn new(tx: hyper::client::conn::http1::SendRequest<B>) -> Self {
201+
Self { tx }
202+
}
203+
}
204+
205+
impl<B> Service<Request<B>> for Http1ClientService<B>
206+
where
207+
B: hyper::body::Body + Send + 'static,
208+
{
209+
type Response = Response<hyper::body::Incoming>;
210+
type Error = hyper::Error;
211+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
212+
213+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214+
// Minimal, explicit contract: delegate readiness to the response future.
215+
// If you want strict backpressure, use the "permit" pattern (see notes).
216+
Poll::Ready(Ok(()))
217+
}
218+
219+
fn call(&mut self, req: Request<B>) -> Self::Future {
220+
let fut = self.tx.send_request(req);
221+
Box::pin(async move { Ok(fut.await?) })
222+
}
223+
}
224+
225+
/// todo
226+
#[derive(Debug)]
227+
pub struct Http2ClientService<B> {
228+
tx: hyper::client::conn::http2::SendRequest<B>,
229+
}
230+
231+
impl<B> Http2ClientService<B> {
232+
/// todo
233+
pub fn new(tx: hyper::client::conn::http2::SendRequest<B>) -> Self {
234+
Self { tx }
235+
}
236+
}
237+
238+
impl<B> Service<Request<B>> for Http2ClientService<B>
239+
where
240+
B: hyper::body::Body + Send + 'static,
241+
{
242+
type Response = Response<hyper::body::Incoming>;
243+
type Error = hyper::Error;
244+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
245+
246+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
247+
Poll::Ready(Ok(()))
248+
}
249+
250+
fn call(&mut self, req: Request<B>) -> Self::Future {
251+
let fut = self.tx.send_request(req);
252+
Box::pin(async move { Ok(fut.await?) })
253+
}
254+
}
255+
256+
impl<B> Clone for Http2ClientService<B> {
257+
fn clone(&self) -> Self {
258+
Self {
259+
tx: self.tx.clone(),
260+
}
261+
}
262+
}

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! HTTP client utilities
22
3+
pub mod conn;
4+
35
/// Legacy implementations of `connect` module and `Client`
46
#[cfg(feature = "client-legacy")]
57
pub mod legacy;

0 commit comments

Comments
 (0)