Skip to content

Commit

Permalink
fix: implement missing chainsync logic (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored May 25, 2024
1 parent 06aa492 commit e44b7e4
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 184 deletions.
287 changes: 193 additions & 94 deletions src/serve/ouroboros/chainsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,136 +7,235 @@ use tracing::{debug, info, instrument};

use crate::{
prelude::Error,
wal::{self, redb::WalStore, ReadUtils, WalReader},
wal::{
self, redb::WalStore, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, ReadUtils,
WalReader,
},
};

pub struct State<'a> {
pub struct Session<'a> {
wal: WalStore,
cursor: Option<wal::redb::WalIter<'a>>,
current_iterator: Option<wal::redb::WalIter<'a>>,
is_new_intersection: bool,
last_known_seq: Option<LogSeq>,
connection: N2NServer,
}

#[instrument(skip_all)]
async fn handle_next_request(state: &mut State<'_>) -> Result<(), Error> {
info!("handling next request");

let next = state
.cursor
.as_mut()
.ok_or(Error::custom("requesting next without intersection"))?
.filter_forward()
.into_blocks()
.flatten()
.next();

let tip = state
.wal
.find_tip()
.map_err(Error::server)?
.map(|(_, x)| Tip(x.into(), 0))
.unwrap_or(Tip(Point::Origin, 0));

if let Some(block) = next {
state
.connection
.send_roll_forward(super::convert::header_cbor_to_chainsync(block)?, tip)
.await
.map_err(Error::server)?;
} else {
state
.connection
.send_await_reply()
.await
.map_err(Error::server)?;

debug!("waiting for tip change notification");
state.wal.tip_change().await.map_err(Error::server)?;
impl<'a> Session<'a> {
fn prepare_tip(&self) -> Result<Tip, Error> {
let tip = self
.wal
.find_tip()
.map_err(Error::server)?
.map(|(_, x)| Tip(x.into(), 0))
.unwrap_or(Tip(Point::Origin, 0));

todo!("tip chainsync not implemented yet");
Ok(tip)
}

Ok(())
}

#[instrument(skip_all)]
async fn handle_intersect(state: &mut State<'_>, points: Vec<Point>) -> Result<(), Error> {
info!(?points, "handling intersect request");

let tip = state
.wal
.find_tip()
.map_err(Error::server)?
// TODO: send real value for block height
.map(|(_, x)| Tip(x.into(), 0))
.unwrap_or(Tip(Point::Origin, 0));

// if points are empty it means that client wants to start sync from origin.
if points.is_empty() {
state
.connection
.send_intersect_found(Point::Origin, tip)
.await
.map_err(Error::server)?;
fn restart_iterator(&mut self) -> Result<(), Error> {
let seq = self
.last_known_seq
.as_ref()
.expect("broken invariant, we should have a last seen seq");

return Ok(());
}
self.current_iterator = self
.wal
.crawl_from(Some(*seq))
.map_err(Error::server)?
.into();

let points = points.into_iter().map(From::from).collect_vec();
// we need to skip the first item since we're already seen it
self.current_iterator.as_mut().unwrap().next();

let seq = state.wal.find_intersect(&points).map_err(Error::server)?;
Ok(())
}

if let Some((seq, point)) = seq {
info!(?point, "found intersect point");
async fn send_intersect_found(&mut self, seq: LogSeq, point: ChainPoint) -> Result<(), Error> {
debug!("sending intersection found");

state.cursor = state
self.current_iterator = self
.wal
.crawl_from(Some(seq))
.map_err(Error::server)?
.into();

state
.connection
self.is_new_intersection = true;

let tip = self.prepare_tip()?;

self.connection
.send_intersect_found(point.into(), tip)
.await
.map_err(Error::server)
} else {
info!("could not intersect");
.map_err(Error::server)?;

state.cursor = None;
Ok(())
}

state
.connection
.send_intersect_not_found(tip)
fn read_next_wal(&mut self) -> Result<Option<LogEntry>, Error> {
let next = self
.current_iterator
.as_mut()
.ok_or(Error::custom("requesting next without intersection"))?
// filter forward will just gives us Apply & Mark events. We can just skip Undo events
// in Ouroboros since they are not required by the protocol.
.filter_forward()
.next();

// since iterators might get exhausted, we need to keep our internal state of
// how far we're in the WAL sequence in case we need to recreate a new
// iterator from where we left of.
if let Some((seen, _)) = &next {
self.last_known_seq = Some(*seen);
}

Ok(next)
}

async fn wait_for_next_wal(&mut self) -> Result<LogEntry, Error> {
loop {
self.wal.tip_change().await.map_err(Error::server)?;

self.restart_iterator()?;

if let Some(next) = self.read_next_wal()? {
break Ok(next);
}
}
}

async fn send_forward(&mut self, block: RawBlock) -> Result<(), Error> {
debug!("sending forward event");

let tip = self.prepare_tip()?;

// Ouroboros chain-sync always starts by sending the intersection point as an
// initial rollback event. The `is_new_intersection`` flag allows us to track if
// we have already sent that initial rollback or not
if self.is_new_intersection {
self.connection
.send_roll_backward(Point::from(ChainPoint::from(&block)), tip)
.await
.map_err(Error::server)?;

self.is_new_intersection = false;
} else {
self.connection
.send_roll_forward(super::convert::header_cbor_to_chainsync(block)?, tip)
.await
.map_err(Error::server)?;
}

Ok(())
}

async fn send_rollback(&mut self, point: ChainPoint) -> Result<(), Error> {
debug!("sending rollback event");

let tip = self.prepare_tip()?;

self.connection
.send_roll_backward(Point::from(point), tip)
.await
.map_err(Error::server)
}
}

async fn process_request(state: &mut State<'_>, req: ClientRequest) -> Result<(), Error> {
match req {
ClientRequest::Intersect(points) => handle_intersect(state, points).await,
ClientRequest::RequestNext => handle_next_request(state).await,
async fn send_next_wal(&mut self, log: LogValue) -> Result<(), Error> {
match log {
LogValue::Apply(x) => self.send_forward(x).await,
LogValue::Mark(x) => self.send_rollback(x).await,
// any other type of events should be already filtered by the `filter_forward`
// predicate. We consider any other variant unreachable.
_ => unreachable!(),
}
}

#[instrument(skip_all)]
async fn handle_next_request(&mut self) -> Result<(), Error> {
info!("handling next request");

let next = self.read_next_wal()?;

if let Some((_, log)) = next {
self.send_next_wal(log).await?;
} else {
self.connection
.send_await_reply()
.await
.map_err(Error::server)?;

debug!("waiting for tip change notification");

let (_, log) = self.wait_for_next_wal().await?;

self.send_next_wal(log).await?;
}

Ok(())
}

#[instrument(skip_all)]
async fn handle_intersect(&mut self, mut points: Vec<Point>) -> Result<(), Error> {
info!(?points, "handling intersect request");

let tip = self.prepare_tip()?;

// TODO: if points are empty it means that client wants to start sync from
// origin?.
if points.is_empty() {
info!("intersect candidates empty, using origin");
points.push(Point::Origin);
}

let points = points.into_iter().map(From::from).collect_vec();

let seq = self.wal.find_intersect(&points).map_err(Error::server)?;

if let Some((seq, point)) = seq {
info!(?point, "found intersect point");
self.send_intersect_found(seq, point).await
} else {
info!("could not intersect");

self.current_iterator = None;

self.connection
.send_intersect_not_found(tip)
.await
.map_err(Error::server)
}
}

async fn process_requests(&mut self) -> Result<(), Error> {
while let Some(req) = self
.connection
.recv_while_idle()
.await
.map_err(Error::server)?
{
let result = match req {
ClientRequest::Intersect(points) => self.handle_intersect(points).await,
ClientRequest::RequestNext => self.handle_next_request().await,
};

result.map_err(Error::server)?;
}

Ok(())
}
}

#[instrument(skip_all)]
pub async fn handle_session(wal: WalStore, connection: N2NServer) -> Result<(), Error> {
let mut state = State {
let mut session = Session {
wal,
connection,
cursor: None,
current_iterator: None,
last_known_seq: None,
is_new_intersection: false,
};

while let Some(req) = state
.connection
.recv_while_idle()
.await
.map_err(Error::server)?
{
process_request(&mut state, req)
.await
.map_err(Error::server)?;
}
session.process_requests().await?;

info!("client ended protocol");

Expand Down
35 changes: 30 additions & 5 deletions src/serve/ouroboros/convert.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
use pallas::ledger::traverse::Era;
use pallas::ledger::traverse::MultiEraBlock;
use pallas::network::miniprotocols::chainsync;

use crate::prelude::*;
use crate::wal;
use crate::wal::RawBlock;

fn era_to_header_variant(era: Era) -> u8 {
match era {
Era::Byron => 0,
Era::Shelley => 1,
Era::Allegra => 2,
Era::Mary => 3,
Era::Alonzo => 4,
Era::Babbage => 5,
Era::Conway => 6,
_ => todo!("don't know how to process era"),
}
}

fn define_byron_prefix(block: &MultiEraBlock) -> Option<(u8, u64)> {
match block.era() {
pallas::ledger::traverse::Era::Byron => {
if block.header().as_eb().is_some() {
Some((0, 0))
} else {
Some((1, 0))
}
}
_ => None,
}
}

pub fn header_cbor_to_chainsync(block: wal::RawBlock) -> Result<chainsync::HeaderContent, Error> {
let RawBlock { body, .. } = block;

let block = pallas::ledger::traverse::MultiEraBlock::decode(&body).map_err(Error::parse)?;

let out = chainsync::HeaderContent {
variant: block.era() as u8,
byron_prefix: match block.era() {
pallas::ledger::traverse::Era::Byron => Some((1, 0)),
_ => None,
},
variant: era_to_header_variant(block.era()),
byron_prefix: define_byron_prefix(&block),
cbor: block.header().cbor().to_vec(),
};

Expand Down
Loading

0 comments on commit e44b7e4

Please sign in to comment.