Skip to content

Commit c11d1a5

Browse files
authored
Merge pull request #4168 from TheBlueMatt/2025-10-net-race-fixes
Fix race in `PeerManager` read pausing.
2 parents add8241 + 467d311 commit c11d1a5

File tree

4 files changed

+73
-81
lines changed

4 files changed

+73
-81
lines changed

fuzz/src/full_stack.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ struct Peer<'a> {
195195
peers_connected: &'a RefCell<[bool; 256]>,
196196
}
197197
impl<'a> SocketDescriptor for Peer<'a> {
198-
fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
198+
fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
199199
data.len()
200200
}
201201
fn disconnect_socket(&mut self) {
@@ -695,7 +695,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
695695
}
696696
let mut peer = Peer { id: peer_id, peers_connected: &peers };
697697
match loss_detector.handler.read_event(&mut peer, get_slice!(get_slice!(1)[0])) {
698-
Ok(res) => assert!(!res),
698+
Ok(()) => {},
699699
Err(_) => {
700700
peers.borrow_mut()[peer_id as usize] = false;
701701
},

lightning-background-processor/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
774774
/// # #[derive(Eq, PartialEq, Clone, Hash)]
775775
/// # struct SocketDescriptor {}
776776
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
777-
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
777+
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
778778
/// # fn disconnect_socket(&mut self) {}
779779
/// # }
780780
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
@@ -1878,7 +1878,7 @@ mod tests {
18781878
#[derive(Clone, Hash, PartialEq, Eq)]
18791879
struct TestDescriptor {}
18801880
impl SocketDescriptor for TestDescriptor {
1881-
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
1881+
fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
18821882
0
18831883
}
18841884

lightning-net-tokio/src/lib.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,8 @@ impl Connection {
243243
Ok(len) => {
244244
let read_res =
245245
peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
246-
let mut us_lock = us.lock().unwrap();
247246
match read_res {
248-
Ok(pause_read) => {
249-
if pause_read {
250-
us_lock.read_paused = true;
251-
}
252-
},
247+
Ok(()) => {},
253248
Err(_) => break Disconnect::CloseConnection,
254249
}
255250
},
@@ -533,7 +528,7 @@ impl SocketDescriptor {
533528
}
534529
}
535530
impl peer_handler::SocketDescriptor for SocketDescriptor {
536-
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
531+
fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
537532
// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
538533
// there's room in the kernel buffer, or otherwise create a new Waker with a
539534
// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
@@ -544,13 +539,16 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
544539
return 0;
545540
}
546541

547-
if resume_read && us.read_paused {
542+
let read_was_paused = us.read_paused;
543+
us.read_paused = !continue_read;
544+
545+
if continue_read && read_was_paused {
548546
// The schedule_read future may go to lock up but end up getting woken up by there
549547
// being more room in the write buffer, dropping the other end of this Sender
550548
// before we get here, so we ignore any failures to wake it up.
551-
us.read_paused = false;
552549
let _ = us.read_waker.try_send(());
553550
}
551+
554552
if data.is_empty() {
555553
return 0;
556554
}
@@ -576,16 +574,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
576574
}
577575
},
578576
task::Poll::Ready(Err(_)) => return written_len,
579-
task::Poll::Pending => {
580-
// We're queued up for a write event now, but we need to make sure we also
581-
// pause read given we're now waiting on the remote end to ACK (and in
582-
// accordance with the send_data() docs).
583-
us.read_paused = true;
584-
// Further, to avoid any current pending read causing a `read_event` call, wake
585-
// up the read_waker and restart its loop.
586-
let _ = us.read_waker.try_send(());
587-
return written_len;
588-
},
577+
task::Poll::Pending => return written_len,
589578
}
590579
}
591580
}

0 commit comments

Comments
 (0)