Skip to content

Commit 1143dc9

Browse files
committed
feat(s2n-quic-dc): add channel recv buffer impl
1 parent a9e7673 commit 1143dc9

File tree

7 files changed

+124
-4
lines changed

7 files changed

+124
-4
lines changed

dc/s2n-quic-dc/src/stream/client/tokio.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -161,5 +161,6 @@ where
161161
#[inline]
162162
fn recv_buffer() -> recv::shared::RecvBuffer {
163163
// TODO replace this with a parameter once everything is in place
164-
recv::buffer::Local::new(msg::recv::Message::new(9000), None)
164+
let recv_buffer = recv::buffer::Local::new(msg::recv::Message::new(9000), None);
165+
recv::buffer::Either::A(recv_buffer)
165166
}

dc/s2n-quic-dc/src/stream/recv/buffer.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ use crate::{
88
use core::task::{Context, Poll};
99
use std::io;
1010

11+
pub mod channel;
1112
mod dispatch;
1213
mod local;
1314

15+
pub use channel::Channel;
1416
pub use dispatch::Dispatch;
1517
pub use local::Local;
1618

@@ -36,7 +38,6 @@ pub trait Buffer {
3638
R: Dispatch;
3739
}
3840

39-
#[allow(dead_code)] // TODO remove this once we start using the channel buffer
4041
pub enum Either<A, B> {
4142
A(A),
4243
B(B),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use super::Dispatch;
5+
use crate::{
6+
event,
7+
socket::recv::descriptor::Filled,
8+
stream::{recv, socket::Socket, TransportFeatures},
9+
sync::mpsc::Receiver,
10+
};
11+
use core::task::{Context, Poll};
12+
use s2n_quic_core::ensure;
13+
use std::{collections::VecDeque, io};
14+
15+
pub struct Channel {
16+
pending: VecDeque<Filled>,
17+
receiver: Receiver<Filled>,
18+
}
19+
20+
impl super::Buffer for Channel {
21+
#[inline]
22+
fn is_empty(&self) -> bool {
23+
self.pending.is_empty()
24+
}
25+
26+
#[inline]
27+
fn poll_fill<S, Pub>(
28+
&mut self,
29+
cx: &mut Context,
30+
socket: &S,
31+
publisher: &mut Pub,
32+
) -> Poll<io::Result<usize>>
33+
where
34+
S: ?Sized + Socket,
35+
Pub: event::ConnectionPublisher,
36+
{
37+
// check if we have any pending packets
38+
debug_assert!(self.pending.is_empty(), "pending packets should be empty");
39+
ensure!(self.pending.is_empty(), Ok(1).into());
40+
41+
let capacity = u16::MAX as usize;
42+
43+
// the socket isn't actually used since we're relying on another task to fill the `receiver` channel
44+
let _ = socket;
45+
46+
let result = self
47+
.receiver
48+
.poll_swap(cx, &mut self.pending)
49+
.map_err(|_err| io::Error::from(io::ErrorKind::BrokenPipe));
50+
51+
match result {
52+
Poll::Ready(Ok(())) => {
53+
let committed_len = self
54+
.pending
55+
.iter()
56+
.map(|segment| segment.len() as usize)
57+
.sum::<usize>();
58+
publisher.on_stream_read_socket_flushed(event::builder::StreamReadSocketFlushed {
59+
capacity,
60+
committed_len,
61+
});
62+
Ok(committed_len).into()
63+
}
64+
Poll::Ready(Err(error)) => {
65+
let errno = error.raw_os_error();
66+
publisher.on_stream_read_socket_errored(event::builder::StreamReadSocketErrored {
67+
capacity,
68+
errno,
69+
});
70+
Err(error).into()
71+
}
72+
Poll::Pending => {
73+
publisher.on_stream_read_socket_blocked(event::builder::StreamReadSocketBlocked {
74+
capacity,
75+
});
76+
Poll::Pending
77+
}
78+
}
79+
}
80+
81+
#[inline]
82+
fn process<R>(&mut self, features: TransportFeatures, router: &mut R) -> Result<(), recv::Error>
83+
where
84+
R: Dispatch,
85+
{
86+
debug_assert!(
87+
!features.is_stream(),
88+
"only datagram oriented transport is supported"
89+
);
90+
91+
for mut segment in self.pending.drain(..) {
92+
let remote_addr = segment.remote_address().get();
93+
let ecn = segment.ecn();
94+
router.on_datagram_segment(&remote_addr, ecn, segment.payload_mut())?;
95+
}
96+
97+
Ok(())
98+
}
99+
}

dc/s2n-quic-dc/src/stream/recv/shared.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::{
2828
},
2929
};
3030

31-
pub type RecvBuffer = recv::buffer::Local;
31+
pub type RecvBuffer = recv::buffer::Either<recv::buffer::Local, recv::buffer::Channel>;
3232

3333
/// Who will send ACKs?
3434
#[derive(Clone, Copy, Debug, Default)]

dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs

+1
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ impl WorkerState {
317317
// TCP doesn't use the route key so just pick 0
318318
let queue_id = VarInt::ZERO;
319319
let recv_buffer = recv::buffer::Local::new(recv_buffer.take(), None);
320+
let recv_buffer = recv::buffer::Either::A(recv_buffer);
320321

321322
let stream_builder = match endpoint::accept_stream(
322323
now,

dc/s2n-quic-dc/src/stream/server/tokio/udp.rs

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ where
114114
// TODO allocate a queue for this stream
115115
let queue_id = VarInt::ZERO;
116116
let recv_buffer = recv::buffer::Local::new(self.recv_buffer.take(), Some(handshake));
117+
let recv_buffer = recv::buffer::Either::A(recv_buffer);
117118

118119
let stream = match endpoint::accept_stream(
119120
now,

dc/s2n-quic-dc/src/stream/socket/handle.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use super::{Protocol, TransportFeatures};
4+
use super::{recv::descriptor, Protocol, TransportFeatures};
55
use crate::msg::{self, addr::Addr, cmsg};
66
use core::task::{Context, Poll};
77
use s2n_quic_core::inet::ExplicitCongestionNotification;
@@ -31,6 +31,23 @@ pub trait Socket: 'static + Send + Sync {
3131
/// Returns the amount of buffered data on the socket
3232
fn poll_peek_len(&self, cx: &mut Context) -> Poll<io::Result<usize>>;
3333

34+
#[inline]
35+
fn poll_recv_desc(
36+
&self,
37+
cx: &mut Context,
38+
desc: descriptor::Unfilled,
39+
) -> Poll<Result<descriptor::Segments, (descriptor::Unfilled, io::Error)>> {
40+
let segments = desc.recv_with(|addr, cmsg, buffer| {
41+
match self.poll_recv(cx, addr, cmsg, &mut [buffer]) {
42+
Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
43+
Poll::Ready(Ok(len)) => Ok(len),
44+
Poll::Ready(Err(err)) => Err(err),
45+
}
46+
})?;
47+
48+
Ok(segments).into()
49+
}
50+
3451
#[inline]
3552
fn poll_recv_buffer(
3653
&self,

0 commit comments

Comments
 (0)