From d71678a83ffdcdbab547cbb5ec09721d735e63a9 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 20 Dec 2023 08:44:24 -0300 Subject: [PATCH] improve facade entry point --- examples/n2n-miniprotocols/src/main.rs | 6 +- pallas-network/src/facades.rs | 238 ++++++++++++++++--------- pallas-network/src/multiplexer.rs | 18 +- pallas-network/tests/protocols.rs | 63 +++---- 4 files changed, 186 insertions(+), 139 deletions(-) diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 60d8e2f1..f3d06d94 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -1,7 +1,6 @@ use pallas::network::{ facades::PeerClient, miniprotocols::{chainsync, Point, MAINNET_MAGIC}, - multiplexer::Bearer, }; use tokio::time::Instant; use tracing::info; @@ -66,8 +65,9 @@ async fn main() { // setup a TCP socket to act as data bearer between our agents and the remote // relay. - let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - let mut peer = PeerClient::connect(bearer, MAINNET_MAGIC).await.unwrap(); + let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC) + .await + .unwrap(); // fetch an arbitrary batch of block do_blockfetch(&mut peer).await; diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 7fed1b60..6e6170b9 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -1,7 +1,10 @@ +use std::net::{SocketAddr, TcpListener}; +use std::os::unix::net::UnixListener; +use std::path::Path; use thiserror::Error; use tracing::error; -use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; +use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber}; use crate::miniprotocols::{ blockfetch, chainsync, handshake, keepalive, localstate, txsubmission, PROTOCOL_N2C_CHAIN_SYNC, @@ -9,7 +12,8 @@ use crate::miniprotocols::{ PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, PROTOCOL_N2N_TX_SUBMISSION, }; -use crate::multiplexer::{self, Bearer}; + +use crate::multiplexer::{self, Bearer, RunningPlexer}; #[derive(Debug, Error)] pub enum Error { @@ -29,15 +33,15 @@ pub enum Error { /// Client of N2N Ouroboros pub struct PeerClient { plexer: RunningPlexer, - pub handshake: handshake::Confirmation, - pub chainsync: chainsync::N2NClient, - pub blockfetch: blockfetch::Client, - pub txsubmission: txsubmission::Client, - pub keepalive: keepalive::Client, + handshake: handshake::N2NClient, + chainsync: chainsync::N2NClient, + blockfetch: blockfetch::Client, + txsubmission: txsubmission::Client, + keepalive: keepalive::Client, } impl PeerClient { - pub async fn connect(bearer: Bearer, magic: u64) -> Result { + pub fn new(bearer: Bearer) -> Self { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE); @@ -48,10 +52,28 @@ impl PeerClient { let plexer = plexer.spawn(); + Self { + plexer, + handshake: handshake::Client::new(hs_channel), + chainsync: chainsync::Client::new(cs_channel), + blockfetch: blockfetch::Client::new(bf_channel), + txsubmission: txsubmission::Client::new(txsub_channel), + keepalive: keepalive::Client::new(keepalive_channel), + } + } + + pub async fn connect(addr: &'static str, magic: u64) -> Result { + let bearer = tokio::task::spawn_blocking(move || Bearer::connect_tcp(addr)) + .await + .expect("can't join tokio thread") + .map_err(Error::ConnectFailure)?; + + let mut client = Self::new(bearer); + let versions = handshake::n2n::VersionTable::v7_and_above(magic); - let mut client = handshake::Client::new(hs_channel); let handshake = client + .handshake() .handshake(versions) .await .map_err(Error::HandshakeProtocol)?; @@ -61,14 +83,11 @@ impl PeerClient { return Err(Error::IncompatibleVersion); } - Ok(Self { - plexer, - handshake, - chainsync: chainsync::Client::new(cs_channel), - blockfetch: blockfetch::Client::new(bf_channel), - txsubmission: txsubmission::Client::new(txsub_channel), - keepalive: keepalive::Client::new(keepalive_channel), - }) + Ok(client) + } + + pub fn handshake(&mut self) -> &mut handshake::N2NClient { + &mut self.handshake } pub fn chainsync(&mut self) -> &mut chainsync::N2NClient { @@ -94,15 +113,17 @@ impl PeerClient { /// Server of N2N Ouroboros pub struct PeerServer { - pub plexer: RunningPlexer, - pub version: (VersionNumber, n2n::VersionData), - pub chainsync: chainsync::N2NServer, - pub blockfetch: blockfetch::Server, - pub txsubmission: txsubmission::Server, + plexer: RunningPlexer, + handshake: handshake::N2NServer, + chainsync: chainsync::N2NServer, + blockfetch: blockfetch::Server, + txsubmission: txsubmission::Server, + accepted_address: Option, + accepted_version: Option, } impl PeerServer { - pub async fn serve(bearer: Bearer, magic: u64) -> Result { + pub fn new(bearer: Bearer) -> Self { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE); @@ -110,32 +131,56 @@ impl PeerServer { let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH); let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION); - let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); - let server_cs = chainsync::N2NServer::new(cs_channel); - let server_bf = blockfetch::Server::new(bf_channel); - let server_txsub = txsubmission::Server::new(txsub_channel); + let hs = handshake::N2NServer::new(hs_channel); + let cs = chainsync::N2NServer::new(cs_channel); + let bf = blockfetch::Server::new(bf_channel); + let txsub = txsubmission::Server::new(txsub_channel); let plexer = plexer.spawn(); - let accepted_version = server_hs + Self { + plexer, + handshake: hs, + chainsync: cs, + blockfetch: bf, + txsubmission: txsub, + accepted_address: None, + accepted_version: None, + } + } + + pub async fn accept( + listener: impl AsRef + Send + 'static, + magic: u64, + ) -> Result { + let (bearer, address) = + tokio::task::spawn_blocking(move || Bearer::accept_tcp(listener.as_ref())) + .await + .expect("can't join tokio thread") + .map_err(Error::ConnectFailure)?; + + let mut client = Self::new(bearer); + + let accepted_version = client + .handshake() .handshake(n2n::VersionTable::v7_and_above(magic)) .await .map_err(Error::HandshakeProtocol)?; - if let Some(ver) = accepted_version { - Ok(Self { - plexer, - version: ver, - chainsync: server_cs, - blockfetch: server_bf, - txsubmission: server_txsub, - }) + if let Some((version, _)) = accepted_version { + client.accepted_address = Some(address); + client.accepted_version = Some(version); + Ok(client) } else { - plexer.abort(); + client.abort(); Err(Error::IncompatibleVersion) } } + pub fn handshake(&mut self) -> &mut handshake::N2NServer { + &mut self.handshake + } + pub fn chainsync(&mut self) -> &mut chainsync::N2NServer { &mut self.chainsync } @@ -156,16 +201,13 @@ impl PeerServer { /// Client of N2C Ouroboros pub struct NodeClient { plexer: RunningPlexer, - pub handshake: handshake::Confirmation, - pub chainsync: chainsync::N2CClient, - pub statequery: localstate::Client, + handshake: handshake::N2CClient, + chainsync: chainsync::N2CClient, + statequery: localstate::Client, } impl NodeClient { - async fn connect_bearer( - bearer: Bearer, - versions: VersionTable, - ) -> Result { + pub fn new(bearer: Bearer) -> Self { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); @@ -174,47 +216,39 @@ impl NodeClient { let plexer = plexer.spawn(); - let mut client = handshake::Client::new(hs_channel); - - let handshake = client - .handshake(versions) - .await - .map_err(Error::HandshakeProtocol)?; - - if let handshake::Confirmation::Rejected(reason) = handshake { - error!(?reason, "handshake refused"); - return Err(Error::IncompatibleVersion); - } - - Ok(Self { + Self { plexer, - handshake, + handshake: handshake::Client::new(hs_channel), chainsync: chainsync::Client::new(cs_channel), statequery: localstate::Client::new(sq_channel), - }) - } - - #[cfg(unix)] - pub async fn connect(bearer: Bearer, magic: u64) -> Result { - let versions = handshake::n2c::VersionTable::v10_and_above(magic); - - Self::connect_bearer(bearer, versions).await + } } - #[cfg(windows)] pub async fn connect( - pipe_name: impl AsRef, + path: impl AsRef + Send + 'static, magic: u64, ) -> Result { - debug!("connecting"); - - let bearer = Bearer::connect_named_pipe(pipe_name) + let bearer = tokio::task::spawn_blocking(move || Bearer::connect_unix(path)) .await + .expect("can't join tokio thread") .map_err(Error::ConnectFailure)?; + let mut client = Self::new(bearer); + let versions = handshake::n2c::VersionTable::v10_and_above(magic); - Self::connect_bearer(bearer, versions).await + let handshake = client + .handshake() + .handshake(versions) + .await + .map_err(Error::HandshakeProtocol)?; + + if let handshake::Confirmation::Rejected(reason) = handshake { + error!(?reason, "handshake refused"); + return Err(Error::IncompatibleVersion); + } + + Ok(client) } #[cfg(unix)] @@ -252,6 +286,10 @@ impl NodeClient { } } + pub fn handshake(&mut self) -> &mut handshake::N2CClient { + &mut self.handshake + } + pub fn chainsync(&mut self) -> &mut chainsync::N2CClient { &mut self.chainsync } @@ -268,45 +306,71 @@ impl NodeClient { /// Server of N2C Ouroboros. #[cfg(unix)] pub struct NodeServer { - pub plexer: RunningPlexer, - pub version: (VersionNumber, n2c::VersionData), - pub chainsync: chainsync::N2CServer, - pub statequery: localstate::Server, + plexer: RunningPlexer, + handshake: handshake::N2CServer, + chainsync: chainsync::N2CServer, + statequery: localstate::Server, + accepted_address: Option, + accpeted_version: Option<(VersionNumber, n2c::VersionData)>, } #[cfg(unix)] impl NodeServer { - pub async fn serve(bearer: Bearer, magic: u64) -> Result { + pub async fn new(bearer: Bearer) -> Self { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE); let cs_channel = plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC); let sq_channel = plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY); - let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); + let server_hs = handshake::Server::::new(hs_channel); let server_cs = chainsync::N2CServer::new(cs_channel); let server_sq = localstate::Server::new(sq_channel); let plexer = plexer.spawn(); - let accepted_version = server_hs + Self { + plexer, + handshake: server_hs, + chainsync: server_cs, + statequery: server_sq, + accepted_address: None, + accpeted_version: None, + } + } + + pub async fn accept( + listener: impl AsRef + Send + 'static, + magic: u64, + ) -> Result { + let (bearer, address) = + tokio::task::spawn_blocking(move || Bearer::accept_unix(listener.as_ref())) + .await + .expect("can't join tokio thread") + .map_err(Error::ConnectFailure)?; + + let mut client = Self::new(bearer).await; + + let accepted_version = client + .handshake() .handshake(n2c::VersionTable::v10_and_above(magic)) .await .map_err(Error::HandshakeProtocol)?; - if let Some(ver) = accepted_version { - Ok(Self { - plexer, - version: ver, - chainsync: server_cs, - statequery: server_sq, - }) + if let Some(version) = accepted_version { + client.accepted_address = Some(address); + client.accpeted_version = Some(version); + Ok(client) } else { - plexer.abort(); + client.abort(); Err(Error::IncompatibleVersion) } } + pub fn handshake(&mut self) -> &mut handshake::N2CServer { + &mut self.handshake + } + pub fn chainsync(&mut self) -> &mut chainsync::N2CServer { &mut self.chainsync } diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 5c968c32..1bf010a2 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -90,19 +90,15 @@ impl Bearer { pub async fn connect_tcp_timeout( addr: impl tcp::ToSocketAddrs, timeout: std::time::Duration, - ) -> Result { - match tokio::time::timeout(timeout, Self::connect_tcp(addr)).await { - Ok(Ok(stream)) => Ok(stream), - Ok(Err(err)) => Err(err), - Err(_) => Err(tokio::io::Error::new( - tokio::io::ErrorKind::TimedOut, - "connection timed out", - )), - } + ) -> IOResult { + let addr = addr.to_socket_addrs()?.next().unwrap(); + let stream = tcp::TcpStream::connect_timeout(&addr, timeout)?; + stream.set_nodelay(true)?; + Ok(Self::Tcp(stream)) } - pub async fn accept_tcp(listener: &tcp::TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { - let (stream, addr) = listener.accept().await?; + pub fn accept_tcp(listener: &tcp::TcpListener) -> IOResult<(Self, tcp::SocketAddr)> { + let (stream, addr) = listener.accept()?; stream.set_nodelay(true)?; Ok((Self::Tcp(stream), addr)) } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index e4b64d71..9cbc2b9e 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -1,5 +1,6 @@ use std::fs; use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::Arc; use std::time::Duration; use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap}; @@ -27,8 +28,9 @@ use std::os::unix::net::UnixListener; #[tokio::test] #[ignore] pub async fn chainsync_history_happy_path() { - let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); - let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); + let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) + .await + .unwrap(); let client = peer.chainsync(); @@ -79,8 +81,9 @@ pub async fn chainsync_history_happy_path() { #[tokio::test] #[ignore] pub async fn chainsync_tip_happy_path() { - let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); - let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); + let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) + .await + .unwrap(); let client = peer.chainsync(); @@ -119,8 +122,9 @@ pub async fn chainsync_tip_happy_path() { #[tokio::test] #[ignore] pub async fn blockfetch_happy_path() { - let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap(); - let mut peer = PeerClient::connect(bearer, 2).await.unwrap(); + let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2) + .await + .unwrap(); let client = peer.blockfetch(); @@ -172,21 +176,16 @@ pub async fn blockfetch_server_and_client_happy_path() { hex::decode("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(), ); + let listener = + Arc::new(TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30003)).unwrap()); + let server = tokio::spawn({ let bodies = block_bodies.clone(); let point = point.clone(); async move { // server setup - let listener = - TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap(); - - let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_tcp(&listener)) - .await - .unwrap() - .unwrap(); - - let mut peer_server = PeerServer::serve(bearer, 0).await.unwrap(); + let mut peer_server = PeerServer::accept(listener, 0).await.unwrap(); let server_bf = peer_server.blockfetch(); @@ -218,8 +217,7 @@ pub async fn blockfetch_server_and_client_happy_path() { let client = tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; - let bearer = Bearer::connect_tcp("localhost:30001").unwrap(); - let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap(); + let mut client_to_server_conn = PeerClient::connect("localhost:30003", 0).await.unwrap(); let client_bf = client_to_server_conn.blockfetch(); @@ -388,8 +386,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { }); let client = tokio::spawn(async move { - let bearer = Bearer::connect_tcp("localhost:30002").unwrap(); - let mut client_to_server_conn = PeerClient::connect(bearer, 0).await.unwrap(); + let mut client_to_server_conn = PeerClient::connect("localhost:30002", 0).await.unwrap(); let client_cs = client_to_server_conn.chainsync(); @@ -465,20 +462,15 @@ pub async fn local_state_query_server_and_client_happy_path() { let server = tokio::spawn({ async move { // server setup - let socket_path = Path::new("node.socket"); + let socket_path = Path::new("node1.socket"); if socket_path.exists() { fs::remove_file(socket_path).unwrap(); } - let listener = UnixListener::bind(socket_path).unwrap(); + let listener = Arc::new(UnixListener::bind(socket_path).unwrap()); - let (bearer, _) = tokio::task::spawn_blocking(move || Bearer::accept_unix(&listener)) - .await - .unwrap() - .unwrap(); - - let mut server = pallas_network::facades::NodeServer::serve(bearer, 0) + let mut server = pallas_network::facades::NodeServer::accept(listener, 0) .await .unwrap(); @@ -640,11 +632,9 @@ pub async fn local_state_query_server_and_client_happy_path() { tokio::time::sleep(Duration::from_secs(1)).await; // client setup + let socket_path = "node1.socket"; - let socket_path = "node.socket"; - - let bearer = Bearer::connect_unix(&socket_path).unwrap(); - let mut client = NodeClient::connect(bearer, 0).await.unwrap(); + let mut client = NodeClient::connect(socket_path, 0).await.unwrap(); // client sends acquire @@ -790,14 +780,13 @@ pub async fn local_state_query_server_and_client_happy_path() { pub async fn txsubmission_server_and_client_happy_path_n2n() { let test_txs = vec![(vec![0], vec![0, 0, 0]), (vec![1], vec![1, 1, 1])]; + let server_listener = + Arc::new(TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap()); + let server = tokio::spawn({ let test_txs = test_txs.clone(); async move { - let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)) - .await - .unwrap(); - - let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); + let mut peer_server = PeerServer::accept(server_listener, 0).await.unwrap(); let server_txsub = peer_server.txsubmission(); @@ -871,7 +860,6 @@ pub async fn txsubmission_server_and_client_happy_path_n2n() { let mut mempool = test_txs.clone(); // client setup - let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); let client_txsub = client_to_server_conn.txsubmission(); @@ -955,7 +943,6 @@ pub async fn txsubmission_submit_to_mainnet_peer_n2n() { let mempool = vec![(tx_hash, tx_bytes)]; // client setup - let mut client_to_server_conn = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC) .await