Skip to content

Commit

Permalink
add tracing instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruben2424 committed Jun 21, 2024
1 parent 983c853 commit 98db57d
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 5 deletions.
1 change: 1 addition & 0 deletions h3-quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ quinn = { version = "0.11", default-features = false, features = [
tokio-util = { version = "0.7.9" }
futures = { version = "0.3.28" }
tokio = { version = "1", features = ["io-util"], default-features = false }
tracing = "0.1.40"
20 changes: 20 additions & 0 deletions h3-quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use h3::{
quic::{self, Error, StreamId, WriteBuf},
};
use tokio_util::sync::ReusableBoxFuture;
use tracing::instrument;

/// A QUIC connection backed by Quinn
///
Expand Down Expand Up @@ -155,6 +156,7 @@ where
type OpenStreams = OpenStreams;
type AcceptError = ConnectionError;

#[instrument(skip_all)]
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -169,6 +171,7 @@ where
})))
}

#[instrument(skip_all)]
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
Expand Down Expand Up @@ -197,6 +200,7 @@ where
type BidiStream = BidiStream<B>;
type OpenError = ConnectionError;

#[instrument(skip_all)]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -215,6 +219,7 @@ where
}))
}

#[instrument(skip_all)]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -229,6 +234,7 @@ where
Poll::Ready(Ok(Self::SendStream::new(send)))
}

#[instrument(skip_all)]
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
self.conn.close(
VarInt::from_u64(code.value()).expect("error code VarInt"),
Expand All @@ -243,6 +249,7 @@ where
{
type Error = SendDatagramError;

#[instrument(skip_all)]
fn send_datagram(&mut self, data: Datagram<B>) -> Result<(), SendDatagramError> {
// TODO investigate static buffer from known max datagram size
let mut buf = BytesMut::new();
Expand All @@ -259,6 +266,7 @@ impl quic::RecvDatagramExt for Connection {
type Error = ConnectionError;

#[inline]
#[instrument(skip_all)]
fn poll_accept_datagram(
&mut self,
cx: &mut task::Context<'_>,
Expand Down Expand Up @@ -289,6 +297,7 @@ where
type BidiStream = BidiStream<B>;
type OpenError = ConnectionError;

#[instrument(skip_all)]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -307,6 +316,7 @@ where
}))
}

#[instrument(skip_all)]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -321,6 +331,7 @@ where
Poll::Ready(Ok(Self::SendStream::new(send)))
}

#[instrument(skip_all)]
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
self.conn.close(
VarInt::from_u64(code.value()).expect("error code VarInt"),
Expand Down Expand Up @@ -452,6 +463,7 @@ impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;

#[instrument(skip_all)]
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -468,6 +480,7 @@ impl quic::RecvStream for RecvStream {
Poll::Ready(Ok(chunk?.map(|c| c.bytes)))
}

#[instrument(skip_all)]
fn stop_sending(&mut self, error_code: u64) {
self.stream
.as_mut()
Expand All @@ -476,6 +489,7 @@ impl quic::RecvStream for RecvStream {
.ok();
}

#[instrument(skip_all)]
fn recv_id(&self) -> StreamId {
self.stream
.as_ref()
Expand Down Expand Up @@ -573,6 +587,7 @@ where
{
type Error = SendStreamError;

#[instrument(skip_all)]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut data) = self.writing {
while data.has_remaining() {
Expand All @@ -598,10 +613,12 @@ where
Poll::Ready(Ok(()))
}

#[instrument(skip_all)]
fn poll_finish(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(self.stream.as_mut().unwrap().finish().map_err(|e| e.into()))
}

#[instrument(skip_all)]
fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
Expand All @@ -610,6 +627,7 @@ where
.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
}

#[instrument(skip_all)]
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.writing.is_some() {
return Err(Self::Error::NotReady);
Expand All @@ -618,6 +636,7 @@ where
Ok(())
}

#[instrument(skip_all)]
fn send_id(&self) -> StreamId {
self.stream
.as_ref()
Expand All @@ -633,6 +652,7 @@ impl<B> quic::SendStreamUnframed<B> for SendStream<B>
where
B: Buf,
{
#[instrument(skip_all)]
fn poll_send<D: Buf>(
&mut self,
cx: &mut task::Context<'_>,
Expand Down
6 changes: 5 additions & 1 deletion h3/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use bytes::{Buf, BytesMut};
use futures_util::future;
use http::request;
use tracing::{info, trace};
use tracing::{info, instrument, trace};

use crate::{
connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
Expand Down Expand Up @@ -121,6 +121,7 @@ where
B: Buf,
{
/// Send an HTTP/3 request to the server
#[instrument(skip_all)]
pub async fn send_request(
&mut self,
req: http::Request<()>,
Expand Down Expand Up @@ -346,17 +347,20 @@ where
B: Buf,
{
/// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
#[instrument(skip_all)]
pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> {
// TODO: Calculate remaining pushes once server push is implemented.
self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
}

/// Wait until the connection is closed
#[instrument(skip_all)]
pub async fn wait_idle(&mut self) -> Result<(), Error> {
future::poll_fn(|cx| self.poll_close(cx)).await
}

/// Maintain the connection state until it is closed
#[instrument(skip_all)]
pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
while let Poll::Ready(result) = self.inner.poll_control(cx) {
match result {
Expand Down
8 changes: 8 additions & 0 deletions h3/src/client/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Buf;
use futures_util::future;
use http::{HeaderMap, Response};
use tracing::instrument;

use crate::{
connection::{self, ConnectionState, SharedStateRef},
Expand Down Expand Up @@ -82,6 +83,7 @@ where
/// This should be called before trying to receive any data with [`recv_data()`].
///
/// [`recv_data()`]: #method.recv_data
#[instrument(skip_all)]
pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
.await
Expand Down Expand Up @@ -141,11 +143,13 @@ where

/// Receive some of the request body.
// TODO what if called before recv_response ?
#[instrument(skip_all)]
pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
self.inner.recv_data().await
}

/// Receive an optional set of trailers for the response.
#[instrument(skip_all)]
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
let res = self.inner.recv_trailers().await;
if let Err(ref e) = res {
Expand All @@ -157,6 +161,7 @@ where
}

/// Tell the peer to stop sending into the underlying QUIC stream
#[instrument(skip_all)]
pub fn stop_sending(&mut self, error_code: crate::error::Code) {
// TODO take by value to prevent any further call as this request is cancelled
// rename `cancel()` ?
Expand All @@ -170,6 +175,7 @@ where
B: Buf,
{
/// Send some data on the request body.
#[instrument(skip_all)]
pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
self.inner.send_data(buf).await
}
Expand All @@ -179,6 +185,7 @@ where
/// Either [`RequestStream::finish`] or
/// [`RequestStream::send_trailers`] must be called to finalize a
/// request.
#[instrument(skip_all)]
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
self.inner.send_trailers(trailers).await
}
Expand All @@ -188,6 +195,7 @@ where
/// Either [`RequestStream::finish`] or
/// [`RequestStream::send_trailers`] must be called to finalize a
/// request.
#[instrument(skip_all)]
pub async fn finish(&mut self) -> Result<(), Error> {
self.inner.finish().await
}
Expand Down
Loading

0 comments on commit 98db57d

Please sign in to comment.