From 98db57d3e29f61f9955be1c98da0430b3aee8324 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 21 Jun 2024 21:12:37 +0200 Subject: [PATCH] add tracing instrumentation --- h3-quinn/Cargo.toml | 1 + h3-quinn/src/lib.rs | 20 ++++++++++++++++++++ h3/src/client/connection.rs | 6 +++++- h3/src/client/stream.rs | 8 ++++++++ h3/src/connection.rs | 23 +++++++++++++++++++++-- h3/src/server/connection.rs | 14 +++++++++++++- h3/src/server/request.rs | 2 ++ h3/src/server/stream.rs | 6 +++++- 8 files changed, 75 insertions(+), 5 deletions(-) diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index aade3fdb..1b5f66fa 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -21,3 +21,4 @@ quinn = { version = "0.11", default-features = false, features = [ tokio-util = { version = "0.7.9" } futures = { version = "0.3.28" } tokio = { version = "1", features = ["io-util"], default-features = false } +tracing = "0.1.40" diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 7819b383..c0e791cd 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -27,6 +27,7 @@ use h3::{ quic::{self, Error, StreamId, WriteBuf}, }; use tokio_util::sync::ReusableBoxFuture; +use tracing::instrument; /// A QUIC connection backed by Quinn /// @@ -155,6 +156,7 @@ where type OpenStreams = OpenStreams; type AcceptError = ConnectionError; + #[instrument(skip_all)] fn poll_accept_bidi( &mut self, cx: &mut task::Context<'_>, @@ -169,6 +171,7 @@ where }))) } + #[instrument(skip_all)] fn poll_accept_recv( &mut self, cx: &mut task::Context<'_>, @@ -197,6 +200,7 @@ where type BidiStream = BidiStream; type OpenError = ConnectionError; + #[instrument(skip_all)] fn poll_open_bidi( &mut self, cx: &mut task::Context<'_>, @@ -215,6 +219,7 @@ where })) } + #[instrument(skip_all)] fn poll_open_send( &mut self, cx: &mut task::Context<'_>, @@ -229,6 +234,7 @@ where Poll::Ready(Ok(Self::SendStream::new(send))) } + #[instrument(skip_all)] fn close(&mut self, code: h3::error::Code, reason: &[u8]) { self.conn.close( VarInt::from_u64(code.value()).expect("error code VarInt"), @@ -243,6 +249,7 @@ where { type Error = SendDatagramError; + #[instrument(skip_all)] fn send_datagram(&mut self, data: Datagram) -> Result<(), SendDatagramError> { // TODO investigate static buffer from known max datagram size let mut buf = BytesMut::new(); @@ -259,6 +266,7 @@ impl quic::RecvDatagramExt for Connection { type Error = ConnectionError; #[inline] + #[instrument(skip_all)] fn poll_accept_datagram( &mut self, cx: &mut task::Context<'_>, @@ -289,6 +297,7 @@ where type BidiStream = BidiStream; type OpenError = ConnectionError; + #[instrument(skip_all)] fn poll_open_bidi( &mut self, cx: &mut task::Context<'_>, @@ -307,6 +316,7 @@ where })) } + #[instrument(skip_all)] fn poll_open_send( &mut self, cx: &mut task::Context<'_>, @@ -321,6 +331,7 @@ where Poll::Ready(Ok(Self::SendStream::new(send))) } + #[instrument(skip_all)] fn close(&mut self, code: h3::error::Code, reason: &[u8]) { self.conn.close( VarInt::from_u64(code.value()).expect("error code VarInt"), @@ -452,6 +463,7 @@ impl quic::RecvStream for RecvStream { type Buf = Bytes; type Error = ReadError; + #[instrument(skip_all)] fn poll_data( &mut self, cx: &mut task::Context<'_>, @@ -468,6 +480,7 @@ impl quic::RecvStream for RecvStream { Poll::Ready(Ok(chunk?.map(|c| c.bytes))) } + #[instrument(skip_all)] fn stop_sending(&mut self, error_code: u64) { self.stream .as_mut() @@ -476,6 +489,7 @@ impl quic::RecvStream for RecvStream { .ok(); } + #[instrument(skip_all)] fn recv_id(&self) -> StreamId { self.stream .as_ref() @@ -573,6 +587,7 @@ where { type Error = SendStreamError; + #[instrument(skip_all)] fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { if let Some(ref mut data) = self.writing { while data.has_remaining() { @@ -598,10 +613,12 @@ where Poll::Ready(Ok(())) } + #[instrument(skip_all)] fn poll_finish(&mut self, _cx: &mut task::Context<'_>) -> Poll> { Poll::Ready(self.stream.as_mut().unwrap().finish().map_err(|e| e.into())) } + #[instrument(skip_all)] fn reset(&mut self, reset_code: u64) { let _ = self .stream @@ -610,6 +627,7 @@ where .reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX)); } + #[instrument(skip_all)] fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { if self.writing.is_some() { return Err(Self::Error::NotReady); @@ -618,6 +636,7 @@ where Ok(()) } + #[instrument(skip_all)] fn send_id(&self) -> StreamId { self.stream .as_ref() @@ -633,6 +652,7 @@ impl quic::SendStreamUnframed for SendStream where B: Buf, { + #[instrument(skip_all)] fn poll_send( &mut self, cx: &mut task::Context<'_>, diff --git a/h3/src/client/connection.rs b/h3/src/client/connection.rs index 7fb591e9..6b095752 100644 --- a/h3/src/client/connection.rs +++ b/h3/src/client/connection.rs @@ -9,7 +9,7 @@ use std::{ use bytes::{Buf, BytesMut}; use futures_util::future; use http::request; -use tracing::{info, trace}; +use tracing::{info, instrument, trace}; use crate::{ connection::{self, ConnectionInner, ConnectionState, SharedStateRef}, @@ -121,6 +121,7 @@ where B: Buf, { /// Send an HTTP/3 request to the server + #[instrument(skip_all)] pub async fn send_request( &mut self, req: http::Request<()>, @@ -346,17 +347,20 @@ where B: Buf, { /// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes + #[instrument(skip_all)] pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> { // TODO: Calculate remaining pushes once server push is implemented. self.inner.shutdown(&mut self.sent_closing, PushId(0)).await } /// Wait until the connection is closed + #[instrument(skip_all)] pub async fn wait_idle(&mut self) -> Result<(), Error> { future::poll_fn(|cx| self.poll_close(cx)).await } /// Maintain the connection state until it is closed + #[instrument(skip_all)] pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { while let Poll::Ready(result) = self.inner.poll_control(cx) { match result { diff --git a/h3/src/client/stream.rs b/h3/src/client/stream.rs index e2b4b8e6..27a76d97 100644 --- a/h3/src/client/stream.rs +++ b/h3/src/client/stream.rs @@ -1,6 +1,7 @@ use bytes::Buf; use futures_util::future; use http::{HeaderMap, Response}; +use tracing::instrument; use crate::{ connection::{self, ConnectionState, SharedStateRef}, @@ -82,6 +83,7 @@ where /// This should be called before trying to receive any data with [`recv_data()`]. /// /// [`recv_data()`]: #method.recv_data + #[instrument(skip_all)] pub async fn recv_response(&mut self) -> Result, Error> { let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx)) .await @@ -141,11 +143,13 @@ where /// Receive some of the request body. // TODO what if called before recv_response ? + #[instrument(skip_all)] pub async fn recv_data(&mut self) -> Result, Error> { self.inner.recv_data().await } /// Receive an optional set of trailers for the response. + #[instrument(skip_all)] pub async fn recv_trailers(&mut self) -> Result, Error> { let res = self.inner.recv_trailers().await; if let Err(ref e) = res { @@ -157,6 +161,7 @@ where } /// Tell the peer to stop sending into the underlying QUIC stream + #[instrument(skip_all)] pub fn stop_sending(&mut self, error_code: crate::error::Code) { // TODO take by value to prevent any further call as this request is cancelled // rename `cancel()` ? @@ -170,6 +175,7 @@ where B: Buf, { /// Send some data on the request body. + #[instrument(skip_all)] pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { self.inner.send_data(buf).await } @@ -179,6 +185,7 @@ where /// Either [`RequestStream::finish`] or /// [`RequestStream::send_trailers`] must be called to finalize a /// request. + #[instrument(skip_all)] pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { self.inner.send_trailers(trailers).await } @@ -188,6 +195,7 @@ where /// Either [`RequestStream::finish`] or /// [`RequestStream::send_trailers`] must be called to finalize a /// request. + #[instrument(skip_all)] pub async fn finish(&mut self) -> Result<(), Error> { self.inner.finish().await } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index 5966eb87..09e78b85 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -9,7 +9,7 @@ use bytes::{Buf, Bytes, BytesMut}; use futures_util::{future, ready}; use http::HeaderMap; use stream::WriteBuf; -use tracing::warn; +use tracing::{instrument, warn}; use crate::{ config::{Config, Settings}, @@ -167,6 +167,7 @@ where B: Buf, { /// Sends the settings and initializes the control streams + #[instrument(skip_all)] pub async fn send_control_stream_headers(&mut self) -> Result<(), Error> { #[cfg(test)] if !self.config.send_settings { @@ -232,6 +233,7 @@ where } /// Initiates the connection and opens a control stream + #[instrument(skip_all)] pub async fn new(mut conn: C, shared: SharedStateRef, config: Config) -> Result { //= https://www.rfc-editor.org/rfc/rfc9114#section-6.2 //# Endpoints SHOULD create the HTTP control stream as well as the @@ -288,7 +290,7 @@ where } /// Send GOAWAY with specified max_id, iff max_id is smaller than the previous one. - + #[instrument(skip_all)] pub async fn shutdown( &mut self, sent_closing: &mut Option, @@ -317,6 +319,7 @@ where } #[allow(missing_docs)] + #[instrument(skip_all)] pub fn poll_accept_request( &mut self, cx: &mut Context<'_>, @@ -337,6 +340,7 @@ where /// Polls incoming streams /// /// Accepted streams which are not control, decoder, or encoder streams are buffer in `accepted_recv_streams` + #[instrument(skip_all)] pub fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Result<(), Error> { if let Some(ref e) = self.shared.read("poll_accept_request").error { return Err(e.clone()); @@ -425,6 +429,7 @@ where } /// Waits for the control stream to be received and reads subsequent frames. + #[instrument(skip_all)] pub fn poll_control(&mut self, cx: &mut Context<'_>) -> Poll, Error>> { if let Some(ref e) = self.shared.read("poll_accept_request").error { return Poll::Ready(Err(e.clone())); @@ -546,6 +551,7 @@ where Poll::Ready(res) } + #[instrument(skip_all)] pub(crate) fn process_goaway( &mut self, recv_closing: &mut Option, @@ -590,6 +596,7 @@ where /// Closes a Connection with code and reason. /// It returns an [`Error`] which can be returned. + #[instrument(skip_all)] pub fn close>(&mut self, code: Code, reason: T) -> Error { self.shared.write("connection close err").error = Some(code.with_reason(reason.as_ref(), crate::error::ErrorLevel::ConnectionError)); @@ -598,6 +605,7 @@ where } // start grease stream and send data + #[instrument(skip_all)] fn poll_grease_stream(&mut self, cx: &mut Context<'_>) -> Poll<()> { if matches!(self.grease_step, GreaseStatus::NotStarted(_)) { self.grease_step = match self.conn.poll_open_send(cx) { @@ -682,6 +690,7 @@ where } #[allow(missing_docs)] + #[instrument(skip_all)] pub fn accepted_streams_mut(&mut self) -> &mut AcceptedStreams { &mut self.accepted_streams } @@ -725,6 +734,7 @@ where S: quic::RecvStream, { /// Receive some of the request body. + #[instrument(skip_all)] pub fn poll_recv_data( &mut self, cx: &mut Context<'_>, @@ -773,12 +783,15 @@ where .poll_data(cx) .map_err(|e| self.maybe_conn_err(e)) } + /// Receive some of the request body. + #[instrument(skip_all)] pub async fn recv_data(&mut self) -> Result, Error> { future::poll_fn(|cx| self.poll_recv_data(cx)).await } /// Receive trailers + #[instrument(skip_all)] pub async fn recv_trailers(&mut self) -> Result, Error> { let mut trailers = if let Some(encoded) = self.trailers.take() { encoded @@ -847,6 +860,7 @@ where } #[allow(missing_docs)] + #[instrument(skip_all)] pub fn stop_sending(&mut self, err_code: Code) { self.stream.stop_sending(err_code); } @@ -858,6 +872,7 @@ where B: Buf, { /// Send some data on the response body. + #[instrument(skip_all)] pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { let frame = Frame::Data(buf); @@ -868,6 +883,7 @@ where } /// Send a set of trailers to end the request. + #[instrument(skip_all)] pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2 //= type=TODO @@ -898,11 +914,13 @@ where } /// Stops a stream with an error code + #[instrument(skip_all)] pub fn stop_stream(&mut self, code: Code) { self.stream.reset(code.into()); } #[allow(missing_docs)] + #[instrument(skip_all)] pub async fn finish(&mut self) -> Result<(), Error> { if self.send_grease_frame { // send a grease frame once per Connection @@ -923,6 +941,7 @@ where S: quic::BidiStream, B: Buf, { + #[instrument(skip_all)] pub(crate) fn split( self, ) -> ( diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index 88d2d851..70169c8c 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -37,7 +37,7 @@ use crate::{ use crate::server::request::ResolveRequest; -use tracing::{trace, warn}; +use tracing::{instrument, trace, warn}; use super::stream::{ReadDatagram, RequestStream}; @@ -89,11 +89,13 @@ where /// Use a custom [`super::builder::Builder`] with [`super::builder::builder()`] to create a connection /// with different settings. /// Provide a Connection which implements [`quic::Connection`]. + #[instrument(skip_all)] pub async fn new(conn: C) -> Result { super::builder::builder().build(conn).await } /// Closes the connection with a code and a reason. + #[instrument(skip_all)] pub fn close>(&mut self, code: Code, reason: T) -> Error { self.inner.close(code, reason) } @@ -109,6 +111,7 @@ where /// It returns a tuple with a [`http::Request`] and an [`RequestStream`]. /// The [`http::Request`] is the received request from the client. /// The [`RequestStream`] can be used to send the response. + #[instrument(skip_all)] pub async fn accept( &mut self, ) -> Result, RequestStream)>, Error> { @@ -154,6 +157,7 @@ where /// This is needed as a bidirectional stream may be read as part of incoming webtransport /// bi-streams. If it turns out that the stream is *not* a `WEBTRANSPORT_STREAM` the request /// may still want to be handled and passed to the user. + #[instrument(skip_all)] pub fn accept_with_frame( &mut self, mut stream: FrameStream, @@ -285,6 +289,7 @@ where /// Initiate a graceful shutdown, accepting `max_request` potentially still in-flight /// /// See [connection shutdown](https://www.rfc-editor.org/rfc/rfc9114.html#connection-shutdown) for more information. + #[instrument(skip_all)] pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> { let max_id = self .last_accepted_stream @@ -298,6 +303,7 @@ where /// /// This could be either a *Request* or a *WebTransportBiStream*, the first frame's type /// decides. + #[instrument(skip_all)] pub fn poll_accept_request( &mut self, cx: &mut Context<'_>, @@ -346,11 +352,13 @@ where } } + #[instrument(skip_all)] pub(crate) fn poll_control(&mut self, cx: &mut Context<'_>) -> Poll> { while (self.poll_next_control(cx)?).is_ready() {} Poll::Pending } + #[instrument(skip_all)] pub(crate) fn poll_next_control( &mut self, cx: &mut Context<'_>, @@ -391,6 +399,7 @@ where Poll::Ready(Ok(frame)) } + #[instrument(skip_all)] fn poll_requests_completion(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { match self.request_end_recv.poll_recv(cx) { @@ -420,6 +429,7 @@ where B: Buf, { /// Sends a datagram + #[instrument(skip_all)] pub fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> { self.inner .conn @@ -436,6 +446,7 @@ where B: Buf, { /// Reads an incoming datagram + #[instrument(skip_all)] pub fn read_datagram(&mut self) -> ReadDatagram { ReadDatagram { conn: self, @@ -449,6 +460,7 @@ where C: quic::Connection, B: Buf, { + #[instrument(skip_all)] fn drop(&mut self) { self.inner.close(Code::H3_NO_ERROR, ""); } diff --git a/h3/src/server/request.rs b/h3/src/server/request.rs index 19c379d8..8743a555 100644 --- a/h3/src/server/request.rs +++ b/h3/src/server/request.rs @@ -2,6 +2,7 @@ use std::convert::TryFrom; use bytes::Buf; use http::{Request, StatusCode}; +use tracing::instrument; use crate::{error::Code, proto::headers::Header, qpack, quic, Error}; @@ -28,6 +29,7 @@ impl> ResolveRequest { } /// Finishes the resolution of the request + #[instrument(skip_all)] pub async fn resolve( mut self, ) -> Result<(Request<()>, RequestStream), Error> { diff --git a/h3/src/server/stream.rs b/h3/src/server/stream.rs index 610b83b6..a9725814 100644 --- a/h3/src/server/stream.rs +++ b/h3/src/server/stream.rs @@ -33,7 +33,7 @@ use crate::{ stream::{self}, }; -use tracing::error; +use tracing::{error, instrument}; /// Manage request and response transfer for an incoming request /// @@ -62,11 +62,13 @@ where B: Buf, { /// Receive data sent from the client + #[instrument(skip_all)] pub async fn recv_data(&mut self) -> Result, Error> { self.inner.recv_data().await } /// Poll for data sent from the client + #[instrument(skip_all)] pub fn poll_recv_data( &mut self, cx: &mut Context<'_>, @@ -75,11 +77,13 @@ where } /// Receive an optional set of trailers for the request + #[instrument(skip_all)] pub async fn recv_trailers(&mut self) -> Result, Error> { self.inner.recv_trailers().await } /// Tell the peer to stop sending into the underlying QUIC stream + #[instrument(skip_all)] pub fn stop_sending(&mut self, error_code: crate::error::Code) { self.inner.stream.stop_sending(error_code) }