Implement safeguards to check plan against the archive #24

Merged
merged 2 commits into from
Jan 29, 2025
66 changes: 66 additions & 0 deletions src/penumbra.rs
@@ -143,6 +143,54 @@ impl RegenerationStep {
            _ => self,
        }
    }

    /// Check the feasibility of this step against an archive.
    ///
    /// Will return `Ok(Err(_))` if this step is guaranteed to fail (at that starting point).
    pub async fn check_against_archive(
        &self,
        start: u64,
        archive: &Archive,
    ) -> anyhow::Result<anyhow::Result<()>> {
Member

We can probably simplify the signature and the rest of the call stack here, right? It sounds like the inner error is a boolean.

Contributor

I did a double-take on the sig too. Even if this is necessary, would appreciate more comments to explain.

Collaborator Author

I initially wrote this change with a boolean, but then decided against it, because it's useful to understand and print why a plan failed a check against an archive, and not just that it failed.

As for the nested results: the outer layer can signal a spurious failure (IO operations failing, or whatever the SQLite library might throw at us), while the inner layer signals a permanent failure. If the inner error is set, the plan will never succeed against that archive without manual intervention. With two layers, a later change could add retries around the outer layer. I think distinguishing between "errors you can do something about" and "errors you can't hope to resolve" is good.
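
To make the two-layer idea concrete, here is a minimal sketch of what retrying only the outer layer could look like. It is not code from this PR: `retry_outer` and the `Check` alias are hypothetical names, and plain `std` types (`io::Error` for the transient layer, `String` for the permanent verdict) stand in for `anyhow` so the example is self-contained.

```rust
use std::io;

/// Outcome of a feasibility check: the outer layer carries transient errors
/// (worth retrying), the inner layer carries a permanent verdict.
type Check = Result<Result<(), String>, io::Error>;

/// Hypothetical helper: call `check` again while the outer layer fails,
/// up to `attempts` times in total.
fn retry_outer<F>(attempts: usize, mut check: F) -> Check
where
    F: FnMut() -> Check,
{
    let mut last_err = None;
    for _ in 0..attempts {
        match check() {
            // Any inner verdict (pass or permanent failure) is final: stop retrying.
            Ok(verdict) => return Ok(verdict),
            // Outer errors may be spurious, so remember the error and try again.
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "no attempts made")))
}

fn main() {
    // Simulated check: fails transiently twice, then reports a permanent failure.
    let mut calls = 0;
    let outcome = retry_outer(5, || {
        calls += 1;
        if calls < 3 {
            Err(io::Error::new(io::ErrorKind::Other, "database busy"))
        } else {
            Ok(Err("missing block at height 10".to_string()))
        }
    });
    match outcome {
        Ok(Ok(())) => println!("plan is feasible"),
        Ok(Err(why)) => println!("plan can never succeed: {why}"),
        Err(e) => println!("gave up after transient errors: {e}"),
    }
}
```

A plain boolean would lose the reason for the permanent failure, and a single `Result` would make it impossible for a wrapper like this to tell which errors are worth another attempt.
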

        match self {
            RegenerationStep::Migrate { .. } => Ok(Ok(())),
            // For this to work, we need to be able to fetch the genesis,
            // and then to be able to do a "run to" from the start to the potential last block.
            RegenerationStep::InitThenRunTo {
                genesis_height,
                last_block,
                ..
            } => {
                // Missing data is a permanent failure, so it is reported on the
                // inner layer; only `?` failures surface on the outer layer.
                if !archive.genesis_does_exist(*genesis_height).await? {
                    return Ok(Err(anyhow!(
                        "genesis at height {} does not exist",
                        genesis_height,
                    )));
                }
                if start > 0 && !archive.block_does_exist(start).await? {
                    return Ok(Err(anyhow!("missing block at height {}", start)));
                }
                if let Some(block) = last_block {
                    if !archive.block_does_exist(*block).await? {
                        return Ok(Err(anyhow!("missing block at height {}", block)));
                    }
                }
                Ok(Ok(()))
            }
            // To run from a start block to a last block, both blocks should exist.
            RegenerationStep::RunTo { last_block, .. } => {
                if start > 0 && !archive.block_does_exist(start).await? {
                    return Ok(Err(anyhow!("missing block at height {}", start)));
                }
                if let Some(block) = last_block {
                    if !archive.block_does_exist(*block).await? {
                        return Ok(Err(anyhow!("missing block at height {}", block)));
                    }
                }
                Ok(Ok(()))
            }
        }
    }
}

/// Represents a series of steps to regenerate events.
@@ -195,6 +243,23 @@ impl RegenerationPlan {
        Self { steps }
    }

    /// Check the integrity of this plan against an archive.
    ///
    /// This avoids running a plan which can't possibly succeed against an archive.
    ///
    /// If this check returns `Ok(Err(_))`, then running the plan against that archive
    /// *will* fail. An outer `Err(_)` might just be something spurious, e.g. an IO error.
    pub async fn check_against_archive(
        &self,
        archive: &Archive,
    ) -> anyhow::Result<anyhow::Result<()>> {
        let mut good = Ok(());
        for (start, step) in &self.steps {
            good = good.and(step.check_against_archive(*start, archive).await?);
        }
        Ok(good)
    }
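
One detail of the loop in `RegenerationPlan::check_against_archive` worth spelling out: `Result::and` returns its argument only while the accumulator is still `Ok`, so the fold keeps the first permanent failure and ignores later ones. The sketch below isolates that behaviour; `combine` is a hypothetical helper, and `String` stands in for `anyhow::Error`.

```rust
/// Fold per-step verdicts with `Result::and`, which keeps the first `Err`.
fn combine(verdicts: Vec<Result<(), String>>) -> Result<(), String> {
    let mut good = Ok(());
    for verdict in verdicts {
        // If `good` is already an `Err`, `and` returns it and drops `verdict`.
        good = good.and(verdict);
    }
    good
}

fn main() {
    let verdicts = vec![
        Ok(()),
        Err("missing block at height 5".to_string()),
        Err("missing block at height 9".to_string()),
    ];
    // Only the first failure is reported.
    println!("{:?}", combine(verdicts));
}
```

Note that in the real method every step is still checked after the first inner failure, because the `await?` runs before `and` is applied; only the reported error is fixed to the first one.
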

    /// Some regeneration plans are pre-specified, by a chain id.
    pub fn from_known_chain_id(chain_id: &str) -> Option<Self> {
        match chain_id {
@@ -368,6 +433,7 @@ impl Regenerator {
            stop,
            plan
        );
        plan.check_against_archive(&self.archive).await??;
        for (start, step) in plan.steps.into_iter() {
            use RegenerationStep::*;
            match step {
20 changes: 20 additions & 0 deletions src/storage.rs
@@ -271,6 +271,15 @@ impl Storage {
        Ok(data.map(|x| Genesis::decode(&x.0)).transpose()?)
    }

    pub async fn genesis_does_exist(&self, initial_height: u64) -> anyhow::Result<bool> {
        let exists: bool =
            sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM geneses WHERE initial_height = ?)")
                .bind(i64::try_from(initial_height)?)
                .fetch_one(&self.pool)
                .await?;
        Ok(exists)
    }

    /// Get a block from storage.
    ///
    /// This will return [Option::None] if there's no such block.
@@ -285,6 +294,15 @@ impl Storage {
        Ok(data.map(|x| Block::decode(&x.0)).transpose()?)
    }

    pub async fn block_does_exist(&self, height: u64) -> anyhow::Result<bool> {
        let exists: bool =
            sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM blocks WHERE height = ?)")
                .bind(i64::try_from(height)?)
                .fetch_one(&self.pool)
                .await?;
        Ok(exists)
    }
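
Both existence queries bind the height through `i64::try_from(...)?`. A likely reason, stated here as an assumption rather than something the PR spells out, is that SQLite stores integers as signed 64-bit values, so a `u64` height has to be narrowed with a checked conversion. The sketch below shows that conversion on its own; `to_sql_height` is a hypothetical name.

```rust
use std::num::TryFromIntError;

/// Convert a block height into the signed integer a SQLite parameter can hold,
/// failing instead of silently wrapping for heights above `i64::MAX`.
fn to_sql_height(height: u64) -> Result<i64, TryFromIntError> {
    i64::try_from(height)
}

fn main() {
    // Realistic heights convert without loss.
    println!("{:?}", to_sql_height(1_000_000));
    // Heights beyond the signed range are rejected rather than truncated.
    println!("{}", to_sql_height(u64::MAX).is_err());
}
```

In the storage methods, a failed conversion is propagated by `?` as an `anyhow` error, so an out-of-range height surfaces as an outer error rather than as "block does not exist".
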

    /// Get the highest known block in the storage.
    #[allow(dead_code)]
    pub async fn last_height(&self) -> anyhow::Result<Option<u64>> {
@@ -333,6 +351,7 @@ mod test {
        let height = in_block.height();
        let storage = Storage::new(None, Some(CHAIN_ID)).await?;
        storage.put_block(&in_block).await?;
        assert!(storage.block_does_exist(height).await?);
        let out_block = storage.get_block(height).await?;
        assert_eq!(out_block, Some(in_block));
        let last_height = storage.last_height().await?;
@@ -361,6 +380,7 @@ mod test {
        let storage = Storage::new(None, Some(CHAIN_ID)).await?;
        let genesis = Genesis::test_value();
        storage.put_genesis(&genesis).await?;
        assert!(storage.genesis_does_exist(genesis.initial_height()).await?);
        let out = storage
            .get_genesis(genesis.initial_height())
            .await?