Skip to content

Commit

Permalink
feat(rpc): introduce FilterIter
Browse files Browse the repository at this point in the history
  • Loading branch information
ValuedMammal committed Jan 1, 2025
1 parent 03a08bb commit c0f16e0
Show file tree
Hide file tree
Showing 5 changed files with 533 additions and 1 deletion.
4 changes: 4 additions & 0 deletions crates/bitcoind_rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ bdk_chain = { path = "../chain" }
default = ["std"]
std = ["bitcoin/std", "bdk_core/std"]
serde = ["bitcoin/serde", "bdk_core/serde"]

[[example]]
name = "filter_iter"
required-features = ["std"]
106 changes: 106 additions & 0 deletions crates/bitcoind_rpc/examples/filter_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#![allow(clippy::print_stdout)]
use std::time::Instant;

use anyhow::Context;
use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter};
use bdk_chain::bitcoin::{constants::genesis_block, secp256k1::Secp256k1, Network};
use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
use bdk_chain::local_chain::LocalChain;
use bdk_chain::miniscript::Descriptor;
use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, SpkIterator};
use bdk_testenv::anyhow;

// This example shows how BDK chain and tx-graph structures are updated using compact
// filters syncing. Assumes a connection can be made to a bitcoin node via environment
// variables `RPC_URL` and `RPC_COOKIE`.

// Usage: `cargo run -p bdk_bitcoind_rpc --example filter_iter`

const EXTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/0/*)#uswl2jj7";
const INTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/1/*)#dyt7h8zx";
const SPK_COUNT: u32 = 25;
const NETWORK: Network = Network::Signet;

const START_HEIGHT: u32 = 170_000;
const START_HASH: &str = "00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d";

fn main() -> anyhow::Result<()> {
// Setup receiving chain and graph structures.
let secp = Secp256k1::new();
let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?;
let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?;
let (mut chain, _) = LocalChain::from_genesis_hash(genesis_block(NETWORK).block_hash());
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, KeychainTxOutIndex<&str>>::new({
let mut index = KeychainTxOutIndex::default();
index.insert_descriptor("external", descriptor.clone())?;
index.insert_descriptor("internal", change_descriptor.clone())?;
index
});

// Assume a minimum birthday height
let block = BlockId {
height: START_HEIGHT,
hash: START_HASH.parse()?,
};
let _ = chain.insert_block(block)?;

// Configure RPC client
let url = std::env::var("RPC_URL").context("must set RPC_URL")?;
let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?;
let rpc_client =
bitcoincore_rpc::Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?;

// Initialize block emitter
let cp = chain.tip();
let start_height = cp.height();
let mut emitter = FilterIter::new_with_checkpoint(&rpc_client, cp);
for (_, desc) in graph.index.keychains() {
let spks = SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, spk)| spk);
emitter.add_spks(spks);
}

let start = Instant::now();

// Sync
if let Some(tip) = emitter.get_tip()? {
let blocks_to_scan = tip.height - start_height;

for event in emitter.by_ref() {
let event = event?;
let curr = event.height();
// apply relevant blocks
if let Event::Block(EventInner { height, ref block }) = event {
let _ = graph.apply_block_relevant(block, height);
println!("Matched block {}", curr);
}
if curr % 1000 == 0 {
let progress = (curr - start_height) as f32 / blocks_to_scan as f32;
println!("[{:.2}%]", progress * 100.0);
}
}
// update chain
if let Some(cp) = emitter.chain_update() {
let _ = chain.apply_update(cp)?;
}
}

println!("\ntook: {}s", start.elapsed().as_secs());
println!("Local tip: {}", chain.tip().height());
let unspent: Vec<_> = graph
.graph()
.filter_chain_unspents(
&chain,
chain.tip().block_id(),
graph.index.outpoints().clone(),
)
.collect();
if !unspent.is_empty() {
println!("\nUnspent");
for (index, utxo) in unspent {
// (k, index) | value | outpoint |
println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint);
}
}

Ok(())
}
277 changes: 277 additions & 0 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
//! Compact block filters sync over RPC. For more details refer to [BIP157][0].
//!
//! This module is home to [`FilterIter`], a structure that returns bitcoin blocks by matching
//! a list of script pubkeys against a [BIP158][1] [`BlockFilter`].
//!
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
use bdk_core::collections::BTreeMap;
use core::fmt;

use bdk_core::bitcoin;
use bdk_core::{BlockId, CheckPoint};
use bitcoin::{
bip158::{self, BlockFilter},
Block, BlockHash, ScriptBuf,
};
use bitcoincore_rpc;
use bitcoincore_rpc::RpcApi;

/// Block height
type Height = u32;

/// Type that generates block [`Event`]s by matching a list of script pubkeys against a
/// [`BlockFilter`].
#[derive(Debug)]
pub struct FilterIter<'c, C> {
// RPC client
client: &'c C,
// SPK inventory
spks: Vec<ScriptBuf>,
// local cp
cp: Option<CheckPoint>,
// blocks map
blocks: BTreeMap<Height, BlockHash>,
// next filter
next_filter: Option<NextFilter>,
// best height counter
height: Height,
// stop height
stop: Height,
}

impl<'c, C: RpcApi> FilterIter<'c, C> {
/// Construct [`FilterIter`] from a given `client` and start `height`.
pub fn new_with_height(client: &'c C, height: u32) -> Self {
Self {
client,
spks: vec![],
cp: None,
blocks: BTreeMap::new(),
next_filter: None,
height,
stop: 0,
}
}

/// Construct [`FilterIter`] from a given `client` and [`CheckPoint`].
pub fn new_with_checkpoint(client: &'c C, cp: CheckPoint) -> Self {
let mut filter_iter = Self::new_with_height(client, cp.height());
filter_iter.cp = Some(cp);
filter_iter
}

/// Extends `self` with an iterator of spks.
pub fn add_spks(&mut self, spks: impl IntoIterator<Item = ScriptBuf>) {
self.spks.extend(spks)
}

/// Add spk to the list of spks to scan with.
pub fn add_spk(&mut self, spk: ScriptBuf) {
self.spks.push(spk);
}

/// Get the next filter and increment the current best height.
///
/// Returns `Ok(None)` when the stop height is exceeded.
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
if self.height > self.stop {
return Ok(None);
}
let height = self.height;
let hash = match self.blocks.get(&height) {
Some(h) => *h,
None => self.client.get_block_hash(height as u64)?,
};
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);
self.height += 1;
Ok(Some((BlockId { height, hash }, filter)))
}

/// Get the remote tip.
///
/// Returns `None` if the remote height is not strictly greater than the height of this
/// [`FilterIter`].
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
let tip_hash = self.client.get_best_block_hash()?;
let mut header = self.client.get_block_header_info(&tip_hash)?;
let tip_height = header.height as u32;
if self.height >= tip_height {
// nothing to do
return Ok(None);
}
self.blocks.insert(tip_height, tip_hash);

// if we have a checkpoint we use a lookback of ten blocks
// to ensure consistency of the local chain
if let Some(cp) = self.cp.as_ref() {
// adjust start height to point of agreement + 1
let base = self.find_base_with(cp.clone())?;
self.height = base.height + 1;

for _ in 0..9 {
let hash = match header.previous_block_hash {
Some(hash) => hash,
None => break,
};
header = self.client.get_block_header_info(&hash)?;
let height = header.height as u32;
if height < self.height {
break;
}
self.blocks.insert(height, hash);
}
}

self.stop = tip_height;

// get the first filter
self.next_filter = self.next_filter()?;

Ok(Some(BlockId {
height: tip_height,
hash: tip_hash,
}))
}
}

/// Alias for a compact filter and associated block id.
type NextFilter = (BlockId, BlockFilter);

/// Event inner type
#[derive(Debug, Clone)]
pub struct EventInner {
/// Height
pub height: Height,
/// Block
pub block: Block,
}

/// Kind of event produced by [`FilterIter`].
#[derive(Debug, Clone)]
pub enum Event {
/// Block
Block(EventInner),
/// No match
NoMatch(Height),
}

impl Event {
/// Whether this event contains a matching block.
pub fn is_match(&self) -> bool {
matches!(self, Event::Block(_))
}

/// Get the height of this event.
pub fn height(&self) -> Height {
match self {
Self::Block(EventInner { height, .. }) => *height,
Self::NoMatch(h) => *h,
}
}
}

impl<C: RpcApi> Iterator for FilterIter<'_, C> {
type Item = Result<Event, Error>;

fn next(&mut self) -> Option<Self::Item> {
let (block, filter) = self.next_filter.clone()?;

(|| -> Result<_, Error> {
// if the next filter matches any of our watched spks, get the block
// and return it, inserting relevant block ids along the way
let height = block.height;
let hash = block.hash;

let result = if self.spks.is_empty() {
Err(Error::NoScripts)
} else if filter
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
self.blocks.insert(height, hash);
let inner = EventInner { height, block };
Ok(Some(Event::Block(inner)))
} else {
Ok(Some(Event::NoMatch(height)))
};

self.next_filter = self.next_filter()?;

result
})()
.transpose()
}
}

impl<C: RpcApi> FilterIter<'_, C> {
/// Returns the point of agreement between `self` and the given `cp`.
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
loop {
let height = cp.height();
let fetched_hash = match self.blocks.get(&height) {
Some(hash) => *hash,
None if height == 0 => cp.hash(),
_ => self.client.get_block_hash(height as _)?,
};
if cp.hash() == fetched_hash {
// ensure this block also exists in self
self.blocks.insert(height, cp.hash());
return Ok(cp.block_id());
}
// remember conflicts
self.blocks.insert(height, fetched_hash);
cp = cp.prev().expect("must break before genesis");
}
}

/// Returns a chain update from the newly scanned blocks.
///
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
pub fn chain_update(&mut self) -> Option<CheckPoint> {
if self.cp.is_none() || self.blocks.is_empty() {
return None;
}

// note: to connect with the local chain we must guarantee that `self.blocks.first()`
// is also the point of agreement with `self.cp`.
Some(
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
.expect("blocks must be in order"),
)
}
}

/// Errors that may occur during a compact filters sync.
#[derive(Debug)]
pub enum Error {
/// bitcoin bip158 error
Bip158(bip158::Error),
/// attempted to scan blocks without any script pubkeys
NoScripts,
/// `bitcoincore_rpc` error
Rpc(bitcoincore_rpc::Error),
}

impl From<bitcoincore_rpc::Error> for Error {
fn from(e: bitcoincore_rpc::Error) -> Self {
Self::Rpc(e)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Bip158(e) => e.fmt(f),
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
Self::Rpc(e) => e.fmt(f),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for Error {}
Loading

0 comments on commit c0f16e0

Please sign in to comment.