Skip to content

Commit b57bd96

Browse files
committed
fix(http2): fix internals of HTTP/2 CONNECT upgrades
This refactors the way hyper handles HTTP/2 CONNECT / Extended CONNECT. Before, an uninhabited enum was used to try to prevent sending of the `Buf` type once the STREAM had been upgraded. However, the way it was originally written was incorrect, and will eventually have compilation issues. The change here is to spawn an extra task and use a channel to bridge the IO operations of the `Upgraded` object to be `Cursor` buffers in the new task. ref: rust-lang/rust#147588
1 parent f9f8f44 commit b57bd96

File tree

7 files changed

+382
-288
lines changed

7 files changed

+382
-288
lines changed

src/client/conn/http2.rs

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -640,44 +640,6 @@ mod tests {
640640
}
641641
}
642642

643-
#[tokio::test]
644-
#[ignore] // only compilation is checked
645-
async fn not_send_not_sync_executor_of_send_futures() {
646-
#[derive(Clone)]
647-
struct TokioExecutor {
648-
// !Send, !Sync
649-
_x: std::marker::PhantomData<std::rc::Rc<()>>,
650-
}
651-
652-
impl<F> crate::rt::Executor<F> for TokioExecutor
653-
where
654-
F: std::future::Future + 'static + Send,
655-
F::Output: Send + 'static,
656-
{
657-
fn execute(&self, fut: F) {
658-
tokio::task::spawn(fut);
659-
}
660-
}
661-
662-
#[allow(unused)]
663-
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
664-
let (_sender, conn) =
665-
crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
666-
TokioExecutor {
667-
_x: Default::default(),
668-
},
669-
io,
670-
)
671-
.await
672-
.unwrap();
673-
674-
tokio::task::spawn_local(async move {
675-
// can't use spawn here because when executor is !Send
676-
conn.await.unwrap();
677-
});
678-
}
679-
}
680-
681643
#[tokio::test]
682644
#[ignore] // only compilation is checked
683645
async fn send_not_sync_executor_of_send_futures() {

src/client/dispatch.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,22 +325,23 @@ impl<T> TrySendError<T> {
325325

326326
#[cfg(feature = "http2")]
327327
pin_project! {
328-
pub struct SendWhen<B>
328+
pub struct SendWhen<B, E>
329329
where
330330
B: Body,
331331
B: 'static,
332332
{
333333
#[pin]
334-
pub(crate) when: ResponseFutMap<B>,
334+
pub(crate) when: ResponseFutMap<B, E>,
335335
#[pin]
336336
pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
337337
}
338338
}
339339

340340
#[cfg(feature = "http2")]
341-
impl<B> Future for SendWhen<B>
341+
impl<B, E> Future for SendWhen<B, E>
342342
where
343343
B: Body + 'static,
344+
E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
344345
{
345346
type Output = ();
346347

src/proto/h2/client.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,16 @@ use http::{Method, StatusCode};
1818
use pin_project_lite::pin_project;
1919

2020
use super::ping::{Ponger, Recorder};
21-
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
21+
use super::{ping, PipeToSendStream, SendBuf};
2222
use crate::body::{Body, Incoming as IncomingBody};
2323
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
2424
use crate::common::either::Either;
2525
use crate::common::io::Compat;
2626
use crate::common::time::Time;
2727
use crate::ext::Protocol;
2828
use crate::headers;
29-
use crate::proto::h2::UpgradedSendStream;
3029
use crate::proto::Dispatched;
31-
use crate::rt::bounds::Http2ClientConnExec;
30+
use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
3231
use crate::upgrade::Upgraded;
3332
use crate::{Request, Response};
3433
use h2::client::ResponseFuture;
@@ -151,7 +150,7 @@ where
151150
T: Read + Write + Unpin,
152151
B: Body + 'static,
153152
B::Data: Send + 'static,
154-
E: Http2ClientConnExec<B, T> + Unpin,
153+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
155154
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
156155
{
157156
let (h2_tx, mut conn) = new_builder(config)
@@ -357,7 +356,7 @@ where
357356

358357
pin_project! {
359358
#[project = H2ClientFutureProject]
360-
pub enum H2ClientFuture<B, T>
359+
pub enum H2ClientFuture<B, T, E>
361360
where
362361
B: http_body::Body,
363362
B: 'static,
@@ -372,7 +371,7 @@ pin_project! {
372371
},
373372
Send {
374373
#[pin]
375-
send_when: SendWhen<B>,
374+
send_when: SendWhen<B, E>,
376375
},
377376
Task {
378377
#[pin]
@@ -381,11 +380,12 @@ pin_project! {
381380
}
382381
}
383382

384-
impl<B, T> Future for H2ClientFuture<B, T>
383+
impl<B, T, E> Future for H2ClientFuture<B, T, E>
385384
where
386385
B: http_body::Body + 'static,
387386
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
388387
T: Read + Write + Unpin,
388+
E: Http2UpgradedExec<B::Data>,
389389
{
390390
type Output = ();
391391

@@ -484,7 +484,7 @@ impl<B, E, T> ClientTask<B, E, T>
484484
where
485485
B: Body + 'static + Unpin,
486486
B::Data: Send,
487-
E: Http2ClientConnExec<B, T> + Unpin,
487+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
488488
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489489
T: Read + Write + Unpin,
490490
{
@@ -529,6 +529,7 @@ where
529529
fut: f.fut,
530530
ping: Some(ping),
531531
send_stream: Some(send_stream),
532+
exec: self.executor.clone(),
532533
},
533534
call_back: Some(f.cb),
534535
},
@@ -537,28 +538,29 @@ where
537538
}
538539

539540
pin_project! {
540-
pub(crate) struct ResponseFutMap<B>
541+
pub(crate) struct ResponseFutMap<B, E>
541542
where
542543
B: Body,
543544
B: 'static,
544545
{
545546
#[pin]
546547
fut: ResponseFuture,
547-
#[pin]
548548
ping: Option<Recorder>,
549549
#[pin]
550550
send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
551+
exec: E,
551552
}
552553
}
553554

554-
impl<B> Future for ResponseFutMap<B>
555+
impl<B, E> Future for ResponseFutMap<B, E>
555556
where
556557
B: Body + 'static,
558+
E: Http2UpgradedExec<B::Data>,
557559
{
558560
type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
559561

560-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561-
let mut this = self.project();
562+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563+
let mut this = self.as_mut().project();
562564

563565
let result = ready!(this.fut.poll(cx));
564566

@@ -585,13 +587,10 @@ where
585587
let mut res = Response::from_parts(parts, IncomingBody::empty());
586588

587589
let (pending, on_upgrade) = crate::upgrade::pending();
588-
let io = H2Upgraded {
589-
ping,
590-
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
591-
recv_stream,
592-
buf: Bytes::new(),
593-
};
594-
let upgraded = Upgraded::new(io, Bytes::new());
590+
591+
let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
592+
self.exec.execute_upgrade(up_task);
593+
let upgraded = Upgraded::new(h2_up, Bytes::new());
595594

596595
pending.fulfill(upgraded);
597596
res.extensions_mut().insert(on_upgrade);
@@ -620,7 +619,7 @@ where
620619
B: Body + 'static + Unpin,
621620
B::Data: Send,
622621
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
623-
E: Http2ClientConnExec<B, T> + Unpin,
622+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
624623
T: Read + Write + Unpin,
625624
{
626625
type Output = crate::Result<Dispatched>;

0 commit comments

Comments
 (0)