diff --git a/.gitignore b/.gitignore index 0fef8b69..92688d40 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ scratchpad .DS_Store RELEASE.md .idea +.vscode diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index a989db1f..60d8e2f1 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -3,6 +3,7 @@ use pallas::network::{ miniprotocols::{chainsync, Point, MAINNET_MAGIC}, multiplexer::Bearer, }; +use tokio::time::Instant; use tracing::info; async fn do_blockfetch(peer: &mut PeerClient) { @@ -36,7 +37,12 @@ async fn do_chainsync(peer: &mut PeerClient) { info!("intersected point is {:?}", point); + let mut keepalive_timer = Instant::now(); for _ in 0..10 { + if keepalive_timer.elapsed().as_secs() > 20 { + peer.keepalive().send_keepalive().await.unwrap(); + keepalive_timer = Instant::now(); + } let next = peer.chainsync().request_next().await.unwrap(); match next { diff --git a/pallas-network/Cargo.toml b/pallas-network/Cargo.toml index 73b74b4e..381d33ea 100644 --- a/pallas-network/Cargo.toml +++ b/pallas-network/Cargo.toml @@ -16,6 +16,7 @@ hex = "0.4.3" itertools = "0.10.5" pallas-codec = { version = "=0.20.0", path = "../pallas-codec" } pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" } +rand = "0.8.5" thiserror = "1.0.31" tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] } tracing = "0.1.37" @@ -23,5 +24,4 @@ tracing = "0.1.37" [dev-dependencies] tracing-subscriber = "0.3.16" tokio = { version = "1", features = ["full"] } -rand = "0.8.5" diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 1ee099a3..7fed1b60 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -3,16 +3,13 @@ use tracing::error; use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; -use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; -use crate::multiplexer::RunningPlexer; -use crate::{ - miniprotocols::{ - blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC, - PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH, - PROTOCOL_N2N_CHAIN_SYNC, - }, - multiplexer::{self, Bearer}, +use crate::miniprotocols::{ + blockfetch, chainsync, handshake, keepalive, localstate, txsubmission, PROTOCOL_N2C_CHAIN_SYNC, + PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH, + PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, + PROTOCOL_N2N_TX_SUBMISSION, }; +use crate::multiplexer::{self, Bearer}; #[derive(Debug, Error)] pub enum Error { @@ -35,20 +32,24 @@ pub struct PeerClient { pub handshake: handshake::Confirmation, pub chainsync: chainsync::N2NClient, pub blockfetch: blockfetch::Client, + pub txsubmission: txsubmission::Client, + pub keepalive: keepalive::Client, } impl PeerClient { pub async fn connect(bearer: Bearer, magic: u64) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); - let channel0 = plexer.subscribe_client(0); - let channel2 = plexer.subscribe_client(2); - let channel3 = plexer.subscribe_client(3); + let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE); + let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC); + let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH); + let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION); + let keepalive_channel = plexer.subscribe_client(PROTOCOL_N2N_KEEP_ALIVE); let plexer = plexer.spawn(); let versions = handshake::n2n::VersionTable::v7_and_above(magic); - let mut client = handshake::Client::new(channel0); + let mut client = handshake::Client::new(hs_channel); let handshake = client .handshake(versions) @@ -63,8 +64,10 @@ impl PeerClient { Ok(Self { plexer, handshake, - chainsync: chainsync::Client::new(channel2), - blockfetch: blockfetch::Client::new(channel3), + chainsync: chainsync::Client::new(cs_channel), + blockfetch: blockfetch::Client::new(bf_channel), + txsubmission: txsubmission::Client::new(txsub_channel), + keepalive: keepalive::Client::new(keepalive_channel), }) } @@ -76,6 +79,14 @@ impl PeerClient { &mut self.blockfetch } + pub fn txsubmission(&mut self) -> &mut txsubmission::Client { + &mut self.txsubmission + } + + pub fn keepalive(&mut self) -> &mut keepalive::Client { + &mut self.keepalive + } + pub fn abort(&self) { self.plexer.abort(); } @@ -87,6 +98,7 @@ pub struct PeerServer { pub version: (VersionNumber, n2n::VersionData), pub chainsync: chainsync::N2NServer, pub blockfetch: blockfetch::Server, + pub txsubmission: txsubmission::Server, } impl PeerServer { @@ -96,10 +108,12 @@ impl PeerServer { let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE); let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC); let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH); + let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION); let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); let server_cs = chainsync::N2NServer::new(cs_channel); let server_bf = blockfetch::Server::new(bf_channel); + let server_txsub = txsubmission::Server::new(txsub_channel); let plexer = plexer.spawn(); @@ -114,6 +128,7 @@ impl PeerServer { version: ver, chainsync: server_cs, blockfetch: server_bf, + txsubmission: server_txsub, }) } else { plexer.abort(); @@ -129,6 +144,10 @@ impl PeerServer { &mut self.blockfetch } + pub fn txsubmission(&mut self) -> &mut txsubmission::Server { + &mut self.txsubmission + } + pub fn abort(&self) { self.plexer.abort(); } diff --git a/pallas-network/src/miniprotocols/handshake/n2n.rs b/pallas-network/src/miniprotocols/handshake/n2n.rs index b5841231..ad7bc178 100644 --- a/pallas-network/src/miniprotocols/handshake/n2n.rs +++ b/pallas-network/src/miniprotocols/handshake/n2n.rs @@ -1,27 +1,38 @@ use std::collections::HashMap; -use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use pallas_codec::minicbor::{decode, Decode, Decoder, encode, Encode, Encoder}; pub type VersionTable = super::protocol::VersionTable; -const PROTOCOL_V4: u64 = 4; -const PROTOCOL_V5: u64 = 5; -const PROTOCOL_V6: u64 = 6; const PROTOCOL_V7: u64 = 7; const PROTOCOL_V8: u64 = 8; const PROTOCOL_V9: u64 = 9; const PROTOCOL_V10: u64 = 10; +const PROTOCOL_V11: u64 = 11; +const PROTOCOL_V12: u64 = 12; +const PROTOCOL_V13: u64 = 13; + +const PEER_SHARING_DISABLED: u8 = 0; impl VersionTable { pub fn v4_and_above(network_magic: u64) -> VersionTable { + // Older versions are not supported anymore (removed from network-spec.pdf). + // Try not to break compatibility with older pallas users. + return Self::v7_and_above(network_magic); + } + + pub fn v6_and_above(network_magic: u64) -> VersionTable { + // Older versions are not supported anymore (removed from network-spec.pdf). + // Try not to break compatibility with older pallas users. + return Self::v7_and_above(network_magic); + } + + pub fn v7_to_v10(network_magic: u64) -> VersionTable { let values = vec![ - (PROTOCOL_V4, VersionData::new(network_magic, false)), - (PROTOCOL_V5, VersionData::new(network_magic, false)), - (PROTOCOL_V6, VersionData::new(network_magic, false)), - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), + (PROTOCOL_V7, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V8, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V9, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V10, VersionData::new(network_magic, false, None, None)), ] .into_iter() .collect::>(); @@ -29,13 +40,15 @@ impl VersionTable { VersionTable { values } } - pub fn v6_and_above(network_magic: u64) -> VersionTable { + pub fn v7_and_above(network_magic: u64) -> VersionTable { let values = vec![ - (PROTOCOL_V6, VersionData::new(network_magic, false)), - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), + (PROTOCOL_V7, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V8, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V9, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V10, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), ] .into_iter() .collect::>(); @@ -43,12 +56,11 @@ impl VersionTable { VersionTable { values } } - pub fn v7_and_above(network_magic: u64) -> VersionTable { + pub fn v11_and_above(network_magic: u64) -> VersionTable { let values = vec![ - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), + (PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), ] .into_iter() .collect::>(); @@ -61,13 +73,17 @@ impl VersionTable { pub struct VersionData { network_magic: u64, initiator_and_responder_diffusion_mode: bool, + peer_sharing: Option, + query: Option, } impl VersionData { - pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool) -> Self { + pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool, peer_sharing: Option, query: Option) -> Self { VersionData { network_magic, initiator_and_responder_diffusion_mode, + peer_sharing, + query, } } } @@ -78,9 +94,20 @@ impl Encode<()> for VersionData { e: &mut Encoder, _ctx: &mut (), ) -> Result<(), encode::Error> { - e.array(2)? - .u64(self.network_magic)? - .bool(self.initiator_and_responder_diffusion_mode)?; + match (self.peer_sharing, self.query) { + (Some(peer_sharing), Some(query)) => { + e.array(4)? + .u64(self.network_magic)? + .bool(self.initiator_and_responder_diffusion_mode)? + .u8(peer_sharing)? + .bool(query)?; + }, + _ => { + e.array(2)? + .u64(self.network_magic)? + .bool(self.initiator_and_responder_diffusion_mode)?; + }, + }; Ok(()) } @@ -88,13 +115,23 @@ impl Encode<()> for VersionData { impl<'b> Decode<'b, ()> for VersionData { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - d.array()?; + let len = d.array()?; let network_magic = d.u64()?; let initiator_and_responder_diffusion_mode = d.bool()?; + let peer_sharing = match len { + Some(4) => Some(d.u8()?), + _ => None, + }; + let query = match len { + Some(4) => Some(d.bool()?), + _ => None, + }; Ok(Self { network_magic, initiator_and_responder_diffusion_mode, + peer_sharing, + query, }) } } diff --git a/pallas-network/src/miniprotocols/keepalive/client.rs b/pallas-network/src/miniprotocols/keepalive/client.rs new file mode 100644 index 00000000..45aa48b4 --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/client.rs @@ -0,0 +1,132 @@ +use std::fmt::Debug; +use rand::Rng; +use thiserror::*; +use tracing::debug; + +use super::protocol::*; +use crate::multiplexer; + +#[derive(Error, Debug)] +pub enum Error { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("keepalive cookie mismatch")] + KeepAliveCookieMismatch, + + #[error("error while sending or receiving data through the channel")] + Plexer(multiplexer::Error), +} + +pub struct KeepAliveSharedState { + saved_cookie: u16, +} + +pub struct Client(State, multiplexer::ChannelBuffer, KeepAliveSharedState); + +impl Client { + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Client, multiplexer::ChannelBuffer::new(channel), KeepAliveSharedState{ saved_cookie: 0 }) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match &self.0 { + State::Client => true, + State::Server => false, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), Error> { + if !self.has_agency() { + Err(Error::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), Error> { + if self.has_agency() { + Err(Error::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Client, Message::KeepAlive(..)) => Ok(()), + (State::Client, Message::Done) => Ok(()), + _ => Err(Error::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Server, Message::ResponseKeepAlive(..)) => Ok(()), + _ => Err(Error::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn send_keepalive(&mut self) -> Result<(), Error> { + // generate random cookie value + let cookie = rand::thread_rng().gen::(); + let msg = Message::KeepAlive(cookie); + self.send_message(&msg).await?; + self.2.saved_cookie = cookie; + self.0 = State::Server; + debug!("sent keepalive message with cookie {}", cookie); + + self.recv_while_sending_keepalive().await?; + + Ok(()) + } + + async fn recv_while_sending_keepalive(&mut self) -> Result<(), Error> { + match self.recv_message().await? { + Message::ResponseKeepAlive(cookie) => { + debug!("received keepalive response with cookie {}", cookie); + if cookie == self.2.saved_cookie { + self.0 = State::Client; + Ok(()) + } else { + Err(Error::KeepAliveCookieMismatch) + } + } + _ => Err(Error::InvalidInbound), + } + } +} diff --git a/pallas-network/src/miniprotocols/keepalive/codec.rs b/pallas-network/src/miniprotocols/keepalive/codec.rs new file mode 100644 index 00000000..e00ed9b0 --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/codec.rs @@ -0,0 +1,49 @@ +use super::protocol::*; +use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; + +impl Encode<()> for Message { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Message::KeepAlive(cookie) => { + e.array(2)?.u16(0)?; + e.encode(cookie)?; + }, + Message::ResponseKeepAlive(cookie) => { + e.array(2)?.u16(1)?; + e.encode(cookie)?; + }, + Message::Done => { + e.array(1)?.u16(2)?; + }, + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for Message { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + _ctx: &mut (), + ) -> Result { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let cookie = d.decode()?; + Ok(Message::KeepAlive(cookie)) + } + 1 => { + let cookie = d.decode()?; + Ok(Message::ResponseKeepAlive(cookie)) + } + 2 => Ok(Message::Done), + _ => Err(decode::Error::message("can't decode Message")), + } + } +} diff --git a/pallas-network/src/miniprotocols/keepalive/mod.rs b/pallas-network/src/miniprotocols/keepalive/mod.rs new file mode 100644 index 00000000..a9eaa043 --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/mod.rs @@ -0,0 +1,7 @@ +mod client; +mod codec; +mod protocol; + +pub use client::*; +pub use codec::*; +pub use protocol::*; diff --git a/pallas-network/src/miniprotocols/keepalive/protocol.rs b/pallas-network/src/miniprotocols/keepalive/protocol.rs new file mode 100644 index 00000000..121228c3 --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/protocol.rs @@ -0,0 +1,15 @@ +pub type KeepAliveCookie = u16; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum State { + Client, + Server, + Done, +} + +#[derive(Debug, Clone)] +pub enum Message { + KeepAlive(KeepAliveCookie), + ResponseKeepAlive(KeepAliveCookie), + Done, +} diff --git a/pallas-network/src/miniprotocols/mod.rs b/pallas-network/src/miniprotocols/mod.rs index 6c452db2..dbbd59c1 100644 --- a/pallas-network/src/miniprotocols/mod.rs +++ b/pallas-network/src/miniprotocols/mod.rs @@ -5,6 +5,7 @@ mod common; pub mod blockfetch; pub mod chainsync; pub mod handshake; +pub mod keepalive; pub mod localstate; pub mod localtxsubmission; pub mod txmonitor; diff --git a/pallas-network/src/miniprotocols/txsubmission/client.rs b/pallas-network/src/miniprotocols/txsubmission/client.rs index c6696605..1dea3cdd 100644 --- a/pallas-network/src/miniprotocols/txsubmission/client.rs +++ b/pallas-network/src/miniprotocols/txsubmission/client.rs @@ -132,14 +132,16 @@ where pub async fn next_request(&mut self) -> Result, Error> { match self.recv_message().await? { - Message::RequestTxIds(blocking, ack, req) => { - self.0 = State::TxIdsBlocking; - - match blocking { - true => Ok(Request::TxIds(ack, req)), - false => Ok(Request::TxIdsNonBlocking(ack, req)), + Message::RequestTxIds(blocking, ack, req) => match blocking { + true => { + self.0 = State::TxIdsBlocking; + Ok(Request::TxIds(ack, req)) } - } + false => { + self.0 = State::TxIdsNonBlocking; + Ok(Request::TxIdsNonBlocking(ack, req)) + } + }, Message::RequestTxs(x) => { self.0 = State::Txs; Ok(Request::Txs(x)) diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 90eb7ffb..5c968c32 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -87,8 +87,22 @@ impl Bearer { Ok(Self::Tcp(stream)) } - pub fn accept_tcp(listener: &tcp::TcpListener) -> IOResult<(Self, tcp::SocketAddr)> { - let (stream, addr) = listener.accept()?; + pub async fn connect_tcp_timeout( + addr: impl tcp::ToSocketAddrs, + timeout: std::time::Duration, + ) -> Result { + match tokio::time::timeout(timeout, Self::connect_tcp(addr)).await { + Ok(Ok(stream)) => Ok(stream), + Ok(Err(err)) => Err(err), + Err(_) => Err(tokio::io::Error::new( + tokio::io::ErrorKind::TimedOut, + "connection timed out", + )), + } + } + + pub async fn accept_tcp(listener: &tcp::TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { + let (stream, addr) = listener.accept().await?; stream.set_nodelay(true)?; Ok((Self::Tcp(stream), addr)) } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 9893c728..e4b64d71 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -10,12 +10,13 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries_v16::{Addr, Addrs, Value}; use pallas_network::miniprotocols::localstate::ClientQueryRequest; +use pallas_network::miniprotocols::txsubmission::{EraTxBody, TxIdAndSize}; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; -use pallas_network::miniprotocols::{handshake, localstate}; +use pallas_network::miniprotocols::{handshake, localstate, txsubmission, MAINNET_MAGIC}; use pallas_network::multiplexer::{Bearer, Plexer}; use std::net::TcpListener; use std::path::Path; @@ -289,7 +290,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { server_hs.receive_proposed_versions().await.unwrap(); server_hs - .accept_version(10, VersionData::new(0, false)) + .accept_version(10, VersionData::new(0, false, None, None)) .await .unwrap(); @@ -783,3 +784,270 @@ pub async fn local_state_query_server_and_client_happy_path() { _ = tokio::join!(client, server); } + +#[tokio::test] +#[ignore] +pub async fn txsubmission_server_and_client_happy_path_n2n() { + let test_txs = vec![(vec![0], vec![0, 0, 0]), (vec![1], vec![1, 1, 1])]; + + let server = tokio::spawn({ + let test_txs = test_txs.clone(); + async move { + let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)) + .await + .unwrap(); + + let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); + + let server_txsub = peer_server.txsubmission(); + + // server waits for init + + server_txsub.wait_for_init().await.unwrap(); + + // server requests some tx ids + + server_txsub + .acknowledge_and_request_tx_ids(false, 0, 2) + .await + .unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + // server receives tx ids + + let txids = match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::TxIds(x) => x, + _ => panic!("unexpected message"), + }; + + assert_eq!(*server_txsub.state(), txsubmission::State::Idle); + + // server requests txs for ids + + let txids: Vec<_> = txids.into_iter().map(|t| t.0).collect(); + + assert_eq!(txids[0].1, test_txs[0].0); + assert_eq!(txids[1].1, test_txs[1].0); + + server_txsub.request_txs(txids).await.unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::Txs); + + // server receives txs + + let txs = match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::Txs(x) => x, + _ => panic!("unexpected message"), + }; + + assert_eq!(*server_txsub.state(), txsubmission::State::Idle); + + assert_eq!(txs[0].1, test_txs[0].1); + assert_eq!(txs[1].1, test_txs[1].1); + + // server requests more tx ids (blocking) + + server_txsub + .acknowledge_and_request_tx_ids(true, 2, 1) + .await + .unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsBlocking); + + // server receives done from client + + match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::Done => (), + _ => panic!("unexpected message"), + } + + assert_eq!(*server_txsub.state(), txsubmission::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(3)).await; + let mut mempool = test_txs.clone(); + + // client setup + + let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); + + let client_txsub = client_to_server_conn.txsubmission(); + + // send init + + client_txsub.send_init().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive ids request from server + + let (_, req) = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIdsNonBlocking(ack, req) => (ack, req), + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + // send ids to server + + let to_send = mempool.drain(..req as usize).collect::>(); + + let ids_and_size = to_send + .clone() + .into_iter() + .map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(0, h), b.len() as u32)) + .collect(); + + client_txsub.reply_tx_ids(ids_and_size).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive txs request from server + + let ids = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::Txs(ids) => ids, + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::Txs); + + assert_eq!(ids[0].1, test_txs[0].0); + assert_eq!(ids[1].1, test_txs[1].0); + + // send txs to server + + let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(0, b)).collect(); + + client_txsub.reply_txs(txs_to_send).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive tx ids request from server (blocking) + + match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(_, _) => (), + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + + // send done to server + + client_txsub.send_done().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Done); + }); + + _ = tokio::join!(client, server); +} + +#[tokio::test] +#[ignore] +pub async fn txsubmission_submit_to_mainnet_peer_n2n() { + let tx_hash = + hex::decode("8b6e50e09376b5021e93fe688ba9e7100e3682cebcb39970af5f4e5962bc5a3d").unwrap(); + let tx_hex = include_str!("../../test_data/babbage11.tx"); + let tx_bytes = hex::decode(tx_hex).unwrap(); + + let mempool = vec![(tx_hash, tx_bytes)]; + + // client setup + + let mut client_to_server_conn = + PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC) + .await + .unwrap(); + + let client_txsub = client_to_server_conn.txsubmission(); + + // send init + + client_txsub.send_init().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive ids request from server + + let ack = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + ack + } + txsubmission::Request::TxIdsNonBlocking(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + ack + } + _ => panic!("unexpected message"), + }; + + assert_eq!(ack, 0); + + // send ids to server + + let to_send = mempool.clone(); + + let ids_and_size = to_send + .clone() + .into_iter() + .map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(4, h), b.len() as u32)) + .collect(); + + client_txsub.reply_tx_ids(ids_and_size).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive txs request from server + + let ids = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::Txs(ids) => ids, + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::Txs); + + assert_eq!(ids[0].1, mempool[0].0); + + // send txs to server + + let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(4, b)).collect(); + + client_txsub.reply_txs(txs_to_send).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive tx ids request from server (blocking) + + // server usually sends another request before processing/acknowledging our + // previous response, so ack is 0. the ack comes in the next message. + match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIdsNonBlocking(_, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + } + _ => panic!("unexpected message"), + }; + + client_txsub.reply_tx_ids(vec![]).await.unwrap(); + + let ack = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + + client_txsub.send_done().await.unwrap(); + assert_eq!(*client_txsub.state(), txsubmission::State::Done); + + ack + } + txsubmission::Request::TxIdsNonBlocking(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + ack + } + _ => panic!("unexpected message"), + }; + + // server should acknowledge the one transaction we sent now + assert_eq!(ack, 1); +} diff --git a/test_data/babbage11.tx b/test_data/babbage11.tx new file mode 100644 index 00000000..71c70452 --- /dev/null +++ b/test_data/babbage11.tx @@ -0,0 +1 @@ +84a5008182582030edb55f21419f693940372b080fa931d3f7340f9d362573d1b8a20bcc7f208000018182583901c04c6e21cc83f7322439d5cd6950bd36d114da35859fe01fbb31da1f58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f1a009b83c1021a000618f0031a20b40da0048183028200581c58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f581cae66e56ab11ccb39e882669f220a37956c683e4ce84fefd910012d7aa1008282582080d6e8a2928c2b54cfe94a607c246de2062dc32fd49bd7e522cc210fe581c45b58404e17fe684a23f53e30abc9024f8333419e8eba5dce149dd8d7d0e641b8e05d893a48202c05bc23c983a15b17f1fc9641b6ebb544e3d787518150f2e6e097990a8258202e4103fac5c3fbd804802647dc1c6c9a3610584ba73a77f70dc5a4f962334a325840635471d189d510d6e8d18f28ec73d9cc191f86e625610710ed1695d484397bd32e0b67078ce70daea328c065af1e887a17bd08950623504b29b2e82c804d7607f5f6 \ No newline at end of file