diff --git a/src/serve/ouroboros/chainsync.rs b/src/serve/ouroboros/chainsync.rs index c221f2e9..b4f2073b 100644 --- a/src/serve/ouroboros/chainsync.rs +++ b/src/serve/ouroboros/chainsync.rs @@ -7,136 +7,235 @@ use tracing::{debug, info, instrument}; use crate::{ prelude::Error, - wal::{self, redb::WalStore, ReadUtils, WalReader}, + wal::{ + self, redb::WalStore, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, ReadUtils, + WalReader, + }, }; -pub struct State<'a> { +pub struct Session<'a> { wal: WalStore, - cursor: Option>, + current_iterator: Option>, + is_new_intersection: bool, + last_known_seq: Option, connection: N2NServer, } -#[instrument(skip_all)] -async fn handle_next_request(state: &mut State<'_>) -> Result<(), Error> { - info!("handling next request"); - - let next = state - .cursor - .as_mut() - .ok_or(Error::custom("requesting next without intersection"))? - .filter_forward() - .into_blocks() - .flatten() - .next(); - - let tip = state - .wal - .find_tip() - .map_err(Error::server)? - .map(|(_, x)| Tip(x.into(), 0)) - .unwrap_or(Tip(Point::Origin, 0)); - - if let Some(block) = next { - state - .connection - .send_roll_forward(super::convert::header_cbor_to_chainsync(block)?, tip) - .await - .map_err(Error::server)?; - } else { - state - .connection - .send_await_reply() - .await - .map_err(Error::server)?; - - debug!("waiting for tip change notification"); - state.wal.tip_change().await.map_err(Error::server)?; +impl<'a> Session<'a> { + fn prepare_tip(&self) -> Result { + let tip = self + .wal + .find_tip() + .map_err(Error::server)? + .map(|(_, x)| Tip(x.into(), 0)) + .unwrap_or(Tip(Point::Origin, 0)); - todo!("tip chainsync not implemented yet"); + Ok(tip) } - Ok(()) -} - -#[instrument(skip_all)] -async fn handle_intersect(state: &mut State<'_>, points: Vec) -> Result<(), Error> { - info!(?points, "handling intersect request"); - - let tip = state - .wal - .find_tip() - .map_err(Error::server)? - // TODO: send real value for block height - .map(|(_, x)| Tip(x.into(), 0)) - .unwrap_or(Tip(Point::Origin, 0)); - - // if points are empty it means that client wants to start sync from origin. - if points.is_empty() { - state - .connection - .send_intersect_found(Point::Origin, tip) - .await - .map_err(Error::server)?; + fn restart_iterator(&mut self) -> Result<(), Error> { + let seq = self + .last_known_seq + .as_ref() + .expect("broken invariant, we should have a last seen seq"); - return Ok(()); - } + self.current_iterator = self + .wal + .crawl_from(Some(*seq)) + .map_err(Error::server)? + .into(); - let points = points.into_iter().map(From::from).collect_vec(); + // we need to skip the first item since we're already seen it + self.current_iterator.as_mut().unwrap().next(); - let seq = state.wal.find_intersect(&points).map_err(Error::server)?; + Ok(()) + } - if let Some((seq, point)) = seq { - info!(?point, "found intersect point"); + async fn send_intersect_found(&mut self, seq: LogSeq, point: ChainPoint) -> Result<(), Error> { + debug!("sending intersection found"); - state.cursor = state + self.current_iterator = self .wal .crawl_from(Some(seq)) .map_err(Error::server)? .into(); - state - .connection + self.is_new_intersection = true; + + let tip = self.prepare_tip()?; + + self.connection .send_intersect_found(point.into(), tip) .await - .map_err(Error::server) - } else { - info!("could not intersect"); + .map_err(Error::server)?; - state.cursor = None; + Ok(()) + } - state - .connection - .send_intersect_not_found(tip) + fn read_next_wal(&mut self) -> Result, Error> { + let next = self + .current_iterator + .as_mut() + .ok_or(Error::custom("requesting next without intersection"))? + // filter forward will just gives us Apply & Mark events. We can just skip Undo events + // in Ouroboros since they are not required by the protocol. + .filter_forward() + .next(); + + // since iterators might get exhausted, we need to keep our internal state of + // how far we're in the WAL sequence in case we need to recreate a new + // iterator from where we left of. + if let Some((seen, _)) = &next { + self.last_known_seq = Some(*seen); + } + + Ok(next) + } + + async fn wait_for_next_wal(&mut self) -> Result { + loop { + self.wal.tip_change().await.map_err(Error::server)?; + + self.restart_iterator()?; + + if let Some(next) = self.read_next_wal()? { + break Ok(next); + } + } + } + + async fn send_forward(&mut self, block: RawBlock) -> Result<(), Error> { + debug!("sending forward event"); + + let tip = self.prepare_tip()?; + + // Ouroboros chain-sync always starts by sending the intersection point as an + // initial rollback event. The `is_new_intersection`` flag allows us to track if + // we have already sent that initial rollback or not + if self.is_new_intersection { + self.connection + .send_roll_backward(Point::from(ChainPoint::from(&block)), tip) + .await + .map_err(Error::server)?; + + self.is_new_intersection = false; + } else { + self.connection + .send_roll_forward(super::convert::header_cbor_to_chainsync(block)?, tip) + .await + .map_err(Error::server)?; + } + + Ok(()) + } + + async fn send_rollback(&mut self, point: ChainPoint) -> Result<(), Error> { + debug!("sending rollback event"); + + let tip = self.prepare_tip()?; + + self.connection + .send_roll_backward(Point::from(point), tip) .await .map_err(Error::server) } -} -async fn process_request(state: &mut State<'_>, req: ClientRequest) -> Result<(), Error> { - match req { - ClientRequest::Intersect(points) => handle_intersect(state, points).await, - ClientRequest::RequestNext => handle_next_request(state).await, + async fn send_next_wal(&mut self, log: LogValue) -> Result<(), Error> { + match log { + LogValue::Apply(x) => self.send_forward(x).await, + LogValue::Mark(x) => self.send_rollback(x).await, + // any other type of events should be already filtered by the `filter_forward` + // predicate. We consider any other variant unreachable. + _ => unreachable!(), + } + } + + #[instrument(skip_all)] + async fn handle_next_request(&mut self) -> Result<(), Error> { + info!("handling next request"); + + let next = self.read_next_wal()?; + + if let Some((_, log)) = next { + self.send_next_wal(log).await?; + } else { + self.connection + .send_await_reply() + .await + .map_err(Error::server)?; + + debug!("waiting for tip change notification"); + + let (_, log) = self.wait_for_next_wal().await?; + + self.send_next_wal(log).await?; + } + + Ok(()) + } + + #[instrument(skip_all)] + async fn handle_intersect(&mut self, mut points: Vec) -> Result<(), Error> { + info!(?points, "handling intersect request"); + + let tip = self.prepare_tip()?; + + // TODO: if points are empty it means that client wants to start sync from + // origin?. + if points.is_empty() { + info!("intersect candidates empty, using origin"); + points.push(Point::Origin); + } + + let points = points.into_iter().map(From::from).collect_vec(); + + let seq = self.wal.find_intersect(&points).map_err(Error::server)?; + + if let Some((seq, point)) = seq { + info!(?point, "found intersect point"); + self.send_intersect_found(seq, point).await + } else { + info!("could not intersect"); + + self.current_iterator = None; + + self.connection + .send_intersect_not_found(tip) + .await + .map_err(Error::server) + } + } + + async fn process_requests(&mut self) -> Result<(), Error> { + while let Some(req) = self + .connection + .recv_while_idle() + .await + .map_err(Error::server)? + { + let result = match req { + ClientRequest::Intersect(points) => self.handle_intersect(points).await, + ClientRequest::RequestNext => self.handle_next_request().await, + }; + + result.map_err(Error::server)?; + } + + Ok(()) } } #[instrument(skip_all)] pub async fn handle_session(wal: WalStore, connection: N2NServer) -> Result<(), Error> { - let mut state = State { + let mut session = Session { wal, connection, - cursor: None, + current_iterator: None, + last_known_seq: None, + is_new_intersection: false, }; - while let Some(req) = state - .connection - .recv_while_idle() - .await - .map_err(Error::server)? - { - process_request(&mut state, req) - .await - .map_err(Error::server)?; - } + session.process_requests().await?; info!("client ended protocol"); diff --git a/src/serve/ouroboros/convert.rs b/src/serve/ouroboros/convert.rs index 715d658f..514ad05e 100644 --- a/src/serve/ouroboros/convert.rs +++ b/src/serve/ouroboros/convert.rs @@ -1,20 +1,45 @@ +use pallas::ledger::traverse::Era; +use pallas::ledger::traverse::MultiEraBlock; use pallas::network::miniprotocols::chainsync; use crate::prelude::*; use crate::wal; use crate::wal::RawBlock; +fn era_to_header_variant(era: Era) -> u8 { + match era { + Era::Byron => 0, + Era::Shelley => 1, + Era::Allegra => 2, + Era::Mary => 3, + Era::Alonzo => 4, + Era::Babbage => 5, + Era::Conway => 6, + _ => todo!("don't know how to process era"), + } +} + +fn define_byron_prefix(block: &MultiEraBlock) -> Option<(u8, u64)> { + match block.era() { + pallas::ledger::traverse::Era::Byron => { + if block.header().as_eb().is_some() { + Some((0, 0)) + } else { + Some((1, 0)) + } + } + _ => None, + } +} + pub fn header_cbor_to_chainsync(block: wal::RawBlock) -> Result { let RawBlock { body, .. } = block; let block = pallas::ledger::traverse::MultiEraBlock::decode(&body).map_err(Error::parse)?; let out = chainsync::HeaderContent { - variant: block.era() as u8, - byron_prefix: match block.era() { - pallas::ledger::traverse::Era::Byron => Some((1, 0)), - _ => None, - }, + variant: era_to_header_variant(block.era()), + byron_prefix: define_byron_prefix(&block), cbor: block.header().cbor().to_vec(), }; diff --git a/src/serve/ouroboros/tests.rs b/src/serve/ouroboros/tests.rs index d69af510..4d4f86f1 100644 --- a/src/serve/ouroboros/tests.rs +++ b/src/serve/ouroboros/tests.rs @@ -1,44 +1,13 @@ use pallas::network::{ facades::PeerClient, - miniprotocols::{Point, MAINNET_MAGIC}, + miniprotocols::{chainsync::NextResponse, Point, MAINNET_MAGIC}, }; -use crate::wal::{self, WalWriter}; - -const DUMMY_BLOCK_BYTES: &str = "820183851a2d964a09582089d9b5a5b8ddc8d7e5a6795e9774d97faf1efea59b2caf7eaf9f8c5b32059df484830058200e5751c026e543b2e8ab2eb06099daa1d1e5df47778f7787faab45cdf12fe3a85820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b8300582025777aca9e4a73d48fc73b4f961d345b06d4a6f349cb7916570d35537d53479f5820d36a2619a672494604e11bb447cbcf5231e9f2ba25c2169177edc941bd50ad6c5820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b58204e66280cd94d591072349bec0a3090a53aa945562efb6d08d56e53654b0e40988482000058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab98101820282840058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab9584061261a95b7613ee6bf2067dad77b70349729b0c50d57bc1cf30de0db4a1e73a885d0054af7c23fc6c37919dba41c602a57e2d0f9329a7954b867338d6fb2c9455840e03e62f083df5576360e60a32e22bbb07b3c8df4fcab8079f1d6f61af3954d242ba8a06516c395939f24096f3df14e103a7d9c2b80a68a9363cf1f27c7a4e307584044f18ef23db7d2813415cb1b62e8f3ead497f238edf46bb7a97fd8e9105ed9775e8421d18d47e05a2f602b700d932c181e8007bbfb231d6f1a050da4ebeeba048483000000826a63617264616e6f2d736c00a058204ba92aa320c60acc9ad7b9a64f2eda55c4d2ec28e604faf186708b4f0c4e8edf849fff8300d9010280d90102809fff82809fff81a0"; +use crate::wal::{self, redb::WalStore, WalWriter}; type ServerHandle = tokio::task::JoinHandle>; -fn slot_to_hash(slot: u64) -> wal::BlockHash { - let mut hasher = pallas::crypto::hash::Hasher::<256>::new(); - hasher.input(&(slot as i32).to_le_bytes()); - hasher.finalize() -} - -fn slot_to_block(slot: u64) -> wal::RawBlock { - let bytes = &hex::decode(DUMMY_BLOCK_BYTES).unwrap(); - let block = pallas::ledger::traverse::MultiEraBlock::decode(&bytes).unwrap(); - - wal::RawBlock { - slot, - hash: block.hash(), - era: block.era(), - body: hex::decode(DUMMY_BLOCK_BYTES).unwrap(), - } -} - -fn setup_dummy_db() -> wal::redb::WalStore { - let mut wal = wal::redb::WalStore::memory().unwrap(); - - let blocks = (0..300).map(slot_to_block); - wal.roll_forward(blocks).unwrap(); - - wal -} - -async fn setup_server_client_pair(port: u32) -> (ServerHandle, PeerClient) { - let wal = setup_dummy_db(); - +async fn setup_server_client_pair(port: u32, wal: WalStore) -> (ServerHandle, PeerClient) { let server = tokio::spawn(super::serve( super::Config { listen_address: format!("[::]:{port}"), @@ -55,19 +24,21 @@ async fn setup_server_client_pair(port: u32) -> (ServerHandle, PeerClient) { } #[tokio::test] -async fn test_blockfetch() { +async fn test_blockfetch_happy_path() { // let _ = tracing::subscriber::set_global_default( // tracing_subscriber::FmtSubscriber::builder() // .with_max_level(tracing::Level::DEBUG) // .finish(), // ); + let wal = wal::testing::db_with_dummy_blocks(300); + // use servers in different ports until we implement some sort of test harness - let (server, mut client) = setup_server_client_pair(30031).await; + let (server, mut client) = setup_server_client_pair(30031, wal.clone()).await; let range = ( - Point::Specific(20, slot_to_hash(20).to_vec()), - Point::Specific(60, slot_to_hash(60).to_vec()), + Point::Specific(20, wal::testing::slot_to_hash(20).to_vec()), + Point::Specific(60, wal::testing::slot_to_hash(60).to_vec()), ); let blocks = client.blockfetch().fetch_range(range).await.unwrap(); @@ -83,17 +54,19 @@ async fn test_blockfetch() { } #[tokio::test] -async fn test_chainsync() { +async fn test_chainsync_happy_path() { // let _ = tracing::subscriber::set_global_default( // tracing_subscriber::FmtSubscriber::builder() // .with_max_level(tracing::Level::DEBUG) // .finish(), // ); + let mut wal = wal::testing::db_with_dummy_blocks(300); + // use servers in different ports until we implement some sort of test harness - let (server, mut client) = setup_server_client_pair(30032).await; + let (server, mut client) = setup_server_client_pair(30032, wal.clone()).await; - let known_points = vec![Point::Specific(20, slot_to_hash(20).to_vec())]; + let known_points = vec![Point::Specific(20, wal::testing::slot_to_hash(20).to_vec())]; let (point, _) = client .chainsync() @@ -103,8 +76,60 @@ async fn test_chainsync() { assert_eq!(point.unwrap(), known_points[0]); - // hangs here on receiving next block, even though server sends it - let _next = client.chainsync().request_next().await.unwrap(); + let next = client.chainsync().request_next().await.unwrap(); + + match next { + NextResponse::RollBackward(Point::Specific(slot, _), _) => assert_eq!(slot, 20), + _ => panic!("expected rollback to point"), + } + + for _ in 21..300 { + let next = client.chainsync().request_next().await.unwrap(); + + match next { + NextResponse::RollForward(_, _) => (), + _ => panic!("expected rollforward"), + } + } + + let next = client.chainsync().request_next().await.unwrap(); + + match next { + NextResponse::Await => (), + _ => panic!("expected await"), + } + + for slot in 301..320 { + wal.roll_forward(std::iter::once(wal::testing::dummy_block_from_slot(slot))) + .unwrap(); + + let next = client.chainsync().recv_while_must_reply().await.unwrap(); + + match next { + NextResponse::RollForward(_, _) => (), + _ => panic!("expected rollforward"), + } + + let next = client.chainsync().request_next().await.unwrap(); + + match next { + NextResponse::Await => (), + _ => panic!("expected await"), + } + } + + wal.roll_back(&wal::ChainPoint::Specific( + 310, + wal::testing::slot_to_hash(310), + )) + .unwrap(); + + let next = client.chainsync().recv_while_must_reply().await.unwrap(); + + match next { + NextResponse::RollBackward(Point::Specific(slot, _), _) => assert_eq!(slot, 310), + _ => panic!("expected rollback to point"), + } server.abort(); } diff --git a/src/wal/mod.rs b/src/wal/mod.rs index c4c1e4e9..63c4ab5a 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -7,6 +7,12 @@ mod reader; mod stream; mod writer; +// A concrete implementation of the WAL using Redb +pub mod redb; + +#[cfg(test)] +pub mod testing; + pub type BlockSlot = u64; pub type BlockHash = pallas::crypto::hash::Hash<32>; pub type BlockEra = pallas::ledger::traverse::Era; @@ -49,11 +55,18 @@ impl From for PallasPoint { } } +impl From<&RawBlock> for ChainPoint { + fn from(value: &RawBlock) -> Self { + let RawBlock { slot, hash, .. } = value; + ChainPoint::Specific(*slot, *hash) + } +} + impl From<&LogValue> for ChainPoint { fn from(value: &LogValue) -> Self { match value { - LogValue::Apply(RawBlock { slot, hash, .. }) => ChainPoint::Specific(*slot, *hash), - LogValue::Undo(RawBlock { slot, hash, .. }) => ChainPoint::Specific(*slot, *hash), + LogValue::Apply(x) => ChainPoint::from(x), + LogValue::Undo(x) => ChainPoint::from(x), LogValue::Mark(x) => x.clone(), } } @@ -67,13 +80,6 @@ pub struct RawBlock { pub body: BlockBody, } -impl From<&RawBlock> for ChainPoint { - fn from(value: &RawBlock) -> Self { - let RawBlock { slot, hash, .. } = value; - ChainPoint::Specific(*slot, *hash) - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LogValue { Apply(RawBlock), @@ -107,8 +113,6 @@ pub use reader::{ReadUtils, WalReader}; pub use stream::WalStream; pub use writer::WalWriter; -pub mod redb; - #[cfg(test)] mod tests { use super::*; diff --git a/src/wal/redb.rs b/src/wal/redb.rs index 96d14d22..9b0fd9f0 100644 --- a/src/wal/redb.rs +++ b/src/wal/redb.rs @@ -2,7 +2,7 @@ use bincode; use itertools::Itertools; use log::info; use redb::{Range, ReadableTable, TableDefinition}; -use std::{ops::RangeBounds, path::Path, sync::Arc}; +use std::{path::Path, sync::Arc}; use tracing::warn; use super::{ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, WalError, WalReader, WalWriter}; diff --git a/src/wal/testing.rs b/src/wal/testing.rs new file mode 100644 index 00000000..79063236 --- /dev/null +++ b/src/wal/testing.rs @@ -0,0 +1,34 @@ +use super::*; + +const DUMMY_BLOCK_BYTES: &str = "820183851a2d964a09582089d9b5a5b8ddc8d7e5a6795e9774d97faf1efea59b2caf7eaf9f8c5b32059df484830058200e5751c026e543b2e8ab2eb06099daa1d1e5df47778f7787faab45cdf12fe3a85820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b8300582025777aca9e4a73d48fc73b4f961d345b06d4a6f349cb7916570d35537d53479f5820d36a2619a672494604e11bb447cbcf5231e9f2ba25c2169177edc941bd50ad6c5820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b58204e66280cd94d591072349bec0a3090a53aa945562efb6d08d56e53654b0e40988482000058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab98101820282840058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab9584061261a95b7613ee6bf2067dad77b70349729b0c50d57bc1cf30de0db4a1e73a885d0054af7c23fc6c37919dba41c602a57e2d0f9329a7954b867338d6fb2c9455840e03e62f083df5576360e60a32e22bbb07b3c8df4fcab8079f1d6f61af3954d242ba8a06516c395939f24096f3df14e103a7d9c2b80a68a9363cf1f27c7a4e307584044f18ef23db7d2813415cb1b62e8f3ead497f238edf46bb7a97fd8e9105ed9775e8421d18d47e05a2f602b700d932c181e8007bbfb231d6f1a050da4ebeeba048483000000826a63617264616e6f2d736c00a058204ba92aa320c60acc9ad7b9a64f2eda55c4d2ec28e604faf186708b4f0c4e8edf849fff8300d9010280d90102809fff82809fff81a0"; + +pub fn slot_to_hash(slot: u64) -> BlockHash { + let mut hasher = pallas::crypto::hash::Hasher::<256>::new(); + hasher.input(&(slot as i32).to_le_bytes()); + hasher.finalize() +} + +pub fn dummy_block_from_slot(slot: u64) -> RawBlock { + let bytes = &hex::decode(DUMMY_BLOCK_BYTES).unwrap(); + let block = pallas::ledger::traverse::MultiEraBlock::decode(&bytes).unwrap(); + + RawBlock { + slot, + hash: slot_to_hash(slot), + era: block.era(), + body: hex::decode(DUMMY_BLOCK_BYTES).unwrap(), + } +} + +pub fn empty_db() -> redb::WalStore { + super::redb::WalStore::memory().unwrap() +} + +pub fn db_with_dummy_blocks(quantity: usize) -> redb::WalStore { + let mut wal = empty_db(); + + let blocks = (0..quantity).map(|x| dummy_block_from_slot(x as u64)); + wal.roll_forward(blocks).unwrap(); + + wal +} diff --git a/src/wal/writer.rs b/src/wal/writer.rs index 30228d3b..e07fe7a5 100644 --- a/src/wal/writer.rs +++ b/src/wal/writer.rs @@ -41,33 +41,9 @@ pub trait WalWriter: WalReader { mod tests { use super::*; - fn setup_ephemeral_db() -> redb::WalStore { - super::redb::WalStore::memory().unwrap() - } - - const DUMMY_BLOCK_BYTES: &str = "820183851a2d964a09582089d9b5a5b8ddc8d7e5a6795e9774d97faf1efea59b2caf7eaf9f8c5b32059df484830058200e5751c026e543b2e8ab2eb06099daa1d1e5df47778f7787faab45cdf12fe3a85820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b8300582025777aca9e4a73d48fc73b4f961d345b06d4a6f349cb7916570d35537d53479f5820d36a2619a672494604e11bb447cbcf5231e9f2ba25c2169177edc941bd50ad6c5820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b58204e66280cd94d591072349bec0a3090a53aa945562efb6d08d56e53654b0e40988482000058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab98101820282840058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab9584061261a95b7613ee6bf2067dad77b70349729b0c50d57bc1cf30de0db4a1e73a885d0054af7c23fc6c37919dba41c602a57e2d0f9329a7954b867338d6fb2c9455840e03e62f083df5576360e60a32e22bbb07b3c8df4fcab8079f1d6f61af3954d242ba8a06516c395939f24096f3df14e103a7d9c2b80a68a9363cf1f27c7a4e307584044f18ef23db7d2813415cb1b62e8f3ead497f238edf46bb7a97fd8e9105ed9775e8421d18d47e05a2f602b700d932c181e8007bbfb231d6f1a050da4ebeeba048483000000826a63617264616e6f2d736c00a058204ba92aa320c60acc9ad7b9a64f2eda55c4d2ec28e604faf186708b4f0c4e8edf849fff8300d9010280d90102809fff82809fff81a0"; - - fn slot_to_hash(slot: u64) -> BlockHash { - let mut hasher = pallas::crypto::hash::Hasher::<256>::new(); - hasher.input(&(slot as i32).to_le_bytes()); - hasher.finalize() - } - - fn dummy_block_from_slot(slot: u64) -> RawBlock { - let bytes = &hex::decode(DUMMY_BLOCK_BYTES).unwrap(); - let block = pallas::ledger::traverse::MultiEraBlock::decode(&bytes).unwrap(); - - RawBlock { - slot, - hash: slot_to_hash(slot), - era: block.era(), - body: hex::decode(DUMMY_BLOCK_BYTES).unwrap(), - } - } - #[test] fn test_origin_event() { - let db = setup_ephemeral_db(); + let db = testing::empty_db(); let mut iter = db.crawl_from(None).unwrap(); @@ -85,9 +61,9 @@ mod tests { #[test] fn test_basic_append() { - let mut db = setup_ephemeral_db(); + let mut db = testing::empty_db(); - let expected_block = dummy_block_from_slot(11); + let expected_block = testing::dummy_block_from_slot(11); let expected_point = ChainPoint::Specific(11, expected_block.hash); db.roll_forward(std::iter::once(expected_block.clone())) @@ -118,12 +94,12 @@ mod tests { #[test] fn test_rollback_undos() { - let mut db = setup_ephemeral_db(); + let mut db = testing::empty_db(); - let forward = (0..=5).map(|x| dummy_block_from_slot(x * 10)); + let forward = (0..=5).map(|x| testing::dummy_block_from_slot(x * 10)); db.roll_forward(forward).unwrap(); - let rollback_to = ChainPoint::Specific(20, slot_to_hash(20)); + let rollback_to = ChainPoint::Specific(20, testing::slot_to_hash(20)); db.roll_back(&rollback_to).unwrap(); // ensure tip show rollback point