From ab9eb1e146267b8d6b3c8d17941e5c788ae9698d Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Mon, 23 Dec 2024 18:39:52 -0300 Subject: [PATCH] fix: apply ledger genesis when WAL is at origin (#415) --- src/sync/apply.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/sync/apply.rs b/src/sync/apply.rs index 0f011f4..99aa21e 100644 --- a/src/sync/apply.rs +++ b/src/sync/apply.rs @@ -11,7 +11,7 @@ use crate::{ledger, prelude::*}; pub type UpstreamPort = gasket::messaging::InputPort; #[derive(Stage)] -#[stage(name = "ledger", unit = "RollEvent", worker = "Worker")] +#[stage(name = "apply", unit = "()", worker = "Worker")] pub struct Stage { wal: crate::wal::redb::WalStore, ledger: crate::state::LedgerStore, @@ -45,7 +45,7 @@ impl Stage { } } - fn process_origin(&mut self) -> Result<(), WorkerError> { + fn process_origin(&self) -> Result<(), WorkerError> { info!("applying origin"); let delta = crate::ledger::compute_origin_delta(&self.genesis.byron); @@ -54,7 +54,7 @@ impl Stage { Ok(()) } - fn process_undo(&mut self, block: &wal::RawBlock) -> Result<(), WorkerError> { + fn process_undo(&self, block: &wal::RawBlock) -> Result<(), WorkerError> { let wal::RawBlock { slot, body, .. } = block; info!(slot, "undoing block"); @@ -70,14 +70,14 @@ impl Stage { Ok(()) } - fn process_apply(&mut self, block: &wal::RawBlock) -> Result<(), WorkerError> { + fn process_apply(&self, block: &wal::RawBlock) -> Result<(), WorkerError> { let wal::RawBlock { slot, body, .. } = block; info!(slot, "applying block"); let block = MultiEraBlock::decode(body).or_panic()?; - crate::state::apply_block_batch([&block], &mut self.ledger, &self.genesis).or_panic()?; + crate::state::apply_block_batch([&block], &self.ledger, &self.genesis).or_panic()?; self.mempool.apply_block(&block); @@ -102,7 +102,12 @@ impl gasket::framework::Worker for Worker { async fn bootstrap(stage: &Stage) -> Result { let cursor = stage.ledger.cursor().or_panic()?; - info!(?cursor, "cursor found"); + if cursor.is_none() { + info!("cursor not found, applying origin"); + stage.process_origin()?; + } else { + info!(?cursor, "cursor found"); + } let point = match cursor { Some(ledger::ChainPoint(s, h)) => wal::ChainPoint::Specific(s, h), @@ -116,20 +121,16 @@ impl gasket::framework::Worker for Worker { Ok(Self(seq)) } - async fn schedule( - &mut self, - stage: &mut Stage, - ) -> Result, WorkerError> { - let msg = stage.upstream.recv().await.or_panic()?; - - Ok(WorkSchedule::Unit(msg.payload)) + async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> { + let _ = stage.upstream.recv().await.or_panic()?; + Ok(WorkSchedule::Unit(())) } /// Catch-up ledger with latest state of WAL /// /// Reads from WAL using the latest known cursor and applies the /// corresponding downstream changes to the ledger - async fn execute(&mut self, _: &RollEvent, stage: &mut Stage) -> Result<(), WorkerError> { + async fn execute(&mut self, _: &(), stage: &mut Stage) -> Result<(), WorkerError> { let iter = stage.wal.crawl_from(Some(self.0)).or_panic()?.skip(1); // TODO: analyze scenario where we're too far behind and this for loop takes