From 1f376e2e7a835d5553b78919ed5c935ca9219cbc Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 19 Dec 2023 22:05:39 -0300 Subject: [PATCH] fix tests --- pallas-network/src/facades.rs | 60 ++++++++-------------------- pallas-network/src/multiplexer.rs | 13 ++++--- pallas-network/tests/plexer.rs | 8 ++-- pallas-network/tests/protocols.rs | 65 ++++++++++++++++++------------- 4 files changed, 68 insertions(+), 78 deletions(-) diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 357f59c3..1ee099a3 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -1,10 +1,5 @@ -use std::path::Path; - use thiserror::Error; -use tracing::{debug, error}; - -#[cfg(unix)] -use std::os::unix::net::UnixListener; +use tracing::error; use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; @@ -43,10 +38,7 @@ pub struct PeerClient { } impl PeerClient { - pub async fn connect(address: &str, magic: u64) -> Result { - debug!("connecting"); - let bearer = Bearer::connect_tcp(address).map_err(Error::ConnectFailure)?; - + pub async fn connect(bearer: Bearer, magic: u64) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); let channel0 = plexer.subscribe_client(0); @@ -84,8 +76,8 @@ impl PeerClient { &mut self.blockfetch } - pub fn abort(self) -> Result<(), Error> { - self.plexer.abort().map_err(Error::PlexerFailure) + pub fn abort(&self) { + self.plexer.abort(); } } @@ -98,9 +90,7 @@ pub struct PeerServer { } impl PeerServer { - pub async fn accept(listener: &std::net::TcpListener, magic: u64) -> Result { - let (bearer, _) = Bearer::accept_tcp(listener).map_err(Error::ConnectFailure)?; - + pub async fn serve(bearer: Bearer, magic: u64) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE); @@ -126,7 +116,7 @@ impl PeerServer { blockfetch: server_bf, }) } else { - plexer.abort().map_err(Error::PlexerFailure)?; + plexer.abort(); Err(Error::IncompatibleVersion) } } @@ -139,8 +129,8 @@ impl PeerServer { &mut self.blockfetch } - pub fn abort(self) -> Result<(), Error> { - self.plexer.abort().map_err(Error::PlexerFailure) + pub fn abort(&self) { + self.plexer.abort(); } } @@ -186,13 +176,7 @@ impl NodeClient { } #[cfg(unix)] - pub async fn connect(path: impl AsRef, magic: u64) -> Result { - debug!("connecting"); - - let bearer = Bearer::connect_unix(path) - .await - .map_err(Error::ConnectFailure)?; - + pub async fn connect(bearer: Bearer, magic: u64) -> Result { let versions = handshake::n2c::VersionTable::v10_and_above(magic); Self::connect_bearer(bearer, versions).await @@ -216,15 +200,9 @@ impl NodeClient { #[cfg(unix)] pub async fn handshake_query( - path: impl AsRef, + bearer: Bearer, magic: u64, ) -> Result { - debug!("connecting"); - - let bearer = Bearer::connect_unix(path) - .await - .map_err(Error::ConnectFailure)?; - let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); @@ -249,7 +227,7 @@ impl NodeClient { Err(Error::IncompatibleVersion) } Confirmation::QueryReply(version_table) => { - plexer.abort().map_err(Error::PlexerFailure)?; + plexer.abort(); Ok(version_table) } } @@ -263,8 +241,8 @@ impl NodeClient { &mut self.statequery } - pub fn abort(self) -> Result<(), Error> { - self.plexer.abort().map_err(Error::PlexerFailure) + pub fn abort(&self) { + self.plexer.abort(); } } @@ -279,11 +257,7 @@ pub struct NodeServer { #[cfg(unix)] impl NodeServer { - pub async fn accept(listener: &UnixListener, magic: u64) -> Result { - let (bearer, _) = Bearer::accept_unix(listener) - .await - .map_err(Error::ConnectFailure)?; - + pub async fn serve(bearer: Bearer, magic: u64) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE); @@ -309,7 +283,7 @@ impl NodeServer { statequery: server_sq, }) } else { - plexer.abort().map_err(Error::PlexerFailure)?; + plexer.abort(); Err(Error::IncompatibleVersion) } } @@ -322,7 +296,7 @@ impl NodeServer { &mut self.statequery } - pub fn abort(self) -> Result<(), Error> { - self.plexer.abort().map_err(Error::PlexerFailure) + pub fn abort(&self) { + self.plexer.abort(); } } diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 300851f5..90eb7ffb 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -94,21 +94,19 @@ impl Bearer { } #[cfg(unix)] - pub async fn connect_unix(path: impl AsRef) -> IOResult { + pub fn connect_unix(path: impl AsRef) -> IOResult { let stream = unix::UnixStream::connect(path)?; Ok(Self::Unix(stream)) } #[cfg(unix)] - pub async fn accept_unix(listener: &unix::UnixListener) -> IOResult<(Self, unix::SocketAddr)> { + pub fn accept_unix(listener: &unix::UnixListener) -> IOResult<(Self, unix::SocketAddr)> { let (stream, addr) = listener.accept()?; Ok((Self::Unix(stream), addr)) } #[cfg(windows)] - pub async fn connect_named_pipe( - pipe_name: impl AsRef, - ) -> Result { + pub fn connect_named_pipe(pipe_name: impl AsRef) -> IOResult { let client = tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?; Ok(Self::NamedPipe(client)) } @@ -378,8 +376,11 @@ pub struct RunningPlexer { } impl RunningPlexer { - pub fn abort(self) -> Result<(), Error> { + pub fn abort(&self) { self.abort.store(true, Ordering::Relaxed); + } + + pub fn join(self) -> Result<(), Error> { self.demuxer.join().expect("couldn't join demuxer thread")?; self.muxer.join().expect("couldn't join muxer thread")?; diff --git a/pallas-network/tests/plexer.rs b/pallas-network/tests/plexer.rs index 7f224029..bca83c5f 100644 --- a/pallas-network/tests/plexer.rs +++ b/pallas-network/tests/plexer.rs @@ -29,7 +29,7 @@ fn random_payload(size: usize) -> Vec { #[tokio::test] async fn one_way_small_sequence_of_payloads() { - let mut passive = setup_passive_muxer::<50301>(); + let passive = tokio::task::spawn_blocking(|| setup_passive_muxer::<50301>()); // HACK: a small sleep seems to be required for Github actions runner to // formally expose the port @@ -37,6 +37,8 @@ async fn one_way_small_sequence_of_payloads() { let mut active = setup_active_muxer::<50301>(); + let mut passive = passive.await.unwrap(); + let mut sender_channel = active.subscribe_client(3); let mut receiver_channel = passive.subscribe_server(3); @@ -51,6 +53,6 @@ async fn one_way_small_sequence_of_payloads() { assert_eq!(payload, received_payload); } - passive.abort().unwrap(); - active.abort().unwrap(); + passive.abort(); + active.abort(); } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 7a8b5c7a..9893c728 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -26,9 +26,8 @@ use std::os::unix::net::UnixListener; #[tokio::test] #[ignore] pub async fn chainsync_history_happy_path() { - let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) - .await - .unwrap(); + let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); + let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); let client = peer.chainsync(); @@ -79,9 +78,8 @@ pub async fn chainsync_history_happy_path() { #[tokio::test] #[ignore] pub async fn chainsync_tip_happy_path() { - let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) - .await - .unwrap(); + let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); + let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); let client = peer.chainsync(); @@ -120,9 +118,8 @@ pub async fn chainsync_tip_happy_path() { #[tokio::test] #[ignore] pub async fn blockfetch_happy_path() { - let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) - .await - .unwrap(); + let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); + let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); let client = peer.blockfetch(); @@ -180,10 +177,15 @@ pub async fn blockfetch_server_and_client_happy_path() { async move { // server setup - let server_listener = + let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap(); - let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); + let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_tcp(&listener)) + .await + .unwrap() + .unwrap(); + + let mut peer_server = PeerServer::serve(bearer, 0).await.unwrap(); let server_bf = peer_server.blockfetch(); @@ -215,9 +217,8 @@ pub async fn blockfetch_server_and_client_happy_path() { let client = tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; - // client setup - - let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); + let bearer = Bearer::connect_tcp("localhost:30001").unwrap(); + let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap(); let client_bf = client_to_server_conn.blockfetch(); @@ -271,9 +272,12 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { // server setup let server_listener = - TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap(); + TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30002)).unwrap(); + + let bearer = + tokio::task::spawn_blocking(move || Bearer::accept_tcp(&server_listener).unwrap()); - let (bearer, _) = Bearer::accept_tcp(&server_listener).unwrap(); + let (bearer, _) = bearer.await.unwrap(); let mut server_plexer = Plexer::new(bearer); @@ -377,15 +381,14 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { assert!(server_cs.recv_while_idle().await.unwrap().is_none()); assert_eq!(*server_cs.state(), chainsync::State::Done); + + server_plexer.abort(); } }); let client = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(2)).await; - - // client setup - - let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); + let bearer = Bearer::connect_tcp("localhost:30002").unwrap(); + let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap(); let client_cs = client_to_server_conn.chainsync(); @@ -467,9 +470,14 @@ pub async fn local_state_query_server_and_client_happy_path() { fs::remove_file(socket_path).unwrap(); } - let unix_listener = UnixListener::bind(socket_path).unwrap(); + let listener = UnixListener::bind(socket_path).unwrap(); + + let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_unix(&listener)) + .await + .unwrap() + .unwrap(); - let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 0) + let mut server = pallas_network::facades::NodeServer::serve(bearer, 0) .await .unwrap(); @@ -552,7 +560,9 @@ pub async fn local_state_query_server_and_client_happy_path() { x => panic!("unexpected message from client: {x:?}"), }; - let addr_hex = "981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD"; + let addr_hex = +"981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD" +; let addr = hex::decode(addr_hex).unwrap(); let addr: Addr = addr.to_vec().into(); let addrs: Addrs = Vec::from([addr]); @@ -632,7 +642,8 @@ pub async fn local_state_query_server_and_client_happy_path() { let socket_path = "node.socket"; - let mut client = NodeClient::connect(socket_path, 0).await.unwrap(); + let bearer = Bearer::connect_unix(&socket_path).unwrap(); + let mut client = NodeClient::connect(bearer, 0).await.unwrap(); // client sends acquire @@ -705,7 +716,9 @@ pub async fn local_state_query_server_and_client_happy_path() { assert_eq!(result, localstate::queries_v16::StakeDistribution { pools }); - let addr_hex = "981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD"; + let addr_hex = +"981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD" +; let addr = hex::decode(addr_hex).unwrap(); let addr: Addr = addr.to_vec().into(); let addrs: Addrs = Vec::from([addr]);