Skip to content

Commit 79beee3

Browse files
committed
fix(http2): fix internals of HTTP/2 CONNECT upgrades
This refactors the way hyper handles HTTP/2 CONNECT / Extended CONNECT. Before, an uninhabited enum was used to try to prevent sending of the `Buf` type once the STREAM had been upgraded. However, the way it was originally written was incorrect, and will eventually have compilation issues. The change here is to spawn an extra task and use a channel to bridge the IO operations of the `Upgraded` object to be `Cursor` buffers in the new task. ref: rust-lang/rust#147588
1 parent f9f8f44 commit 79beee3

File tree

6 files changed

+382
-250
lines changed

6 files changed

+382
-250
lines changed

src/client/dispatch.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,22 +325,23 @@ impl<T> TrySendError<T> {
325325

326326
#[cfg(feature = "http2")]
327327
pin_project! {
328-
pub struct SendWhen<B>
328+
pub struct SendWhen<B, E>
329329
where
330330
B: Body,
331331
B: 'static,
332332
{
333333
#[pin]
334-
pub(crate) when: ResponseFutMap<B>,
334+
pub(crate) when: ResponseFutMap<B, E>,
335335
#[pin]
336336
pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
337337
}
338338
}
339339

340340
#[cfg(feature = "http2")]
341-
impl<B> Future for SendWhen<B>
341+
impl<B, E> Future for SendWhen<B, E>
342342
where
343343
B: Body + 'static,
344+
E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
344345
{
345346
type Output = ();
346347

src/proto/h2/client.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,16 @@ use http::{Method, StatusCode};
1818
use pin_project_lite::pin_project;
1919

2020
use super::ping::{Ponger, Recorder};
21-
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
21+
use super::{ping, PipeToSendStream, SendBuf};
2222
use crate::body::{Body, Incoming as IncomingBody};
2323
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
2424
use crate::common::either::Either;
2525
use crate::common::io::Compat;
2626
use crate::common::time::Time;
2727
use crate::ext::Protocol;
2828
use crate::headers;
29-
use crate::proto::h2::UpgradedSendStream;
3029
use crate::proto::Dispatched;
31-
use crate::rt::bounds::Http2ClientConnExec;
30+
use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
3231
use crate::upgrade::Upgraded;
3332
use crate::{Request, Response};
3433
use h2::client::ResponseFuture;
@@ -151,7 +150,7 @@ where
151150
T: Read + Write + Unpin,
152151
B: Body + 'static,
153152
B::Data: Send + 'static,
154-
E: Http2ClientConnExec<B, T> + Unpin,
153+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
155154
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
156155
{
157156
let (h2_tx, mut conn) = new_builder(config)
@@ -357,7 +356,7 @@ where
357356

358357
pin_project! {
359358
#[project = H2ClientFutureProject]
360-
pub enum H2ClientFuture<B, T>
359+
pub enum H2ClientFuture<B, T, E>
361360
where
362361
B: http_body::Body,
363362
B: 'static,
@@ -372,7 +371,7 @@ pin_project! {
372371
},
373372
Send {
374373
#[pin]
375-
send_when: SendWhen<B>,
374+
send_when: SendWhen<B, E>,
376375
},
377376
Task {
378377
#[pin]
@@ -381,11 +380,12 @@ pin_project! {
381380
}
382381
}
383382

384-
impl<B, T> Future for H2ClientFuture<B, T>
383+
impl<B, T, E> Future for H2ClientFuture<B, T, E>
385384
where
386385
B: http_body::Body + 'static,
387386
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
388387
T: Read + Write + Unpin,
388+
E: Http2UpgradedExec<B::Data>,
389389
{
390390
type Output = ();
391391

@@ -484,7 +484,7 @@ impl<B, E, T> ClientTask<B, E, T>
484484
where
485485
B: Body + 'static + Unpin,
486486
B::Data: Send,
487-
E: Http2ClientConnExec<B, T> + Unpin,
487+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
488488
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489489
T: Read + Write + Unpin,
490490
{
@@ -529,6 +529,7 @@ where
529529
fut: f.fut,
530530
ping: Some(ping),
531531
send_stream: Some(send_stream),
532+
exec: self.executor.clone(),
532533
},
533534
call_back: Some(f.cb),
534535
},
@@ -537,28 +538,29 @@ where
537538
}
538539

539540
pin_project! {
540-
pub(crate) struct ResponseFutMap<B>
541+
pub(crate) struct ResponseFutMap<B, E>
541542
where
542543
B: Body,
543544
B: 'static,
544545
{
545546
#[pin]
546547
fut: ResponseFuture,
547-
#[pin]
548548
ping: Option<Recorder>,
549549
#[pin]
550550
send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
551+
exec: E,
551552
}
552553
}
553554

554-
impl<B> Future for ResponseFutMap<B>
555+
impl<B, E> Future for ResponseFutMap<B, E>
555556
where
556557
B: Body + 'static,
558+
E: Http2UpgradedExec<B::Data>,
557559
{
558560
type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
559561

560-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561-
let mut this = self.project();
562+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563+
let mut this = self.as_mut().project();
562564

563565
let result = ready!(this.fut.poll(cx));
564566

@@ -585,13 +587,10 @@ where
585587
let mut res = Response::from_parts(parts, IncomingBody::empty());
586588

587589
let (pending, on_upgrade) = crate::upgrade::pending();
588-
let io = H2Upgraded {
589-
ping,
590-
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
591-
recv_stream,
592-
buf: Bytes::new(),
593-
};
594-
let upgraded = Upgraded::new(io, Bytes::new());
590+
591+
let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
592+
self.exec.execute_upgrade(up_task);
593+
let upgraded = Upgraded::new(h2_up, Bytes::new());
595594

596595
pending.fulfill(upgraded);
597596
res.extensions_mut().insert(on_upgrade);
@@ -620,7 +619,7 @@ where
620619
B: Body + 'static + Unpin,
621620
B::Data: Send,
622621
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
623-
E: Http2ClientConnExec<B, T> + Unpin,
622+
E: Http2ClientConnExec<B, T> + Clone + Unpin,
624623
T: Read + Write + Unpin,
625624
{
626625
type Output = crate::Result<Dispatched>;

src/proto/h2/mod.rs

Lines changed: 3 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
use std::error::Error as StdError;
22
use std::future::Future;
33
use std::io::{Cursor, IoSlice};
4-
use std::mem;
54
use std::pin::Pin;
65
use std::task::{Context, Poll};
76

8-
use bytes::{Buf, Bytes};
7+
use bytes::Buf;
98
use futures_core::ready;
10-
use h2::{Reason, RecvStream, SendStream};
9+
use h2::SendStream;
1110
use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
1211
use http::HeaderMap;
1312
use pin_project_lite::pin_project;
1413

1514
use crate::body::Body;
16-
use crate::proto::h2::ping::Recorder;
17-
use crate::rt::{Read, ReadBufCursor, Write};
1815

1916
pub(crate) mod ping;
17+
pub(crate) mod upgrade;
2018

2119
cfg_client! {
2220
pub(crate) mod client;
@@ -259,188 +257,3 @@ impl<B: Buf> Buf for SendBuf<B> {
259257
}
260258
}
261259
}
262-
263-
struct H2Upgraded<B>
264-
where
265-
B: Buf,
266-
{
267-
ping: Recorder,
268-
send_stream: UpgradedSendStream<B>,
269-
recv_stream: RecvStream,
270-
buf: Bytes,
271-
}
272-
273-
impl<B> Read for H2Upgraded<B>
274-
where
275-
B: Buf,
276-
{
277-
fn poll_read(
278-
mut self: Pin<&mut Self>,
279-
cx: &mut Context<'_>,
280-
mut read_buf: ReadBufCursor<'_>,
281-
) -> Poll<Result<(), std::io::Error>> {
282-
if self.buf.is_empty() {
283-
self.buf = loop {
284-
match ready!(self.recv_stream.poll_data(cx)) {
285-
None => return Poll::Ready(Ok(())),
286-
Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
287-
continue
288-
}
289-
Some(Ok(buf)) => {
290-
self.ping.record_data(buf.len());
291-
break buf;
292-
}
293-
Some(Err(e)) => {
294-
return Poll::Ready(match e.reason() {
295-
Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
296-
Some(Reason::STREAM_CLOSED) => {
297-
Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
298-
}
299-
_ => Err(h2_to_io_error(e)),
300-
})
301-
}
302-
}
303-
};
304-
}
305-
let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
306-
read_buf.put_slice(&self.buf[..cnt]);
307-
self.buf.advance(cnt);
308-
let _ = self.recv_stream.flow_control().release_capacity(cnt);
309-
Poll::Ready(Ok(()))
310-
}
311-
}
312-
313-
impl<B> Write for H2Upgraded<B>
314-
where
315-
B: Buf,
316-
{
317-
fn poll_write(
318-
mut self: Pin<&mut Self>,
319-
cx: &mut Context<'_>,
320-
buf: &[u8],
321-
) -> Poll<Result<usize, std::io::Error>> {
322-
if buf.is_empty() {
323-
return Poll::Ready(Ok(0));
324-
}
325-
self.send_stream.reserve_capacity(buf.len());
326-
327-
// We ignore all errors returned by `poll_capacity` and `write`, as we
328-
// will get the correct from `poll_reset` anyway.
329-
let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
330-
None => Some(0),
331-
Some(Ok(cnt)) => self
332-
.send_stream
333-
.write(&buf[..cnt], false)
334-
.ok()
335-
.map(|()| cnt),
336-
Some(Err(_)) => None,
337-
};
338-
339-
if let Some(cnt) = cnt {
340-
return Poll::Ready(Ok(cnt));
341-
}
342-
343-
Poll::Ready(Err(h2_to_io_error(
344-
match ready!(self.send_stream.poll_reset(cx)) {
345-
Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
346-
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
347-
}
348-
Ok(reason) => reason.into(),
349-
Err(e) => e,
350-
},
351-
)))
352-
}
353-
354-
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
355-
Poll::Ready(Ok(()))
356-
}
357-
358-
fn poll_shutdown(
359-
mut self: Pin<&mut Self>,
360-
cx: &mut Context<'_>,
361-
) -> Poll<Result<(), std::io::Error>> {
362-
if self.send_stream.write(&[], true).is_ok() {
363-
return Poll::Ready(Ok(()));
364-
}
365-
366-
Poll::Ready(Err(h2_to_io_error(
367-
match ready!(self.send_stream.poll_reset(cx)) {
368-
Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
369-
Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370-
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
371-
}
372-
Ok(reason) => reason.into(),
373-
Err(e) => e,
374-
},
375-
)))
376-
}
377-
}
378-
379-
fn h2_to_io_error(e: h2::Error) -> std::io::Error {
380-
if e.is_io() {
381-
e.into_io().unwrap()
382-
} else {
383-
std::io::Error::new(std::io::ErrorKind::Other, e)
384-
}
385-
}
386-
387-
struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
388-
389-
impl<B> UpgradedSendStream<B>
390-
where
391-
B: Buf,
392-
{
393-
unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
394-
assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
395-
Self(mem::transmute(inner))
396-
}
397-
398-
fn reserve_capacity(&mut self, cnt: usize) {
399-
unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
400-
}
401-
402-
fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
403-
unsafe { self.as_inner_unchecked().poll_capacity(cx) }
404-
}
405-
406-
fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
407-
unsafe { self.as_inner_unchecked().poll_reset(cx) }
408-
}
409-
410-
fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> {
411-
let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
412-
unsafe {
413-
self.as_inner_unchecked()
414-
.send_data(send_buf, end_of_stream)
415-
.map_err(h2_to_io_error)
416-
}
417-
}
418-
419-
unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
420-
&mut *(&mut self.0 as *mut _ as *mut _)
421-
}
422-
}
423-
424-
#[repr(transparent)]
425-
struct Neutered<B> {
426-
_inner: B,
427-
impossible: Impossible,
428-
}
429-
430-
enum Impossible {}
431-
432-
unsafe impl<B> Send for Neutered<B> {}
433-
434-
impl<B> Buf for Neutered<B> {
435-
fn remaining(&self) -> usize {
436-
match self.impossible {}
437-
}
438-
439-
fn chunk(&self) -> &[u8] {
440-
match self.impossible {}
441-
}
442-
443-
fn advance(&mut self, _cnt: usize) {
444-
match self.impossible {}
445-
}
446-
}

0 commit comments

Comments
 (0)