Skip to content

Commit

Permalink
Merge branch 'main' into refactor/split-bearer-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 20, 2023
2 parents e139765 + 1ed2161 commit 63c0911
Show file tree
Hide file tree
Showing 14 changed files with 606 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ scratchpad
.DS_Store
RELEASE.md
.idea
.vscode
6 changes: 6 additions & 0 deletions examples/n2n-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use pallas::network::{
miniprotocols::{chainsync, Point, MAINNET_MAGIC},
multiplexer::Bearer,
};
use tokio::time::Instant;
use tracing::info;

async fn do_blockfetch(peer: &mut PeerClient) {
Expand Down Expand Up @@ -36,7 +37,12 @@ async fn do_chainsync(peer: &mut PeerClient) {

info!("intersected point is {:?}", point);

let mut keepalive_timer = Instant::now();
for _ in 0..10 {
if keepalive_timer.elapsed().as_secs() > 20 {
peer.keepalive().send_keepalive().await.unwrap();
keepalive_timer = Instant::now();
}
let next = peer.chainsync().request_next().await.unwrap();

match next {
Expand Down
2 changes: 1 addition & 1 deletion pallas-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ hex = "0.4.3"
itertools = "0.10.5"
pallas-codec = { version = "=0.20.0", path = "../pallas-codec" }
pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" }
rand = "0.8.5"
thiserror = "1.0.31"
tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] }
tracing = "0.1.37"

[dev-dependencies]
tracing-subscriber = "0.3.16"
tokio = { version = "1", features = ["full"] }
rand = "0.8.5"

49 changes: 34 additions & 15 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ use tracing::error;

use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable};

use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE;
use crate::multiplexer::RunningPlexer;
use crate::{
miniprotocols::{
blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH,
PROTOCOL_N2N_CHAIN_SYNC,
},
multiplexer::{self, Bearer},
use crate::miniprotocols::{
blockfetch, chainsync, handshake, keepalive, localstate, txsubmission, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH,
PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE,
PROTOCOL_N2N_TX_SUBMISSION,
};
use crate::multiplexer::{self, Bearer};

#[derive(Debug, Error)]
pub enum Error {
Expand All @@ -35,20 +32,24 @@ pub struct PeerClient {
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
pub chainsync: chainsync::N2NClient,
pub blockfetch: blockfetch::Client,
pub txsubmission: txsubmission::Client,
pub keepalive: keepalive::Client,
}

impl PeerClient {
pub async fn connect(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let mut plexer = multiplexer::Plexer::new(bearer);

let channel0 = plexer.subscribe_client(0);
let channel2 = plexer.subscribe_client(2);
let channel3 = plexer.subscribe_client(3);
let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH);
let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION);
let keepalive_channel = plexer.subscribe_client(PROTOCOL_N2N_KEEP_ALIVE);

let plexer = plexer.spawn();

let versions = handshake::n2n::VersionTable::v7_and_above(magic);
let mut client = handshake::Client::new(channel0);
let mut client = handshake::Client::new(hs_channel);

let handshake = client
.handshake(versions)
Expand All @@ -63,8 +64,10 @@ impl PeerClient {
Ok(Self {
plexer,
handshake,
chainsync: chainsync::Client::new(channel2),
blockfetch: blockfetch::Client::new(channel3),
chainsync: chainsync::Client::new(cs_channel),
blockfetch: blockfetch::Client::new(bf_channel),
txsubmission: txsubmission::Client::new(txsub_channel),
keepalive: keepalive::Client::new(keepalive_channel),
})
}

Expand All @@ -76,6 +79,14 @@ impl PeerClient {
&mut self.blockfetch
}

pub fn txsubmission(&mut self) -> &mut txsubmission::Client {
&mut self.txsubmission
}

pub fn keepalive(&mut self) -> &mut keepalive::Client {
&mut self.keepalive
}

pub fn abort(&self) {
self.plexer.abort();
}
Expand All @@ -87,6 +98,7 @@ pub struct PeerServer {
pub version: (VersionNumber, n2n::VersionData),
pub chainsync: chainsync::N2NServer,
pub blockfetch: blockfetch::Server,
pub txsubmission: txsubmission::Server,
}

impl PeerServer {
Expand All @@ -96,10 +108,12 @@ impl PeerServer {
let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION);

let mut server_hs: handshake::Server<n2n::VersionData> = handshake::Server::new(hs_channel);
let server_cs = chainsync::N2NServer::new(cs_channel);
let server_bf = blockfetch::Server::new(bf_channel);
let server_txsub = txsubmission::Server::new(txsub_channel);

let plexer = plexer.spawn();

Expand All @@ -114,6 +128,7 @@ impl PeerServer {
version: ver,
chainsync: server_cs,
blockfetch: server_bf,
txsubmission: server_txsub,
})
} else {
plexer.abort();
Expand All @@ -129,6 +144,10 @@ impl PeerServer {
&mut self.blockfetch
}

pub fn txsubmission(&mut self) -> &mut txsubmission::Server {
&mut self.txsubmission
}

pub fn abort(&self) {
self.plexer.abort();
}
Expand Down
91 changes: 64 additions & 27 deletions pallas-network/src/miniprotocols/handshake/n2n.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,66 @@
use std::collections::HashMap;

use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use pallas_codec::minicbor::{decode, Decode, Decoder, encode, Encode, Encoder};

pub type VersionTable = super::protocol::VersionTable<VersionData>;

const PROTOCOL_V4: u64 = 4;
const PROTOCOL_V5: u64 = 5;
const PROTOCOL_V6: u64 = 6;
const PROTOCOL_V7: u64 = 7;
const PROTOCOL_V8: u64 = 8;
const PROTOCOL_V9: u64 = 9;
const PROTOCOL_V10: u64 = 10;
const PROTOCOL_V11: u64 = 11;
const PROTOCOL_V12: u64 = 12;
const PROTOCOL_V13: u64 = 13;

const PEER_SHARING_DISABLED: u8 = 0;

impl VersionTable {
pub fn v4_and_above(network_magic: u64) -> VersionTable {
// Older versions are not supported anymore (removed from network-spec.pdf).
// Try not to break compatibility with older pallas users.
return Self::v7_and_above(network_magic);
}

pub fn v6_and_above(network_magic: u64) -> VersionTable {
// Older versions are not supported anymore (removed from network-spec.pdf).
// Try not to break compatibility with older pallas users.
return Self::v7_and_above(network_magic);
}

pub fn v7_to_v10(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V4, VersionData::new(network_magic, false)),
(PROTOCOL_V5, VersionData::new(network_magic, false)),
(PROTOCOL_V6, VersionData::new(network_magic, false)),
(PROTOCOL_V7, VersionData::new(network_magic, false)),
(PROTOCOL_V8, VersionData::new(network_magic, false)),
(PROTOCOL_V9, VersionData::new(network_magic, false)),
(PROTOCOL_V10, VersionData::new(network_magic, false)),
(PROTOCOL_V7, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V8, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V9, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V10, VersionData::new(network_magic, false, None, None)),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();

VersionTable { values }
}

pub fn v6_and_above(network_magic: u64) -> VersionTable {
pub fn v7_and_above(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V6, VersionData::new(network_magic, false)),
(PROTOCOL_V7, VersionData::new(network_magic, false)),
(PROTOCOL_V8, VersionData::new(network_magic, false)),
(PROTOCOL_V9, VersionData::new(network_magic, false)),
(PROTOCOL_V10, VersionData::new(network_magic, false)),
(PROTOCOL_V7, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V8, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V9, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V10, VersionData::new(network_magic, false, None, None)),
(PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
(PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
(PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();

VersionTable { values }
}

pub fn v7_and_above(network_magic: u64) -> VersionTable {
pub fn v11_and_above(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V7, VersionData::new(network_magic, false)),
(PROTOCOL_V8, VersionData::new(network_magic, false)),
(PROTOCOL_V9, VersionData::new(network_magic, false)),
(PROTOCOL_V10, VersionData::new(network_magic, false)),
(PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
(PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
(PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();
Expand All @@ -61,13 +73,17 @@ impl VersionTable {
pub struct VersionData {
network_magic: u64,
initiator_and_responder_diffusion_mode: bool,
peer_sharing: Option<u8>,
query: Option<bool>,
}

impl VersionData {
pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool) -> Self {
pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool, peer_sharing: Option<u8>, query: Option<bool>) -> Self {
VersionData {
network_magic,
initiator_and_responder_diffusion_mode,
peer_sharing,
query,
}
}
}
Expand All @@ -78,23 +94,44 @@ impl Encode<()> for VersionData {
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?
.u64(self.network_magic)?
.bool(self.initiator_and_responder_diffusion_mode)?;
match (self.peer_sharing, self.query) {
(Some(peer_sharing), Some(query)) => {
e.array(4)?
.u64(self.network_magic)?
.bool(self.initiator_and_responder_diffusion_mode)?
.u8(peer_sharing)?
.bool(query)?;
},
_ => {
e.array(2)?
.u64(self.network_magic)?
.bool(self.initiator_and_responder_diffusion_mode)?;
},
};

Ok(())
}
}

impl<'b> Decode<'b, ()> for VersionData {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let len = d.array()?;
let network_magic = d.u64()?;
let initiator_and_responder_diffusion_mode = d.bool()?;
let peer_sharing = match len {
Some(4) => Some(d.u8()?),
_ => None,
};
let query = match len {
Some(4) => Some(d.bool()?),
_ => None,
};

Ok(Self {
network_magic,
initiator_and_responder_diffusion_mode,
peer_sharing,
query,
})
}
}
Loading

0 comments on commit 63c0911

Please sign in to comment.