diff --git a/ts_netstack_smoltcp_core/src/lib.rs b/ts_netstack_smoltcp_core/src/lib.rs index 941e7f4a..879056d5 100644 --- a/ts_netstack_smoltcp_core/src/lib.rs +++ b/ts_netstack_smoltcp_core/src/lib.rs @@ -289,6 +289,7 @@ impl Netstack { PollIngressSingleResult::SocketStateChanged => { changed = true; changed_this_iter = true; + self.pump_tcp_accept(); tracing::trace!("socket state changed"); } @@ -386,6 +387,7 @@ impl Netstack { PollIngressSingleResult::PacketProcessed => {} PollIngressSingleResult::SocketStateChanged => { changed = true; + self.pump_tcp_accept(); tracing::trace!("socket state changed"); } } @@ -607,6 +609,7 @@ where } if progress_made { + stack.drain_tcp_closes(); Poll::Ready(()) } else { Poll::Pending diff --git a/ts_netstack_smoltcp_core/src/socket_impl/tcp/listener.rs b/ts_netstack_smoltcp_core/src/socket_impl/tcp/listener.rs index 6a2943a2..04363e4b 100644 --- a/ts_netstack_smoltcp_core/src/socket_impl/tcp/listener.rs +++ b/ts_netstack_smoltcp_core/src/socket_impl/tcp/listener.rs @@ -29,7 +29,23 @@ pub struct TcpListenerState { /// Socket currently in listening state and waiting for a new connection. current_socket_handle: SocketHandle, - /// Sockets which have upgraded from the listening state and are waiting to be accepted. + /// Sockets which have transitioned from `LISTEN` to `SYN-RECEIVED` (half-open) and are waiting + /// to become `ESTABLISHED`; in other words, the socket received a `SYN` and replied with a + /// `SYN-ACK`, and is awaiting an `ACK` to complete the handshake. + /// + /// Note that sockets in this queue can [transition back to the `LISTEN` state if the remote + /// replies with a `RST` rather than an `ACK`](https://www.rfc-editor.org/rfc/rfc793#page-70). + /// Sockets that return to `LISTEN` should be removed from this queue/dropped (not `close()`d); + /// the listener has already opened a new socket in the `LISTEN` state. + half_open_queue: VecDeque, + + /// Sockets which have transitioned from `SYN-RECEIVED` (half-open) to `ESTABLISHED` + /// (full-open); in other words, the socket has received an `ACK` from the remote completing the + /// three-way handshake. Sockets in this queue are waiting for a call to + /// [`Netstack::process_tcp_listen()`] with a [`TcpListenCommand::Accept`] command, which will + /// dequeue a socket and return it to become a [`TcpStream`]. + /// + /// [`TcpStream`]: [::ts_netstack_smoltcp_socket::tcp::stream::TcpStream] accept_queue: VecDeque, } @@ -61,6 +77,7 @@ impl Netstack { TcpListenerState { current_socket_handle: socket_handle, local_endpoint, + half_open_queue: Default::default(), accept_queue: Default::default(), }, ); @@ -76,18 +93,79 @@ impl Netstack { return Error::BadRequest.into(); }; - if let Some(handle) = listener.accept_queue.pop_front() { - let sock = self.socket_set.get::(handle); - let remote = sock.remote_endpoint().unwrap(); + // Iterate the half-open queue, re-queueing any sockets that are still in + // `SYN-RECEIVED`. Move any sockets in `ESTABLISHED` to the `accept_queue`, close + // any sockets in `CLOSE-WAIT`, and drop any sockets that moved back to `LISTEN`. + // All other states are unexpected. + listener.half_open_queue.retain(|half_open| { + let sock = self.socket_set.get_mut::(*half_open); + let state = sock.state(); + let _span = tracing::trace_span!( + "half_open_queue", + accept_queue_len = listener.accept_queue.len(), + pending_closes = self.pending_tcp_closes.len(), + ?half_open, + ?state + ) + .entered(); + + match state { + tcp::State::SynReceived => { + tracing::trace!("half-open socket unchanged, re-queueing"); + true + } + tcp::State::Established => { + tracing::trace!("half-open socket ready, moving to accept queue"); + listener.accept_queue.push_back(*half_open); + false + } + tcp::State::CloseWait => { + tracing::trace!("half-open socket moved to CLOSE-WAIT, closing"); + sock.close(); + self.pending_tcp_closes.push(*half_open); + if self.pending_tcp_closes.len() > 10000 { + tracing::warn!("large number of pending closes"); + } + false + } + tcp::State::Listen => { + tracing::trace!("half-open socket moved to LISTEN, dropping"); + false + } + _ => { + tracing::warn!("half-open socket in unexpected state, dropping"); + false + } + } + }); + + // De-queue a single socket in the `ESTABLISHED` state from the `accept_queue` and + // return it to become a `TcpStream`. + if let Some(accept) = listener.accept_queue.pop_front() { + let sock = self.socket_set.get_mut::(accept); + let state = sock.state(); + let _span = tracing::trace_span!( + "accept_queue", + half_open_queue_len = listener.half_open_queue.len(), + accept_queue_len = listener.accept_queue.len(), + pending_closes = self.pending_tcp_closes.len(), + ?accept, + ?state + ) + .entered(); + + debug_assert_eq!(sock.state(), tcp::State::Established); + tracing::trace!("accept socket accepted, returning"); + let remote = sock.remote_endpoint().unwrap(); return TcpListenResponse::Accepted { - handle, + handle: accept, remote: SocketAddr::new(remote.addr.into(), remote.port), } .into(); } - tracing::trace!("accept not ready"); + tracing::trace!("accept queue empty"); Response::WouldBlock { handle: None, @@ -108,7 +186,12 @@ impl Netstack { self.pending_tcp_closes.push(listener.current_socket_handle); - for pending_accept in listener.accept_queue { + let accept_handles = listener + .half_open_queue + .iter() + .chain(listener.accept_queue.iter()) + .copied(); + for pending_accept in accept_handles { let sock = self.socket_set.get_mut::(pending_accept); sock.close(); @@ -140,24 +223,32 @@ impl Netstack { match sock.state() { tcp::State::Listen => { - return; + tracing::trace!("listening"); + continue; } - tcp::State::SynReceived | tcp::State::SynSent => { + tcp::State::SynReceived => { tracing::trace!("socket pending, not yet established"); - return; + listener + .half_open_queue + .push_back(listener.current_socket_handle); } tcp::State::Established => { tracing::trace!("connection established"); - listener .accept_queue .push_back(listener.current_socket_handle); } - _ => { - tracing::warn!("partially-established listening socket reset or closed"); + state => { + tracing::warn!( + current_socket = ?listener.current_socket_handle, + current_socket_state = %state, + half_open_queue_len = listener.half_open_queue.len(), + accept_queue_len = listener.accept_queue.len(), + listening_on = %listener.local_endpoint, + "partially-established listening socket reset or closed"); sock.close(); self.pending_tcp_closes.push(listener.current_socket_handle); }