From 764b8a648f0a904f4d7c0867e4482ffad188d537 Mon Sep 17 00:00:00 2001 From: Glen De Cauwsemaecker Date: Thu, 20 Feb 2025 20:14:15 +0100 Subject: [PATCH] sync rama-http-core code with hyperium/hyper v1.6.0 - ext: add ext::on_informational() callback extension () (, closes ) - server: add http1::Builder::ignore_invalid_headers(bool) option () () - server: - start http1 header read timeout when conn is idle () (, closes , ) - change max_local_error_reset_streams function to &mut self (#3820) (e981a91e) - http2::Builder::max_local_error_reset_streams() now takes &mut self and returns &mut Self. In practice, this shouldn't break almost anyone. It was the wrong receiver and return types. () Co-authored-by: Sean McArthur Co-authored-by: Finn Bear Co-authored-by: tottoto --- FORK.md | 2 +- LICENSE-MIT | 4 +- rama-http-core/src/error.rs | 3 + rama-http-core/src/ext/informational.rs | 86 +++++++++++++++++++++++++ rama-http-core/src/ext/mod.rs | 5 ++ rama-http-core/src/proto/h1/conn.rs | 22 ++++++- rama-http-core/src/proto/h1/io.rs | 2 + rama-http-core/src/proto/h1/mod.rs | 1 + rama-http-core/src/proto/h1/role.rs | 22 +++++++ rama-http-core/src/server/conn/http2.rs | 2 +- tests/http-core/client.rs | 40 ++++++++++++ tests/http-core/h2/flow_control.rs | 1 + tests/http-core/server.rs | 53 +++++++++++++-- 13 files changed, 234 insertions(+), 9 deletions(-) create mode 100644 rama-http-core/src/ext/informational.rs diff --git a/FORK.md b/FORK.md index c2b2f3c27..30194d91e 100644 --- a/FORK.md +++ b/FORK.md @@ -10,7 +10,7 @@ as a distant relative. ### hyperium - -- +- - ### tower-rs diff --git a/LICENSE-MIT b/LICENSE-MIT index bb87a3acb..03623e632 100644 --- a/LICENSE-MIT +++ b/LICENSE-MIT @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2023 - Glen Henri J. De Cauwsemaecker +Copyright (c) 2022-2025 Glen Henri J. De Cauwsemaecker Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file +SOFTWARE. diff --git a/rama-http-core/src/error.rs b/rama-http-core/src/error.rs index 01c9afa04..48c3eb3a1 100644 --- a/rama-http-core/src/error.rs +++ b/rama-http-core/src/error.rs @@ -167,6 +167,9 @@ impl Error { /// Returns true if the error was caused by a timeout. pub fn is_timeout(&self) -> bool { + if matches!(self.inner.kind, Kind::HeaderTimeout) { + return true; + } self.find_source::().is_some() } diff --git a/rama-http-core/src/ext/informational.rs b/rama-http-core/src/ext/informational.rs new file mode 100644 index 000000000..3b0cf9cac --- /dev/null +++ b/rama-http-core/src/ext/informational.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) struct OnInformational(Arc); + +/// Add a callback for 1xx informational responses. +/// +/// # Example +/// +/// ``` +/// # let some_body = (); +/// let mut req = rama_http_types::Request::new(some_body); +/// +/// rama_http_core::ext::on_informational(&mut req, |res| { +/// println!("informational: {:?}", res.status()); +/// }); +/// +/// // send request on a client connection... +/// ``` +pub fn on_informational(req: &mut rama_http_types::Request, callback: F) +where + F: Fn(Response<'_>) + Send + Sync + 'static, +{ + on_informational_raw(req, OnInformationalClosure(callback)); +} + +pub(crate) fn on_informational_raw(req: &mut rama_http_types::Request, callback: C) +where + C: OnInformationalCallback + Send + Sync + 'static, +{ + req.extensions_mut() + .insert(OnInformational(Arc::new(callback))); +} + +// Sealed, not actually nameable bounds +pub(crate) trait OnInformationalCallback { + fn on_informational(&self, res: rama_http_types::Response<()>); +} + +impl OnInformational { + pub(crate) fn call(&self, res: rama_http_types::Response<()>) { + self.0.on_informational(res); + } +} + +struct OnInformationalClosure(F); + +impl OnInformationalCallback for OnInformationalClosure +where + F: Fn(Response<'_>) + Send + Sync + 'static, +{ + fn on_informational(&self, res: rama_http_types::Response<()>) { + let res = Response(&res); + (self.0)(res); + } +} + +// A facade over rama_http_types::Response. +// +// It purposefully hides being able to move the response out of the closure, +// while also not being able to expect it to be a reference `&Response`. +// (Otherwise, a closure can be written as `|res: &_|`, and then be broken if +// we make the closure take ownership.) +// +// With the type not being nameable, we could change from being a facade to +// being either a real reference, or moving the rama_http_types::Response into the closure, +// in a backwards-compatible change in the future. +#[derive(Debug)] +pub struct Response<'a>(&'a rama_http_types::Response<()>); + +impl Response<'_> { + #[inline] + pub fn status(&self) -> rama_http_types::StatusCode { + self.0.status() + } + + #[inline] + pub fn version(&self) -> rama_http_types::Version { + self.0.version() + } + + #[inline] + pub fn headers(&self) -> &rama_http_types::HeaderMap { + self.0.headers() + } +} diff --git a/rama-http-core/src/ext/mod.rs b/rama-http-core/src/ext/mod.rs index 6f3e55691..f475f90d1 100644 --- a/rama-http-core/src/ext/mod.rs +++ b/rama-http-core/src/ext/mod.rs @@ -5,6 +5,11 @@ use std::fmt; mod h1_reason_phrase; pub use h1_reason_phrase::ReasonPhrase; +mod informational; +pub use informational::on_informational; +pub(crate) use informational::OnInformational; +// pub(crate) use informational::{on_informational_raw, OnInformationalCallback}; // ffi feature in hyperium/hyper + /// Represents the `:protocol` pseudo-header used by /// the [Extended CONNECT Protocol]. /// diff --git a/rama-http-core/src/proto/h1/conn.rs b/rama-http-core/src/proto/h1/conn.rs index f0606840b..ce72c1d9d 100644 --- a/rama-http-core/src/proto/h1/conn.rs +++ b/rama-http-core/src/proto/h1/conn.rs @@ -59,6 +59,7 @@ where date_header: true, title_case_headers: false, h09_responses: false, + on_informational: None, notify_read: false, reading: Reading::Init, writing: Writing::Init, @@ -203,6 +204,7 @@ where h1_parser_config: self.state.h1_parser_config.clone(), h1_max_headers: self.state.h1_max_headers, h09_responses: self.state.h09_responses, + on_informational: &mut self.state.on_informational, }, ) { Poll::Ready(Ok(msg)) => msg, @@ -236,6 +238,9 @@ where // Prevent accepting HTTP/0.9 responses after the initial one, if any. self.state.h09_responses = false; + // Drop any OnInformational callbacks, we're done there! + self.state.on_informational = None; + self.state.busy(); self.state.keep_alive &= msg.keep_alive; self.state.version = msg.head.version; @@ -579,7 +584,11 @@ where }, buf, ) { - Ok(encoder) => Some(encoder), + Ok(encoder) => { + self.state.on_informational = + head.extensions.remove::(); + Some(encoder) + } Err(err) => { self.state.error = Some(err); self.state.writing = Writing::Closed; @@ -861,6 +870,10 @@ struct State { date_header: bool, title_case_headers: bool, h09_responses: bool, + /// If set, called with each 1xx informational response received for + /// the current request. MUST be unset after a non-1xx response is + /// received. + on_informational: Option, /// Set to true when the Dispatcher should poll read operations /// again. See the `maybe_notify` method for more. notify_read: bool, @@ -1039,6 +1052,13 @@ impl State { if !T::should_read_first() { self.notify_read = true; } + + if self.h1_header_read_timeout.is_some() { + // Next read will start and poll the header read timeout, + // so we can close the connection if another header isn't + // received in a timely manner. + self.notify_read = true; + } } fn is_idle(&self) -> bool { diff --git a/rama-http-core/src/proto/h1/io.rs b/rama-http-core/src/proto/h1/io.rs index be10bf2f5..bbbfca4eb 100644 --- a/rama-http-core/src/proto/h1/io.rs +++ b/rama-http-core/src/proto/h1/io.rs @@ -180,6 +180,7 @@ where h1_parser_config: parse_ctx.h1_parser_config.clone(), h1_max_headers: parse_ctx.h1_max_headers, h09_responses: parse_ctx.h09_responses, + on_informational: parse_ctx.on_informational, }, )? { Some(msg) => { @@ -690,6 +691,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; assert!(buffered .parse::(cx, parse_ctx) diff --git a/rama-http-core/src/proto/h1/mod.rs b/rama-http-core/src/proto/h1/mod.rs index a13d81c3d..c70285422 100644 --- a/rama-http-core/src/proto/h1/mod.rs +++ b/rama-http-core/src/proto/h1/mod.rs @@ -68,6 +68,7 @@ pub(crate) struct ParseContext<'a> { h1_parser_config: ParserConfig, h1_max_headers: Option, h09_responses: bool, + on_informational: &'a mut Option, } struct EncodeHead<'a, S> { diff --git a/rama-http-core/src/proto/h1/role.rs b/rama-http-core/src/proto/h1/role.rs index 865ec4c28..8be450719 100644 --- a/rama-http-core/src/proto/h1/role.rs +++ b/rama-http-core/src/proto/h1/role.rs @@ -956,6 +956,12 @@ impl Http1Transaction for Client { })); } + if head.subject.is_informational() { + if let Some(callback) = ctx.on_informational { + callback.call(head.into_response(())); + } + } + // Parsing a 1xx response could have consumed the buffer, check if // it is empty now... if buf.is_empty() { @@ -1430,6 +1436,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .unwrap() @@ -1451,6 +1458,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); assert_eq!(raw.len(), 0); @@ -1468,6 +1476,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; Server::parse(&mut raw, ctx).unwrap_err(); } @@ -1482,6 +1491,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: true, + on_informational: &mut None, }; let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); assert_eq!(raw, H09_RESPONSE); @@ -1498,6 +1508,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; Client::parse(&mut raw, ctx).unwrap_err(); assert_eq!(raw, H09_RESPONSE); @@ -1518,6 +1529,7 @@ mod tests { h1_parser_config, h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); assert_eq!(raw.len(), 0); @@ -1535,6 +1547,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; Client::parse(&mut raw, ctx).unwrap_err(); } @@ -1548,6 +1561,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }; let parsed_message = Server::parse(&mut raw, ctx).unwrap().unwrap(); let mut orig_headers = parsed_message @@ -1572,6 +1586,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .expect("parse ok") @@ -1587,6 +1602,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .expect_err(comment) @@ -1811,6 +1827,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, } ) .expect("parse ok") @@ -1826,6 +1843,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .expect("parse ok") @@ -1841,6 +1859,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .expect_err("parse should err") @@ -2433,6 +2452,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: None, h09_responses: false, + on_informational: &mut None, }, ) .expect("parse ok") @@ -2470,6 +2490,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: max_headers, h09_responses: false, + on_informational: &mut None, }, ); if should_success { @@ -2488,6 +2509,7 @@ mod tests { h1_parser_config: Default::default(), h1_max_headers: max_headers, h09_responses: false, + on_informational: &mut None, }, ); if should_success { diff --git a/rama-http-core/src/server/conn/http2.rs b/rama-http-core/src/server/conn/http2.rs index d998d1c65..028343c7c 100644 --- a/rama-http-core/src/server/conn/http2.rs +++ b/rama-http-core/src/server/conn/http2.rs @@ -121,7 +121,7 @@ impl Builder { /// This is not advised, as it can potentially expose servers to DOS vulnerabilities. /// /// See for more information. - pub fn max_local_error_reset_streams(mut self, max: impl Into>) -> Self { + pub fn max_local_error_reset_streams(&mut self, max: impl Into>) -> &mut Self { self.h2_builder.max_local_error_reset_streams = max.into(); self } diff --git a/tests/http-core/client.rs b/tests/http-core/client.rs index 28a8bfb34..4d8f013c0 100644 --- a/tests/http-core/client.rs +++ b/tests/http-core/client.rs @@ -2090,6 +2090,46 @@ mod conn { let _res = client.send_request(req).await.expect("send_request"); } + #[tokio::test] + async fn client_on_informational_ext() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + let (server, addr) = setup_std_test_server(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + let _ = sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 100 Continue\r\n\r\n").unwrap(); + sock.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n") + .unwrap(); + }); + + let tcp = tcp_connect(&addr).await.unwrap(); + + let (client, conn) = conn::http1::handshake(tcp).await.unwrap(); + + tokio::spawn(async move { + let _ = conn.await; + }); + + let mut req = Request::builder() + .uri("/a") + .body(Empty::::new()) + .unwrap(); + let cnt = Arc::new(AtomicUsize::new(0)); + let cnt2 = cnt.clone(); + rama_http_core::ext::on_informational(&mut req, move |res| { + assert_eq!(res.status(), 100); + cnt2.fetch_add(1, Ordering::Relaxed); + }); + let _res = client.send_request(req).await.expect("send_request"); + assert_eq!(1, cnt.load(Ordering::Relaxed)); + } + #[tokio::test] async fn test_try_send_request() { use std::future::Future; diff --git a/tests/http-core/h2/flow_control.rs b/tests/http-core/h2/flow_control.rs index 1c4272ca6..14eefdde1 100644 --- a/tests/http-core/h2/flow_control.rs +++ b/tests/http-core/h2/flow_control.rs @@ -601,6 +601,7 @@ async fn recv_window_update_on_stream_closed_by_data_frame() { // i know this is kind of evil, but it's necessary to // ensure that the stream is closed by the EOS frame, // and not by the RST_STREAM. + #[allow(clippy::mem_forget)] std::mem::forget(stream); // Wait for the connection to close diff --git a/tests/http-core/server.rs b/tests/http-core/server.rs index d0076dc31..a6d9c40e1 100644 --- a/tests/http-core/server.rs +++ b/tests/http-core/server.rs @@ -1494,7 +1494,6 @@ async fn header_read_timeout_slow_writes() { tcp.write_all( b"\ Something: 1\r\n\ - \r\n\ ", ) .expect("write 2"); @@ -1502,6 +1501,7 @@ async fn header_read_timeout_slow_writes() { tcp.write_all( b"\ Works: 0\r\n\ + \r\n ", ) .expect_err("write 3"); @@ -1545,7 +1545,7 @@ async fn header_read_timeout_starts_immediately() { socket, RamaHttpService::new(rama::Context::default(), unreachable_service()), ); - conn.await.expect_err("header timeout"); + assert!(conn.await.unwrap_err().is_timeout()); } #[tokio::test] @@ -1593,7 +1593,6 @@ async fn header_read_timeout_slow_writes_multiple_requests() { b"\ GET / HTTP/1.1\r\n\ Something: 1\r\n\ - \r\n\ ", ) .expect("write 5"); @@ -1601,6 +1600,7 @@ async fn header_read_timeout_slow_writes_multiple_requests() { tcp.write_all( b"\ Works: 0\r\n\ + \r\n\ ", ) .expect_err("write 6"); @@ -1622,7 +1622,52 @@ async fn header_read_timeout_slow_writes_multiple_requests() { }), ), ); - conn.without_shutdown().await.expect_err("header timeout"); + assert!(conn.without_shutdown().await.unwrap_err().is_timeout()); +} + +#[tokio::test] +async fn header_read_timeout_as_idle_timeout() { + let (listener, addr) = setup_tcp_listener(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + + tcp.write_all( + b"\ + GET / HTTP/1.1\r\n\ + \r\n\ + ", + ) + .expect("request 1"); + + thread::sleep(Duration::from_secs(6)); + + tcp.write_all( + b"\ + GET / HTTP/1.1\r\n\ + \r\n\ + ", + ) + .expect_err("request 2"); + }); + + let (socket, _) = listener.accept().await.unwrap(); + let conn = http1::Builder::new() + .header_read_timeout(Duration::from_secs(3)) + .serve_connection( + socket, + RamaHttpService::new( + rama::Context::default(), + service_fn(|_| { + let res = Response::builder() + .status(200) + .body(Empty::::new()) + .unwrap(); + future::ready(Ok::<_, Infallible>(res)) + }), + ), + ); + assert!(conn.without_shutdown().await.unwrap_err().is_timeout()); } #[tokio::test]