Skip to content

Commit

Permalink
fix: apply ledger genesis when WAL is at origin (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Dec 23, 2024
1 parent ad28f87 commit ab9eb1e
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions src/sync/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{ledger, prelude::*};
pub type UpstreamPort = gasket::messaging::InputPort<RollEvent>;

#[derive(Stage)]
#[stage(name = "ledger", unit = "RollEvent", worker = "Worker")]
#[stage(name = "apply", unit = "()", worker = "Worker")]
pub struct Stage {
wal: crate::wal::redb::WalStore,
ledger: crate::state::LedgerStore,
Expand Down Expand Up @@ -45,7 +45,7 @@ impl Stage {
}
}

fn process_origin(&mut self) -> Result<(), WorkerError> {
fn process_origin(&self) -> Result<(), WorkerError> {
info!("applying origin");

let delta = crate::ledger::compute_origin_delta(&self.genesis.byron);
Expand All @@ -54,7 +54,7 @@ impl Stage {
Ok(())
}

fn process_undo(&mut self, block: &wal::RawBlock) -> Result<(), WorkerError> {
fn process_undo(&self, block: &wal::RawBlock) -> Result<(), WorkerError> {
let wal::RawBlock { slot, body, .. } = block;

info!(slot, "undoing block");
Expand All @@ -70,14 +70,14 @@ impl Stage {
Ok(())
}

fn process_apply(&mut self, block: &wal::RawBlock) -> Result<(), WorkerError> {
fn process_apply(&self, block: &wal::RawBlock) -> Result<(), WorkerError> {
let wal::RawBlock { slot, body, .. } = block;

info!(slot, "applying block");

let block = MultiEraBlock::decode(body).or_panic()?;

crate::state::apply_block_batch([&block], &mut self.ledger, &self.genesis).or_panic()?;
crate::state::apply_block_batch([&block], &self.ledger, &self.genesis).or_panic()?;

self.mempool.apply_block(&block);

Expand All @@ -102,7 +102,12 @@ impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
let cursor = stage.ledger.cursor().or_panic()?;

info!(?cursor, "cursor found");
if cursor.is_none() {
info!("cursor not found, applying origin");
stage.process_origin()?;
} else {
info!(?cursor, "cursor found");
}

let point = match cursor {
Some(ledger::ChainPoint(s, h)) => wal::ChainPoint::Specific(s, h),
Expand All @@ -116,20 +121,16 @@ impl gasket::framework::Worker<Stage> for Worker {
Ok(Self(seq))
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<RollEvent>, WorkerError> {
let msg = stage.upstream.recv().await.or_panic()?;

Ok(WorkSchedule::Unit(msg.payload))
async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<()>, WorkerError> {
let _ = stage.upstream.recv().await.or_panic()?;
Ok(WorkSchedule::Unit(()))
}

/// Catch-up ledger with latest state of WAL
///
/// Reads from WAL using the latest known cursor and applies the
/// corresponding downstream changes to the ledger
async fn execute(&mut self, _: &RollEvent, stage: &mut Stage) -> Result<(), WorkerError> {
async fn execute(&mut self, _: &(), stage: &mut Stage) -> Result<(), WorkerError> {
let iter = stage.wal.crawl_from(Some(self.0)).or_panic()?.skip(1);

// TODO: analyze scenario where we're too far behind and this for loop takes
Expand Down

0 comments on commit ab9eb1e

Please sign in to comment.