Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make h3 able to connect to nginx #242

Merged
merged 9 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 158 additions & 56 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
convert::TryFrom,
marker::PhantomData,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
task::{Context, Poll},
};
Expand All @@ -8,7 +9,7 @@ use bytes::{Buf, Bytes, BytesMut};
use futures_util::{future, ready};
use http::HeaderMap;
use stream::WriteBuf;
use tracing::{trace, warn};
use tracing::warn;

use crate::{
config::{Config, Settings},
Expand All @@ -21,7 +22,7 @@ use crate::{
varint::VarInt,
},
qpack,
quic::{self, SendStream as _},
quic::{self, SendStream},
stream::{self, AcceptRecvStream, AcceptedRecvStream, BufRecvStream, UniStreamHeader},
webtransport::SessionId,
};
Expand Down Expand Up @@ -107,7 +108,9 @@ where
pub conn: C,
control_send: C::SendStream,
control_recv: Option<FrameStream<C::RecvStream, B>>,
decoder_send: Option<C::SendStream>,
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
encoder_send: Option<C::SendStream>,
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
/// Buffers incoming uni/recv streams which have yet to be claimed.
///
Expand All @@ -134,15 +137,37 @@ where

got_peer_settings: bool,
pub send_grease_frame: bool,
// tells if the grease steam should be sent
send_grease_stream_flag: bool,
// step of the grease sending poll fn
grease_step: GreaseStatus<C::SendStream, B>,
pub config: Config,
}

enum GreaseStatus<S, B>
where
S: SendStream<B>,
B: Buf,
{
/// Grease stream is not started
NotStarted(PhantomData<B>),
/// Grease steam is started without data
Started(Option<S>),
/// Grease stream is started with data
DataPrepared(Option<S>),
/// Data is sent on grease stream
DataSent(S),
/// Grease stream is finished
Finished,
}

impl<B, C> ConnectionInner<C, B>
where
C: quic::Connection<B>,
B: Buf,
{
pub async fn send_settings(&mut self) -> Result<(), Error> {
/// Sends the settings and initializes the control streams
pub async fn send_control_stream_headers(&mut self) -> Result<(), Error> {
#[cfg(test)]
if !self.config.send_settings {
return Ok(());
Expand Down Expand Up @@ -178,14 +203,32 @@ where
//# Endpoints MUST NOT require any data to be received from
//# the peer prior to sending the SETTINGS frame; settings MUST be sent
//# as soon as the transport is ready to send data.
trace!("Sending Settings frame: {:#x?}", settings);
stream::write(
&mut self.control_send,
WriteBuf::from(UniStreamHeader::Control(settings)),

let mut decoder_send = Option::take(&mut self.decoder_send);
let mut encoder_send = Option::take(&mut self.encoder_send);

let (control, ..) = future::join3(
stream::write(
&mut self.control_send,
WriteBuf::from(UniStreamHeader::Control(settings)),
),
async {
if let Some(stream) = &mut decoder_send {
let _ = stream::write(stream, WriteBuf::from(UniStreamHeader::Decoder)).await;
}
},
async {
if let Some(stream) = &mut encoder_send {
let _ = stream::write(stream, WriteBuf::from(UniStreamHeader::Encoder)).await;
}
},
)
.await?;
.await;

Ok(())
self.decoder_send = decoder_send;
self.encoder_send = encoder_send;

control
}

/// Initiates the connection and opens a control stream
Expand All @@ -194,10 +237,26 @@ where
//# Endpoints SHOULD create the HTTP control stream as well as the
//# unidirectional streams required by mandatory extensions (such as the
//# QPACK encoder and decoder streams) first, and then create additional
//# streams as allowed by their peer.
let control_send = future::poll_fn(|cx| conn.poll_open_send(cx))
.await
.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))?;

// start streams
let (control_send, qpack_encoder, qpack_decoder) = (
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
);

let control_send =
control_send.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))?;

let qpack_encoder = match qpack_encoder {
Ok(stream) => Some(stream),
Err(_) => None,
};

let qpack_decoder = match qpack_decoder {
Ok(stream) => Some(stream),
Err(_) => None,
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.1
//= type=implication
Expand All @@ -216,20 +275,14 @@ where
send_grease_frame: config.send_grease,
config,
accepted_streams: Default::default(),
decoder_send: qpack_decoder,
encoder_send: qpack_encoder,
// send grease stream if configured
send_grease_stream_flag: config.send_grease,
// start at first step
grease_step: GreaseStatus::NotStarted(PhantomData),
};

conn_inner.send_settings().await?;

// start a grease stream
if config.send_grease {
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.8
//= type=implication
//# Frame types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored (Section 9). These frames have no semantics, and
//# they MAY be sent on any stream where frames are allowed to be sent.
conn_inner.start_grease_stream().await;
}
conn_inner.send_control_stream_headers().await?;

Ok(conn_inner)
}
Expand Down Expand Up @@ -479,6 +532,17 @@ where
}
}
};

if self.send_grease_stream_flag {
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.8
//= type=implication
//# Frame types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored (Section 9). These frames have no semantics, and
//# they MAY be sent on any stream where frames are allowed to be sent.
ready!(self.poll_grease_stream(cx));
}

Poll::Ready(res)
}

Expand Down Expand Up @@ -533,50 +597,88 @@ where
code.with_reason(reason.as_ref(), crate::error::ErrorLevel::ConnectionError)
}

/// starts an grease stream
/// https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease
async fn start_grease_stream(&mut self) {
// start the stream
let mut grease_stream = match future::poll_fn(|cx| self.conn.poll_open_send(cx))
.await
.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))
{
Err(err) => {
warn!("grease stream creation failed with {}", err);
return;
}
Ok(grease) => grease,
// start grease stream and send data
fn poll_grease_stream(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if matches!(self.grease_step, GreaseStatus::NotStarted(_)) {
self.grease_step = match self.conn.poll_open_send(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not create grease stream
// dont try again
self.send_grease_stream_flag = false;
warn!("grease stream creation failed with");
return Poll::Ready(());
}
Poll::Ready(Ok(stream)) => GreaseStatus::Started(Some(stream)),
};
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# Stream types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored. These streams have no semantics, and they can be
//# sent when application-layer padding is desired. They MAY also be
//# sent on connections where no data is currently being transferred.
match stream::write(&mut grease_stream, (StreamType::grease(), Frame::Grease)).await {
Ok(()) => (),
Err(err) => {
warn!("write data on grease stream failed with {}", err);
return;
if let GreaseStatus::Started(stream) = &mut self.grease_step {
if let Some(stream) = stream {
if stream
.send_data((StreamType::grease(), Frame::Grease))
.is_err()
{
self.send_grease_stream_flag = false;
warn!("write data on grease stream failed with");
return Poll::Ready(());
};
}
}
self.grease_step = GreaseStatus::DataPrepared(stream.take());
};

if let GreaseStatus::DataPrepared(stream) = &mut self.grease_step {
if let Some(stream) = stream {
match stream.poll_ready(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not write grease frame
// dont try again
self.send_grease_stream_flag = false;
warn!("write data on grease stream failed with");
return Poll::Ready(());
}
};
}
self.grease_step = GreaseStatus::DataSent(match stream.take() {
Some(stream) => stream,
None => {
// this should never happen
self.send_grease_stream_flag = false;
return Poll::Ready(());
}
});
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# When sending a reserved stream type,
//# the implementation MAY either terminate the stream cleanly or reset
//# it.

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# When resetting the stream, either the H3_NO_ERROR error code or
//# a reserved error code (Section 8.1) SHOULD be used.
match future::poll_fn(|cx| grease_stream.poll_finish(cx))
.await
.map_err(|e| Code::H3_NO_ERROR.with_transport(e))
{
Ok(()) => (),
Err(err) => warn!("grease stream error on close {}", err),
if let GreaseStatus::DataSent(stream) = &mut self.grease_step {
match stream.poll_finish(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not finish grease stream
// dont try again
self.send_grease_stream_flag = false;
warn!("finish grease stream failed with");
return Poll::Ready(());
}
};
self.grease_step = GreaseStatus::Finished;
};

// grease stream is closed
// dont do another one
self.send_grease_stream_flag = false;
Poll::Ready(())
}

#[allow(missing_docs)]
Expand Down
2 changes: 1 addition & 1 deletion h3/src/proto/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl Settings {
//# H3_SETTINGS_ERROR.
settings.insert(identifier, value)?;
} else {
tracing::warn!("Unsupported setting: {:#x?}", identifier);
tracing::debug!("Unsupported setting: {:#x?}", identifier);
}
}
Ok(settings)
Expand Down
13 changes: 8 additions & 5 deletions h3/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ where
pub enum UniStreamHeader {
Control(Settings),
WebTransportUni(SessionId),
Encoder,
Decoder,
}

impl Encode for UniStreamHeader {
Expand All @@ -132,6 +134,12 @@ impl Encode for UniStreamHeader {
StreamType::WEBTRANSPORT_UNI.encode(buf);
session_id.encode(buf);
}
UniStreamHeader::Encoder => {
StreamType::ENCODER.encode(buf);
}
UniStreamHeader::Decoder => {
StreamType::DECODER.encode(buf);
}
}
}
}
Expand All @@ -154,17 +162,12 @@ where
}

pub enum BidiStreamHeader {
Control(Settings),
WebTransportBidi(SessionId),
}

impl Encode for BidiStreamHeader {
fn encode<B: BufMut>(&self, buf: &mut B) {
match self {
Self::Control(settings) => {
StreamType::CONTROL.encode(buf);
settings.encode(buf);
}
Self::WebTransportBidi(session_id) => {
StreamType::WEBTRANSPORT_BIDI.encode(buf);
session_id.encode(buf);
Expand Down
Loading