Skip to content

Commit

Permalink
sync h2 code with hyperium/h2 v0.4.8
Browse files Browse the repository at this point in the history
* Fix reclaiming reserved capacity (fixes <hyperium/h2#607>)
  by @nox in <hyperium/h2#832>
* Fix busy loop on shutdown by @seanmonstar
  in <hyperium/h2#834>
* Fix window size decrement of send-closed streams
  by @nox in <hyperium/h2#830>
* Fix handle implicit resets at the right time
  by @nox in <hyperium/h2#833>
* Fix poll_flush after poll_shutdown
  by @bdbai in <hyperium/h2#836>

Co-authored-by: Sean McArthur <[email protected]>
Co-authored-by: 包布丁 <[email protected]>
Co-authored-by: Anthony Ramine <[email protected]>
Co-authored-by: Samuel Tardieu <[email protected]>
  • Loading branch information
5 people committed Feb 20, 2025
1 parent 08dfc9c commit c271fe4
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 160 deletions.
2 changes: 1 addition & 1 deletion FORK.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ as a distant relative.

### hyperium

- <https://github.com/hyperium/h2/tree/v0.4.7>
- <https://github.com/hyperium/h2/tree/v0.4.8>
- <https://github.com/hyperium/hyper/tree/v1.5.2>
- <https://github.com/hyperium/hyper-util/tree/v0.1.10>

Expand Down
8 changes: 7 additions & 1 deletion rama-http-core/src/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,8 +1441,14 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.maybe_close_connection_if_no_streams();
let had_streams_or_refs = self.inner.has_streams_or_other_references();
let result = self.inner.poll(cx).map_err(Into::into);
if result.is_pending() && !self.inner.has_streams_or_other_references() {
// if we had streams/refs, and don't anymore, wake up one more time to
// ensure proper shutdown
if result.is_pending()
&& had_streams_or_refs
&& !self.inner.has_streams_or_other_references()
{
tracing::trace!("last stream closed during poll, wake again");
cx.waker().wake_by_ref();
}
Expand Down
7 changes: 6 additions & 1 deletion rama-http-core/src/h2/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ macro_rules! limited_write_buf {
pub(super) struct FramedWrite<T, B> {
/// Upstream `AsyncWrite`
inner: T,
final_flush_done: bool,

encoder: Encoder<B>,
}
Expand Down Expand Up @@ -88,6 +89,7 @@ where
};
FramedWrite {
inner,
final_flush_done: false,
encoder: Encoder {
hpack: hpack::Encoder::default(),
buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
Expand Down Expand Up @@ -164,7 +166,10 @@ where

/// Close the codec
pub(super) fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(self.flush(cx))?;
if !self.final_flush_done {
ready!(self.flush(cx))?;
self.final_flush_done = true;
}
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rama-http-core/src/h2/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Counts {
}
}

if stream.is_counted {
if !stream.state.is_scheduled_reset() && stream.is_counted {
tracing::trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(&mut stream);
Expand Down
22 changes: 15 additions & 7 deletions rama-http-core/src/h2/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,18 @@ impl Prioritize {
stream: &mut store::Ptr,
counts: &mut Counts,
) {
// only reclaim requested capacity that isn't already buffered
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
// only reclaim reserved capacity that isn't already buffered
if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
let reserved =
stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;

// Panic safety: due to how `reserved` is computed it can't be greater
// than what's available.
stream
.send_flow
.claim_capacity(reserved)
.expect("window size should be greater than reserved");

// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(reserved);
debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
Expand Down Expand Up @@ -687,8 +692,11 @@ impl Prioritize {
}

pub(crate) fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_send.pop(store) {
while let Some(mut stream) = self.pending_send.pop(store) {
let is_pending_reset = stream.is_pending_reset_expiration();
if let Some(reason) = stream.state.get_scheduled_reset() {
stream.set_reset(reason, Initiator::Library);
}
counts.transition_after(stream, is_pending_reset);
}
}
Expand Down
10 changes: 10 additions & 0 deletions rama-http-core/src/h2/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,16 @@ impl Send {
store.try_for_each(|mut stream| {
let stream = &mut *stream;

if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
tracing::trace!(
"skipping send-closed stream; id={:?}; flow={:?}",
stream.id,
stream.send_flow
);

return Ok(());
}

tracing::trace!(
"decrementing stream window; id={:?}; decr={}; flow={:?}",
stream.id,
Expand Down
5 changes: 1 addition & 4 deletions rama-http-core/src/h2/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,7 @@ impl<B> StreamRef<B> {

let mut stream = me.store.resolve(self.opaque.key);

me.actions
.send
.poll_reset(cx, &mut stream, mode)
.map_err(From::from)
me.actions.send.poll_reset(cx, &mut stream, mode)
}

pub(crate) fn clone_to_opaque(&self) -> OpaqueStreamRef {
Expand Down
5 changes: 5 additions & 0 deletions rama-http-core/tests/h2-support/src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ impl Mock<frame::Settings> {
self
}

pub fn max_frame_size(mut self, val: u32) -> Self {
self.0.set_max_frame_size(Some(val));
self
}

pub fn initial_window_size(mut self, val: u32) -> Self {
self.0.set_initial_window_size(Some(val));
self
Expand Down
Loading

0 comments on commit c271fe4

Please sign in to comment.