Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ts_netstack_smoltcp_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl Netstack {
PollIngressSingleResult::SocketStateChanged => {
changed = true;
changed_this_iter = true;
self.pump_tcp_accept();

tracing::trace!("socket state changed");
}
Expand Down Expand Up @@ -386,6 +387,7 @@ impl Netstack {
PollIngressSingleResult::PacketProcessed => {}
PollIngressSingleResult::SocketStateChanged => {
changed = true;
self.pump_tcp_accept();
tracing::trace!("socket state changed");
}
}
Expand Down Expand Up @@ -607,6 +609,7 @@ where
}

if progress_made {
stack.drain_tcp_closes();
Comment thread
dylan-tailscale marked this conversation as resolved.
Poll::Ready(())
} else {
Poll::Pending
Expand Down
117 changes: 104 additions & 13 deletions ts_netstack_smoltcp_core/src/socket_impl/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,23 @@ pub struct TcpListenerState {
/// Socket currently in listening state and waiting for a new connection.
current_socket_handle: SocketHandle,

/// Sockets which have upgraded from the listening state and are waiting to be accepted.
/// Sockets which have transitioned from `LISTEN` to `SYN-RECEIVED` (half-open) and are waiting
/// to become `ESTABLISHED`; in other words, the socket received a `SYN` and replied with a
/// `SYN-ACK`, and is awaiting an `ACK` to complete the handshake.
///
/// Note that sockets in this queue can [transition back to the `LISTEN` state if the remote
/// replies with a `RST` rather than an `ACK`](https://www.rfc-editor.org/rfc/rfc793#page-70).
/// Sockets that return to `LISTEN` should be removed from this queue/dropped (not `close()`d);
/// the listener has already opened a new socket in the `LISTEN` state.
half_open_queue: VecDeque<SocketHandle>,

/// Sockets which have transitioned from `SYN-RECEIVED` (half-open) to `ESTABLISHED`
/// (full-open); in other words, the socket has received an `ACK` from the remote completing the
/// three-way handshake. Sockets in this queue are waiting for a call to
/// [`Netstack::process_tcp_listen()`] with a [`TcpListenCommand::Accept`] command, which will
/// dequeue a socket and return it to become a [`TcpStream`].
///
/// [`TcpStream`]: [::ts_netstack_smoltcp_socket::tcp::stream::TcpStream]
accept_queue: VecDeque<SocketHandle>,
}

Expand Down Expand Up @@ -61,6 +77,7 @@ impl Netstack {
TcpListenerState {
current_socket_handle: socket_handle,
local_endpoint,
half_open_queue: Default::default(),
accept_queue: Default::default(),
},
);
Expand All @@ -76,18 +93,79 @@ impl Netstack {
return Error::BadRequest.into();
};

if let Some(handle) = listener.accept_queue.pop_front() {
let sock = self.socket_set.get::<tcp::Socket>(handle);
let remote = sock.remote_endpoint().unwrap();
// Iterate the half-open queue, re-queueing any sockets that are still in
// `SYN-RECEIVED`. Move any sockets in `ESTABLISHED` to the `accept_queue`, close
// any sockets in `CLOSE-WAIT`, and drop any sockets that moved back to `LISTEN`.
// All other states are unexpected.
listener.half_open_queue.retain(|half_open| {
let sock = self.socket_set.get_mut::<tcp::Socket>(*half_open);
let state = sock.state();
let _span = tracing::trace_span!(
"half_open_queue",
accept_queue_len = listener.accept_queue.len(),
pending_closes = self.pending_tcp_closes.len(),
?half_open,
?state
)
.entered();

match state {
tcp::State::SynReceived => {
tracing::trace!("half-open socket unchanged, re-queueing");
true
}
tcp::State::Established => {
tracing::trace!("half-open socket ready, moving to accept queue");
listener.accept_queue.push_back(*half_open);
false
}
tcp::State::CloseWait => {
tracing::trace!("half-open socket moved to CLOSE-WAIT, closing");
sock.close();
self.pending_tcp_closes.push(*half_open);
if self.pending_tcp_closes.len() > 10000 {
tracing::warn!("large number of pending closes");
}
false
}
tcp::State::Listen => {
tracing::trace!("half-open socket moved to LISTEN, dropping");
false
}
_ => {
tracing::warn!("half-open socket in unexpected state, dropping");
false
}
Comment thread
dylan-tailscale marked this conversation as resolved.
}
});

// De-queue a single socket in the `ESTABLISHED` state from the `accept_queue` and
// return it to become a `TcpStream`.
if let Some(accept) = listener.accept_queue.pop_front() {
let sock = self.socket_set.get_mut::<tcp::Socket>(accept);
let state = sock.state();
let _span = tracing::trace_span!(
"accept_queue",
half_open_queue_len = listener.half_open_queue.len(),
accept_queue_len = listener.accept_queue.len(),
pending_closes = self.pending_tcp_closes.len(),
?accept,
?state
)
.entered();

debug_assert_eq!(sock.state(), tcp::State::Established);
tracing::trace!("accept socket accepted, returning");

let remote = sock.remote_endpoint().unwrap();
return TcpListenResponse::Accepted {
handle,
handle: accept,
remote: SocketAddr::new(remote.addr.into(), remote.port),
}
.into();
}

tracing::trace!("accept not ready");
tracing::trace!("accept queue empty");

Response::WouldBlock {
handle: None,
Expand All @@ -108,7 +186,12 @@ impl Netstack {

self.pending_tcp_closes.push(listener.current_socket_handle);

for pending_accept in listener.accept_queue {
let accept_handles = listener
.half_open_queue
.iter()
.chain(listener.accept_queue.iter())
.copied();
for pending_accept in accept_handles {
let sock = self.socket_set.get_mut::<tcp::Socket>(pending_accept);
sock.close();

Expand Down Expand Up @@ -140,24 +223,32 @@ impl Netstack {

match sock.state() {
tcp::State::Listen => {
return;
tracing::trace!("listening");
continue;
Comment thread
npry marked this conversation as resolved.
}

tcp::State::SynReceived | tcp::State::SynSent => {
tcp::State::SynReceived => {
tracing::trace!("socket pending, not yet established");
return;
listener
.half_open_queue
.push_back(listener.current_socket_handle);
}

tcp::State::Established => {
tracing::trace!("connection established");

listener
.accept_queue
.push_back(listener.current_socket_handle);
}

_ => {
tracing::warn!("partially-established listening socket reset or closed");
state => {
tracing::warn!(
current_socket = ?listener.current_socket_handle,
current_socket_state = %state,
half_open_queue_len = listener.half_open_queue.len(),
accept_queue_len = listener.accept_queue.len(),
listening_on = %listener.local_endpoint,
"partially-established listening socket reset or closed");
sock.close();
self.pending_tcp_closes.push(listener.current_socket_handle);
}
Expand Down