Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 20, 2023
1 parent 60588b6 commit 1f376e2
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 78 deletions.
60 changes: 17 additions & 43 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use std::path::Path;

use thiserror::Error;
use tracing::{debug, error};

#[cfg(unix)]
use std::os::unix::net::UnixListener;
use tracing::error;

use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable};

Check warning on line 4 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused import: `Confirmation`

Expand Down Expand Up @@ -43,10 +38,7 @@ pub struct PeerClient {
}

impl PeerClient {
pub async fn connect(address: &str, magic: u64) -> Result<Self, Error> {
debug!("connecting");
let bearer = Bearer::connect_tcp(address).map_err(Error::ConnectFailure)?;

pub async fn connect(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let mut plexer = multiplexer::Plexer::new(bearer);

let channel0 = plexer.subscribe_client(0);
Expand Down Expand Up @@ -84,8 +76,8 @@ impl PeerClient {
&mut self.blockfetch
}

pub fn abort(self) -> Result<(), Error> {
self.plexer.abort().map_err(Error::PlexerFailure)
pub fn abort(&self) {
self.plexer.abort();
}
}

Expand All @@ -98,9 +90,7 @@ pub struct PeerServer {
}

impl PeerServer {
pub async fn accept(listener: &std::net::TcpListener, magic: u64) -> Result<Self, Error> {
let (bearer, _) = Bearer::accept_tcp(listener).map_err(Error::ConnectFailure)?;

pub async fn serve(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
Expand All @@ -126,7 +116,7 @@ impl PeerServer {
blockfetch: server_bf,
})
} else {
plexer.abort().map_err(Error::PlexerFailure)?;
plexer.abort();
Err(Error::IncompatibleVersion)
}
}
Expand All @@ -139,8 +129,8 @@ impl PeerServer {
&mut self.blockfetch
}

pub fn abort(self) -> Result<(), Error> {
self.plexer.abort().map_err(Error::PlexerFailure)
pub fn abort(&self) {
self.plexer.abort();
}
}

Expand Down Expand Up @@ -186,13 +176,7 @@ impl NodeClient {
}

#[cfg(unix)]
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;

pub async fn connect(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer, versions).await
Expand All @@ -216,15 +200,9 @@ impl NodeClient {

#[cfg(unix)]
pub async fn handshake_query(
path: impl AsRef<Path>,
bearer: Bearer,
magic: u64,
) -> Result<handshake::n2c::VersionTable, Error> {
debug!("connecting");

let bearer = Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;

let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
Expand All @@ -249,7 +227,7 @@ impl NodeClient {
Err(Error::IncompatibleVersion)
}
Confirmation::QueryReply(version_table) => {
plexer.abort().map_err(Error::PlexerFailure)?;
plexer.abort();
Ok(version_table)
}
}
Expand All @@ -263,8 +241,8 @@ impl NodeClient {
&mut self.statequery
}

pub fn abort(self) -> Result<(), Error> {
self.plexer.abort().map_err(Error::PlexerFailure)
pub fn abort(&self) {
self.plexer.abort();
}
}

Expand All @@ -279,11 +257,7 @@ pub struct NodeServer {

#[cfg(unix)]
impl NodeServer {
pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
let (bearer, _) = Bearer::accept_unix(listener)
.await
.map_err(Error::ConnectFailure)?;

pub async fn serve(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
Expand All @@ -309,7 +283,7 @@ impl NodeServer {
statequery: server_sq,
})
} else {
plexer.abort().map_err(Error::PlexerFailure)?;
plexer.abort();
Err(Error::IncompatibleVersion)
}
}
Expand All @@ -322,7 +296,7 @@ impl NodeServer {
&mut self.statequery
}

pub fn abort(self) -> Result<(), Error> {
self.plexer.abort().map_err(Error::PlexerFailure)
pub fn abort(&self) {
self.plexer.abort();
}
}
13 changes: 7 additions & 6 deletions pallas-network/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,19 @@ impl Bearer {
}

#[cfg(unix)]
pub async fn connect_unix(path: impl AsRef<std::path::Path>) -> IOResult<Self> {
pub fn connect_unix(path: impl AsRef<std::path::Path>) -> IOResult<Self> {
let stream = unix::UnixStream::connect(path)?;
Ok(Self::Unix(stream))
}

#[cfg(unix)]
pub async fn accept_unix(listener: &unix::UnixListener) -> IOResult<(Self, unix::SocketAddr)> {
pub fn accept_unix(listener: &unix::UnixListener) -> IOResult<(Self, unix::SocketAddr)> {
let (stream, addr) = listener.accept()?;
Ok((Self::Unix(stream), addr))
}

#[cfg(windows)]
pub async fn connect_named_pipe(
pipe_name: impl AsRef<std::ffi::OsStr>,
) -> Result<Self, tokio::io::Error> {
pub fn connect_named_pipe(pipe_name: impl AsRef<std::ffi::OsStr>) -> IOResult<Self> {
let client = tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?;
Ok(Self::NamedPipe(client))
}
Expand Down Expand Up @@ -378,8 +376,11 @@ pub struct RunningPlexer {
}

impl RunningPlexer {
pub fn abort(self) -> Result<(), Error> {
pub fn abort(&self) {
self.abort.store(true, Ordering::Relaxed);
}

pub fn join(self) -> Result<(), Error> {
self.demuxer.join().expect("couldn't join demuxer thread")?;
self.muxer.join().expect("couldn't join muxer thread")?;

Expand Down
8 changes: 5 additions & 3 deletions pallas-network/tests/plexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ fn random_payload(size: usize) -> Vec<u8> {

#[tokio::test]
async fn one_way_small_sequence_of_payloads() {
let mut passive = setup_passive_muxer::<50301>();
let passive = tokio::task::spawn_blocking(|| setup_passive_muxer::<50301>());

// HACK: a small sleep seems to be required for Github actions runner to
// formally expose the port
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

let mut active = setup_active_muxer::<50301>();

let mut passive = passive.await.unwrap();

let mut sender_channel = active.subscribe_client(3);
let mut receiver_channel = passive.subscribe_server(3);

Expand All @@ -51,6 +53,6 @@ async fn one_way_small_sequence_of_payloads() {
assert_eq!(payload, received_payload);
}

passive.abort().unwrap();
active.abort().unwrap();
passive.abort();
active.abort();
}
65 changes: 39 additions & 26 deletions pallas-network/tests/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use std::os::unix::net::UnixListener;
#[tokio::test]
#[ignore]
pub async fn chainsync_history_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap();
let mut peer = PeerClient::connect(bearer, 2).await.unwrap();

let client = peer.chainsync();

Expand Down Expand Up @@ -79,9 +78,8 @@ pub async fn chainsync_history_happy_path() {
#[tokio::test]
#[ignore]
pub async fn chainsync_tip_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap();
let mut peer = PeerClient::connect(bearer, 2).await.unwrap();

let client = peer.chainsync();

Expand Down Expand Up @@ -120,9 +118,8 @@ pub async fn chainsync_tip_happy_path() {
#[tokio::test]
#[ignore]
pub async fn blockfetch_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap();
let mut peer = PeerClient::connect(bearer, 2).await.unwrap();

let client = peer.blockfetch();

Expand Down Expand Up @@ -180,10 +177,15 @@ pub async fn blockfetch_server_and_client_happy_path() {
async move {
// server setup

let server_listener =
let listener =
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap();

let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap();
let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_tcp(&listener))
.await
.unwrap()
.unwrap();

let mut peer_server = PeerServer::serve(bearer, 0).await.unwrap();

let server_bf = peer_server.blockfetch();

Expand Down Expand Up @@ -215,9 +217,8 @@ pub async fn blockfetch_server_and_client_happy_path() {
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;

// client setup

let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap();
let bearer = Bearer::connect_tcp("localhost:30001").unwrap();
let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap();

let client_bf = client_to_server_conn.blockfetch();

Expand Down Expand Up @@ -271,9 +272,12 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
// server setup

let server_listener =
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap();
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30002)).unwrap();

let bearer =
tokio::task::spawn_blocking(move || Bearer::accept_tcp(&server_listener).unwrap());

let (bearer, _) = Bearer::accept_tcp(&server_listener).unwrap();
let (bearer, _) = bearer.await.unwrap();

let mut server_plexer = Plexer::new(bearer);

Expand Down Expand Up @@ -377,15 +381,14 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {

assert!(server_cs.recv_while_idle().await.unwrap().is_none());
assert_eq!(*server_cs.state(), chainsync::State::Done);

server_plexer.abort();
}
});

let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;

// client setup

let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap();
let bearer = Bearer::connect_tcp("localhost:30002").unwrap();
let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap();

let client_cs = client_to_server_conn.chainsync();

Expand Down Expand Up @@ -467,9 +470,14 @@ pub async fn local_state_query_server_and_client_happy_path() {
fs::remove_file(socket_path).unwrap();
}

let unix_listener = UnixListener::bind(socket_path).unwrap();
let listener = UnixListener::bind(socket_path).unwrap();

let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_unix(&listener))
.await
.unwrap()
.unwrap();

let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 0)
let mut server = pallas_network::facades::NodeServer::serve(bearer, 0)
.await
.unwrap();

Expand Down Expand Up @@ -552,7 +560,9 @@ pub async fn local_state_query_server_and_client_happy_path() {
x => panic!("unexpected message from client: {x:?}"),
};

let addr_hex = "981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD";
let addr_hex =
"981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD"
;
let addr = hex::decode(addr_hex).unwrap();
let addr: Addr = addr.to_vec().into();
let addrs: Addrs = Vec::from([addr]);
Expand Down Expand Up @@ -632,7 +642,8 @@ pub async fn local_state_query_server_and_client_happy_path() {

let socket_path = "node.socket";

let mut client = NodeClient::connect(socket_path, 0).await.unwrap();
let bearer = Bearer::connect_unix(&socket_path).unwrap();
let mut client = NodeClient::connect(bearer, 0).await.unwrap();

// client sends acquire

Expand Down Expand Up @@ -705,7 +716,9 @@ pub async fn local_state_query_server_and_client_happy_path() {

assert_eq!(result, localstate::queries_v16::StakeDistribution { pools });

let addr_hex = "981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD";
let addr_hex =
"981D186018CE18F718FB185F188918A918C7186A186518AC18DD1874186D189E188410184D186F1882184D187D18C4184F1842187F18CA18A118DD"
;
let addr = hex::decode(addr_hex).unwrap();
let addr: Addr = addr.to_vec().into();
let addrs: Addrs = Vec::from([addr]);
Expand Down

0 comments on commit 1f376e2

Please sign in to comment.