From b8114ae072b108d2e9ddea72b4269fcd9693f2d4 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 29 Dec 2023 12:59:41 -0300 Subject: [PATCH] feat: implement file-based chain cursor (#723) --- Cargo.lock | 8 +- Cargo.toml | 4 +- examples/file_cursor/.gitignore | 1 + examples/file_cursor/daemon.toml | 14 +++ src/_utils/facade.rs | 42 ------- src/_utils/retry.rs | 127 --------------------- src/_utils/time.rs | 184 ------------------------------- src/bin/oura/daemon.rs | 60 +++++----- src/cursor/file.rs | 153 +++++++++++++++++++++++++ src/cursor/memory.rs | 55 +++++++++ src/cursor/mod.rs | 60 ++++++++++ src/filters/legacy_v1/mod.rs | 4 +- src/filters/legacy_v1/prelude.rs | 6 +- src/filters/mod.rs | 49 ++++---- src/framework/cursor.rs | 45 -------- src/framework/mod.rs | 76 +++++++++---- src/lib.rs | 1 + src/sinks/assert/mod.rs | 8 +- src/sinks/aws_lambda.rs | 8 +- src/sinks/aws_s3.rs | 8 +- src/sinks/aws_sqs.rs | 8 +- src/sinks/elasticsearch.rs | 6 +- src/sinks/file_rotate.rs | 8 +- src/sinks/gcp_cloudfunction.rs | 8 +- src/sinks/gcp_pubsub.rs | 8 +- src/sinks/kafka.rs | 8 +- src/sinks/mod.rs | 80 ++++++++++---- src/sinks/noop.rs | 9 +- src/sinks/rabbitmq.rs | 8 +- src/sinks/redis.rs | 8 +- src/sinks/stdout.rs | 9 +- src/sinks/terminal/mod.rs | 8 +- src/sinks/webhook.rs | 8 +- src/sources/mod.rs | 20 ++-- src/sources/n2c.rs | 27 +++-- src/sources/n2n.rs | 24 ++-- src/sources/utxorpc.rs | 47 ++++---- 37 files changed, 581 insertions(+), 626 deletions(-) create mode 100644 examples/file_cursor/.gitignore create mode 100644 examples/file_cursor/daemon.toml delete mode 100644 src/_utils/facade.rs delete mode 100644 src/_utils/retry.rs delete mode 100644 src/_utils/time.rs create mode 100644 src/cursor/file.rs create mode 100644 src/cursor/memory.rs create mode 100644 src/cursor/mod.rs delete mode 100644 src/framework/cursor.rs diff --git a/Cargo.lock b/Cargo.lock index a32f6db4..614ac746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2933,9 +2933,9 @@ dependencies = [ [[package]] name = "gasket" -version = "0.4.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf74bee250a2b5a1a8d65186643ae771fb59f4d5f8d28ff07ec22a10ff3014d3" +checksum = "7541dcf8d4eeaef26146a14eaf2d392c242d3d02c9c3678d4a093fc3bbad34d8" dependencies = [ "async-trait", "crossbeam", @@ -2948,9 +2948,9 @@ dependencies = [ [[package]] name = "gasket-derive" -version = "0.4.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64839c204f4402daf2a6726ef1470f263d474ade061fbc3dcf7142a386fe39b6" +checksum = "19c5a116a2bb67b22a0e5cb7f0fb58220c2729715b5c8a6267df320ae3393138" dependencies = [ "proc-macro2 1.0.66", "quote 1.0.32", diff --git a/Cargo.toml b/Cargo.toml index acdce4f9..25fdc33e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,9 @@ pallas = "0.19.0" # pallas = { path = "../pallas/pallas" } # pallas = { git = "https://github.com/txpipe/pallas" } -gasket = { version = "^0.4", features = ["derive"] } +gasket = { version = "^0.6", features = ["derive"] } # gasket = { path = "../../construkts/gasket-rs/gasket", features = ["derive"] } -# gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] } +# gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] } utxorpc = { version = "1.0.0-alpha.1" } hex = "0.4.3" diff --git a/examples/file_cursor/.gitignore b/examples/file_cursor/.gitignore new file mode 100644 index 00000000..94a2dd14 --- /dev/null +++ b/examples/file_cursor/.gitignore @@ -0,0 +1 @@ +*.json \ No newline at end of file diff --git a/examples/file_cursor/daemon.toml b/examples/file_cursor/daemon.toml new file mode 100644 index 00000000..b995a77f --- /dev/null +++ b/examples/file_cursor/daemon.toml @@ -0,0 +1,14 @@ +[source] +type = "N2N" +peers = ["relays-new.cardano-mainnet.iohk.io:3001"] + +[intersect] +type = "Point" +value = [4493860, "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc"] + +[cursor] +type = "File" +path = "my_cursor.json" + +[sink] +type = "Stdout" diff --git a/src/_utils/facade.rs b/src/_utils/facade.rs deleted file mode 100644 index c0159671..00000000 --- a/src/_utils/facade.rs +++ /dev/null @@ -1,42 +0,0 @@ -use super::*; - -///! A friendly facade to simplify access to common utils procedures - -impl Utils { - // To be used by source stages to access the cursor, if any - pub fn get_cursor_if_any(&self) -> Option { - match &self.cursor { - Some(provider) => provider.get_cursor(), - _ => None, - } - } - - /// To be used by sink stages to track progress - pub fn track_source_progress(&self, event: &Event) { - if let Some(metrics) = &self.metrics { - metrics.on_source_event(event); - } - } - - /// To be used by sink stages to track progress - pub fn track_sink_progress(&self, event: &Event) { - let point = match (event.context.slot, &event.context.block_hash) { - (Some(slot), Some(hash)) => cursor::PointArg(slot, hash.to_owned()), - _ => return, - }; - - if let Some(cursor) = &self.cursor { - cursor.set_cursor(point).ok_or_warn("failed to set cursor") - } - - if let Some(metrics) = &self.metrics { - metrics.on_sink_event(event); - } - } - - pub fn track_chain_tip(&self, tip: u64) { - if let Some(metrics) = &self.metrics { - metrics.on_chain_tip(tip); - } - } -} diff --git a/src/_utils/retry.rs b/src/_utils/retry.rs deleted file mode 100644 index e664ab93..00000000 --- a/src/_utils/retry.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::{fmt::Debug, ops::Mul, time::Duration}; - -use serde::{Deserialize, Deserializer}; - -#[derive(Debug, Deserialize, Copy, Clone)] -pub struct Policy { - pub max_retries: u32, - #[serde(deserialize_with = "deserialize_duration")] - pub backoff_unit: Duration, - pub backoff_factor: u32, - #[serde(deserialize_with = "deserialize_duration")] - pub max_backoff: Duration, -} - -fn deserialize_duration<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let millis = u64::deserialize(deserializer)?; - - Ok(Duration::from_millis(millis)) -} - -const DEFAULT_MAX_RETRIES: u32 = 20; -const DEFAULT_BACKOFF_DELAY: u64 = 5_000; - -impl Default for Policy { - fn default() -> Self { - Self { - max_retries: DEFAULT_MAX_RETRIES, - backoff_unit: Duration::from_millis(DEFAULT_BACKOFF_DELAY), - backoff_factor: 2, - max_backoff: Duration::from_millis(20 * DEFAULT_BACKOFF_DELAY), - } - } -} - -fn compute_backoff_delay(policy: &Policy, retry: u32) -> Duration { - let units = policy.backoff_factor.pow(retry); - let backoff = policy.backoff_unit.mul(units); - core::cmp::min(backoff, policy.max_backoff) -} - -pub fn retry_operation(op: impl Fn() -> Result, policy: &Policy) -> Result -where - E: Debug, -{ - let mut retry = 0; - - loop { - let result = op(); - - match result { - Ok(x) => break Ok(x), - Err(err) if retry < policy.max_retries => { - log::warn!("retryable operation error: {:?}", err); - - retry += 1; - - let backoff = compute_backoff_delay(policy, retry); - - log::debug!( - "backoff for {}s until next retry #{}", - backoff.as_secs(), - retry - ); - - std::thread::sleep(backoff); - } - Err(x) => { - log::error!("max retries reached, failing whole operation"); - break Err(x); - } - } - } -} - -#[cfg(test)] -mod tests { - use std::{cell::RefCell, rc::Rc}; - - use super::*; - - #[test] - fn honors_max_retries() { - let counter = Rc::new(RefCell::new(0)); - - let inner_counter = counter.clone(); - let op = move || -> Result<(), String> { - *inner_counter.borrow_mut() += 1; - Err("very bad stuff happened".to_string()) - }; - - let policy = Policy { - max_retries: 3, - backoff_unit: Duration::from_secs(1), - backoff_factor: 0, - max_backoff: Duration::from_secs(100), - }; - - assert!(retry_operation(op, &policy).is_err()); - - assert_eq!(*counter.borrow(), 4); - } - - #[test] - fn honors_exponential_backoff() { - let op = move || -> Result<(), String> { Err("very bad stuff happened".to_string()) }; - - let policy = Policy { - max_retries: 10, - backoff_unit: Duration::from_millis(1), - backoff_factor: 2, - max_backoff: Duration::MAX, - }; - - let start = std::time::Instant::now(); - let result = retry_operation(op, &policy); - let elapsed = start.elapsed(); - - assert!(result.is_err()); - - // not an exact science, should be 2046, adding +/- 10% - assert!(elapsed.as_millis() >= 1842); - assert!(elapsed.as_millis() <= 2250); - } -} diff --git a/src/_utils/time.rs b/src/_utils/time.rs deleted file mode 100644 index da2bef4d..00000000 --- a/src/_utils/time.rs +++ /dev/null @@ -1,184 +0,0 @@ -//! Blockchain time utils -//! -//! Common operations to deal with blockchain time and wallclock conversions - -use crate::utils::ChainWellKnownInfo; - -/// Abstraction available to stages to deal with blockchain time conversions -pub(crate) trait TimeProvider { - /// Maps between slots and wallclock - fn slot_to_wallclock(&self, slot: u64) -> u64; - fn absolute_slot_to_relative(&self, slot: u64) -> (u64, u64); -} - -/// A naive, standalone implementation of a time provider -/// -/// This time provider doesn't require any external resources other than an -/// initial config. It works by applying simple slot => wallclock conversion -/// logic from a well-known configured point in the chain, assuming homogeneous -/// slot length from that point forward. -#[derive(Clone)] -pub(crate) struct NaiveProvider { - config: ChainWellKnownInfo, - shelley_start_epoch: u64, -} - -impl NaiveProvider { - pub fn new(config: ChainWellKnownInfo) -> Self { - assert!( - config.byron_epoch_length > 0, - "byron epoch length needs to be greater than zero" - ); - - assert!( - config.shelley_epoch_length > 0, - "shelley epoch length needs to be greater than zero" - ); - - let (shelley_start_epoch, _) = compute_era_epoch( - config.shelley_known_slot, - config.byron_slot_length as u64, - config.byron_epoch_length as u64, - ); - - NaiveProvider { - config, - shelley_start_epoch, - } - } -} - -#[inline] -fn compute_linear_timestamp( - known_slot: u64, - known_time: u64, - slot_length: u64, - query_slot: u64, -) -> u64 { - known_time + (query_slot - known_slot) * slot_length -} - -#[inline] -fn compute_era_epoch(era_slot: u64, era_slot_length: u64, era_epoch_length: u64) -> (u64, u64) { - let epoch = (era_slot * era_slot_length) / era_epoch_length; - let reminder = era_slot % era_epoch_length; - - (epoch, reminder) -} - -impl TimeProvider for NaiveProvider { - fn slot_to_wallclock(&self, slot: u64) -> u64 { - let NaiveProvider { config, .. } = self; - - if slot < config.shelley_known_slot { - compute_linear_timestamp( - config.byron_known_slot, - config.byron_known_time, - config.byron_slot_length as u64, - slot, - ) - } else { - compute_linear_timestamp( - config.shelley_known_slot, - config.shelley_known_time, - config.shelley_slot_length as u64, - slot, - ) - } - } - - fn absolute_slot_to_relative(&self, slot: u64) -> (u64, u64) { - let NaiveProvider { - config, - shelley_start_epoch, - } = self; - - if slot < config.shelley_known_slot { - compute_era_epoch( - slot, - config.byron_slot_length as u64, - config.byron_epoch_length as u64, - ) - } else { - let era_slot = slot - config.shelley_known_slot; - - let (era_epoch, reminder) = compute_era_epoch( - era_slot, - config.shelley_slot_length as u64, - config.shelley_epoch_length as u64, - ); - - (shelley_start_epoch + era_epoch, reminder) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn assert_slot_matches_timestamp( - provider: &NaiveProvider, - slot: u64, - expected_ts: u64, - expected_epoch: u64, - expected_epoch_slot: u64, - ) { - let wallclock = provider.slot_to_wallclock(slot); - assert_eq!(wallclock, expected_ts); - - let (epoch, epoch_slot) = provider.absolute_slot_to_relative(slot); - assert_eq!(epoch, expected_epoch); - assert_eq!(epoch_slot, expected_epoch_slot); - } - - #[test] - fn naive_provider_matches_mainnet_values() { - let provider = NaiveProvider::new(ChainWellKnownInfo::mainnet()); - - // Byron start, value copied from: - // https://explorer.cardano.org/en/block?id=f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f - assert_slot_matches_timestamp(&provider, 0, 1506203091, 0, 0); - - // Byron middle, value copied from: - // https://explorer.cardano.org/en/block?id=c1b57d58761af4dc3c6bdcb3542170cec6db3c81e551cd68012774d1c38129a3 - assert_slot_matches_timestamp(&provider, 2160007, 1549403231, 100, 7); - - // Shelley start, value copied from: - // https://explorer.cardano.org/en/block?id=aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de - assert_slot_matches_timestamp(&provider, 4492800, 1596059091, 208, 0); - - // Shelly middle, value copied from: - // https://explorer.cardano.org/en/block?id=ca60833847d0e70a1adfa6b7f485766003cf7d96d28d481c20d4390f91b76d68 - assert_slot_matches_timestamp(&provider, 51580240, 1643146531, 316, 431440); - - // Shelly middle, value copied from: - // https://explorer.cardano.org/en/block?id=ec07c6f74f344062db5340480e5b364aac8bb40768d184c1b1491e05c5bec4c4 - assert_slot_matches_timestamp(&provider, 54605026, 1646171317, 324, 226); - } - - #[test] - fn naive_provider_matches_testnet_values() { - let provider = NaiveProvider::new(ChainWellKnownInfo::testnet()); - - // Byron origin, value copied from: - // https://explorer.cardano-testnet.iohkdev.io/en/block?id=8f8602837f7c6f8b8867dd1cbc1842cf51a27eaed2c70ef48325d00f8efb320f - assert_slot_matches_timestamp(&provider, 0, 1564010416, 0, 0); - - // Byron start, value copied from: - // https://explorer.cardano-testnet.iohkdev.io/en/block?id=388a82f053603f3552717d61644a353188f2d5500f4c6354cc1ad27a36a7ea91 - assert_slot_matches_timestamp(&provider, 1031, 1564031036, 0, 1031); - - // Byron middle, value copied from: - // https://explorer.cardano-testnet.iohkdev.io/en/block?id=66102c0b80e1eebc9cddf9cab43c1bf912e4f1963d6f3b8ff948952f8409e779 - assert_slot_matches_timestamp(&provider, 561595, 1575242316, 25, 129595); - - // Shelley start, value copied from: - // https://explorer.cardano-testnet.iohkdev.io/en/block?id=02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f - assert_slot_matches_timestamp(&provider, 1598400, 1595967616, 74, 0); - - // Shelley middle, value copied from: - // https://explorer.cardano-testnet.iohkdev.io/en/block?id=26a1b5a649309c0c8dd48f3069d9adea5a27edf5171dfb941b708acaf2d76dcd - assert_slot_matches_timestamp(&provider, 48783593, 1643152809, 183, 97193); - } -} diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index bbef3550..337fa3c1 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -1,7 +1,7 @@ use gasket::runtime::Tether; -use oura::{filters, framework::*, sinks, sources}; +use oura::{cursor, filters, framework::*, sinks, sources}; use serde::Deserialize; -use std::{collections::VecDeque, time::Duration}; +use std::time::Duration; use tracing::{info, warn}; use crate::console; @@ -15,6 +15,7 @@ struct ConfigRoot { finalize: Option, chain: Option, retries: Option, + cursor: Option, } impl ConfigRoot { @@ -43,6 +44,7 @@ struct Runtime { source: Tether, filters: Vec, sink: Tether, + cursor: Tether, } impl Runtime { @@ -50,6 +52,7 @@ impl Runtime { std::iter::once(&self.source) .chain(self.filters.iter()) .chain(std::iter::once(&self.sink)) + .chain(std::iter::once(&self.cursor)) } fn should_stop(&self) -> bool { @@ -97,39 +100,24 @@ fn define_gasket_policy(config: Option<&gasket::retries::Policy>) -> gasket::run } } -fn chain_stages<'a>( - source: &'a mut dyn StageBootstrapper, - filters: Vec<&'a mut dyn StageBootstrapper>, - sink: &'a mut dyn StageBootstrapper, -) { - let mut prev = source; - - for filter in filters { - let (to_next, from_prev) = gasket::messaging::tokio::channel(100); - prev.connect_output(to_next); - filter.connect_input(from_prev); - prev = filter; - } - - let (to_next, from_prev) = gasket::messaging::tokio::channel(100); - prev.connect_output(to_next); - sink.connect_input(from_prev); -} - fn bootstrap( mut source: sources::Bootstrapper, mut filters: Vec, mut sink: sinks::Bootstrapper, + mut cursor: cursor::Bootstrapper, policy: gasket::runtime::Policy, ) -> Result { - chain_stages( - &mut source, - filters - .iter_mut() - .map(|x| x as &mut dyn StageBootstrapper) - .collect::>(), - &mut sink, - ); + let mut prev = source.borrow_output(); + + for filter in filters.iter_mut() { + gasket::messaging::tokio::connect_ports(prev, filter.borrow_input(), 100); + prev = filter.borrow_output(); + } + + gasket::messaging::tokio::connect_ports(prev, sink.borrow_input(), 100); + let prev = sink.borrow_cursor(); + + gasket::messaging::tokio::connect_ports(prev, cursor.borrow_track(), 100); let runtime = Runtime { source: source.spawn(policy.clone()), @@ -137,7 +125,8 @@ fn bootstrap( .into_iter() .map(|x| x.spawn(policy.clone())) .collect(), - sink: sink.spawn(policy), + sink: sink.spawn(policy.clone()), + cursor: cursor.spawn(policy), }; Ok(runtime) @@ -152,16 +141,15 @@ pub fn run(args: &Args) -> Result<(), Error> { let intersect = config.intersect; let finalize = config.finalize; let current_dir = std::env::current_dir().unwrap(); - - // TODO: load from persistence mechanism - let cursor = Cursor::new(VecDeque::new()); + let cursor = config.cursor.unwrap_or_default(); + let breadcrumbs = cursor.initial_load()?; let ctx = Context { chain, intersect, finalize, - cursor, current_dir, + breadcrumbs, }; let source = config.source.bootstrapper(&ctx)?; @@ -175,8 +163,10 @@ pub fn run(args: &Args) -> Result<(), Error> { let sink = config.sink.bootstrapper(&ctx)?; + let cursor = cursor.bootstrapper(&ctx)?; + let retries = define_gasket_policy(config.retries.as_ref()); - let runtime = bootstrap(source, filters, sink, retries)?; + let runtime = bootstrap(source, filters, sink, cursor, retries)?; info!("oura is running..."); diff --git a/src/cursor/file.rs b/src/cursor/file.rs new file mode 100644 index 00000000..1719f852 --- /dev/null +++ b/src/cursor/file.rs @@ -0,0 +1,153 @@ +use std::path::PathBuf; + +use gasket::framework::*; +use pallas::network::miniprotocols::Point; +use serde::Deserialize; +use tokio::select; + +use crate::framework::*; + +fn breadcrumbs_to_data(crumbs: &Breadcrumbs) -> Vec<(u64, String)> { + crumbs + .points() + .into_iter() + .filter_map(|p| match p { + Point::Origin => None, + Point::Specific(slot, hash) => Some((slot, hex::encode(hash))), + }) + .collect() +} + +fn breadcrumbs_from_data(data: Vec<(u64, String)>, max: usize) -> Result { + let points: Vec<_> = data + .into_iter() + .map::, _>(|(slot, hash)| { + let hash = hex::decode(hash).map_err(Error::custom)?; + Ok(Point::Specific(slot, hash)) + }) + .collect::>()?; + + Ok(Breadcrumbs::from_points(points, max)) +} + +pub enum Unit { + Track(Point), + Flush, +} + +#[derive(Default)] +pub struct Worker {} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(_: &Stage) -> Result { + Ok(Default::default()) + } + + async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> { + select! { + msg = stage.track.recv() => { + let msg = msg.or_panic()?; + Ok(WorkSchedule::Unit(Unit::Track(msg.payload))) + } + msg = stage.flush.recv() => { + msg.or_panic()?; + Ok(WorkSchedule::Unit(Unit::Flush)) + } + } + } + + async fn execute(&mut self, unit: &Unit, stage: &mut Stage) -> Result<(), WorkerError> { + match unit { + Unit::Track(x) => stage.breadcrumbs.track(x.clone()), + Unit::Flush => { + let file = std::fs::File::options() + .write(true) + .create(true) + .append(false) + .truncate(true) + .open(&stage.path) + .or_panic()?; + + let data = breadcrumbs_to_data(&stage.breadcrumbs); + serde_json::to_writer_pretty(&file, &data).or_panic()?; + } + } + + Ok(()) + } +} + +#[derive(Stage)] +#[stage(name = "cursor", unit = "Unit", worker = "Worker")] +pub struct Stage { + path: std::path::PathBuf, + + breadcrumbs: Breadcrumbs, + + pub track: gasket::messaging::InputPort, + + pub flush: gasket::messaging::TimerPort, + + #[metric] + tracked_slot: gasket::metrics::Gauge, + + #[metric] + flush_count: gasket::metrics::Counter, +} + +const DEFAULT_MAX_BREADCRUMBS: usize = 10; +const DEFAULT_FLUSH_INTERVAL: usize = 10; + +#[derive(Default, Debug, Deserialize)] +pub struct Config { + pub path: Option, + pub max_breadcrumbs: Option, + pub flush_interval: Option, +} + +impl Config { + fn define_path(&self) -> Result { + let path = self.path.clone(); + + let path = match path { + Some(x) => x, + None => std::env::current_dir() + .map_err(Error::custom)? + .join("cursor.json"), + }; + + Ok(path) + } + + pub fn initial_load(&self) -> Result { + let path = self.define_path()?; + + let max_breadcrumbs = self.max_breadcrumbs.unwrap_or(DEFAULT_MAX_BREADCRUMBS); + + if path.is_file() { + let file = std::fs::File::open(&path).map_err(|err| Error::Custom(err.to_string()))?; + let data: Vec<(u64, String)> = serde_json::from_reader(&file).map_err(Error::custom)?; + let crumbs = breadcrumbs_from_data(data, max_breadcrumbs)?; + + Ok(crumbs) + } else { + Ok(Breadcrumbs::new(max_breadcrumbs)) + } + } + + pub fn bootstrapper(self, ctx: &Context) -> Result { + let flush_interval = self.flush_interval.unwrap_or(DEFAULT_FLUSH_INTERVAL as u64); + + let stage = Stage { + path: self.define_path()?, + breadcrumbs: ctx.breadcrumbs.clone(), + tracked_slot: Default::default(), + flush_count: Default::default(), + track: Default::default(), + flush: gasket::messaging::TimerPort::from_secs(flush_interval), + }; + + Ok(stage) + } +} diff --git a/src/cursor/memory.rs b/src/cursor/memory.rs new file mode 100644 index 00000000..cebccf22 --- /dev/null +++ b/src/cursor/memory.rs @@ -0,0 +1,55 @@ +use gasket::framework::*; +use pallas::network::miniprotocols::Point; +use serde::Deserialize; + +use crate::framework::*; + +#[derive(Default)] +pub struct Worker {} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(_: &Stage) -> Result { + Ok(Default::default()) + } + + async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> { + let msg = stage.track.recv().await.or_panic()?; + Ok(WorkSchedule::Unit(msg.payload)) + } + + async fn execute(&mut self, unit: &Point, stage: &mut Stage) -> Result<(), WorkerError> { + stage.breadcrumbs.track(unit.clone()); + Ok(()) + } +} + +#[derive(Stage)] +#[stage(name = "cursor", unit = "Point", worker = "Worker")] +pub struct Stage { + breadcrumbs: Breadcrumbs, + + pub track: gasket::messaging::InputPort, + + #[metric] + tracked_slot: gasket::metrics::Gauge, +} + +#[derive(Default, Debug, Deserialize)] +pub struct Config; + +impl Config { + pub fn initial_load(&self) -> Result { + Ok(Breadcrumbs::new(30)) + } + + pub fn bootstrapper(self, ctx: &Context) -> Result { + let stage = Stage { + breadcrumbs: ctx.breadcrumbs.clone(), + tracked_slot: Default::default(), + track: Default::default(), + }; + + Ok(stage) + } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs new file mode 100644 index 00000000..67d92603 --- /dev/null +++ b/src/cursor/mod.rs @@ -0,0 +1,60 @@ +use gasket::{messaging::InputPort, runtime::Tether}; +use pallas::network::miniprotocols::Point; +use serde::Deserialize; + +use crate::framework::*; + +pub mod file; +pub mod memory; + +pub type MaxBreadcrums = usize; + +pub enum Bootstrapper { + Memory(memory::Stage), + File(file::Stage), +} + +impl Bootstrapper { + pub fn borrow_track(&mut self) -> &mut InputPort { + match self { + Bootstrapper::Memory(x) => &mut x.track, + Bootstrapper::File(x) => &mut x.track, + } + } + + pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether { + match self { + Bootstrapper::Memory(x) => gasket::runtime::spawn_stage(x, policy), + Bootstrapper::File(x) => gasket::runtime::spawn_stage(x, policy), + } + } +} + +#[derive(Deserialize)] +#[serde(tag = "type")] +pub enum Config { + Memory(memory::Config), + File(file::Config), +} + +impl Config { + pub fn initial_load(&self) -> Result { + match self { + Config::Memory(x) => x.initial_load(), + Config::File(x) => x.initial_load(), + } + } + + pub fn bootstrapper(self, ctx: &Context) -> Result { + match self { + Config::Memory(c) => Ok(Bootstrapper::Memory(c.bootstrapper(ctx)?)), + Config::File(c) => Ok(Bootstrapper::File(c.bootstrapper(ctx)?)), + } + } +} + +impl Default for Config { + fn default() -> Self { + Config::Memory(memory::Config) + } +} diff --git a/src/filters/legacy_v1/mod.rs b/src/filters/legacy_v1/mod.rs index d8d5247c..f9b2dfbd 100644 --- a/src/filters/legacy_v1/mod.rs +++ b/src/filters/legacy_v1/mod.rs @@ -42,7 +42,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { ChainEvent::Apply(point, Record::CborBlock(cbor)) => { let mut writer = EventWriter::new( point.clone(), - stage.output.clone(), + &stage.output, &stage.config, &stage.genesis, &mut buffer, @@ -53,7 +53,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { ChainEvent::Reset(point) => { let mut writer = EventWriter::new( point.clone(), - stage.output.clone(), + &stage.output, &stage.config, &stage.genesis, &mut buffer, diff --git a/src/filters/legacy_v1/prelude.rs b/src/filters/legacy_v1/prelude.rs index d55dc1a8..8b133275 100644 --- a/src/filters/legacy_v1/prelude.rs +++ b/src/filters/legacy_v1/prelude.rs @@ -11,7 +11,7 @@ use super::Config; pub struct EventWriter<'a> { context: EventContext, point: Point, - output: MapperOutputPort, + output: &'a MapperOutputPort, pub(crate) config: &'a Config, pub(crate) genesis: &'a GenesisValues, buffer: &'a mut Vec, @@ -20,7 +20,7 @@ pub struct EventWriter<'a> { impl<'a> EventWriter<'a> { pub fn new( point: Point, - output: MapperOutputPort, + output: &'a MapperOutputPort, config: &'a Config, genesis: &'a GenesisValues, buffer: &'a mut Vec, @@ -61,7 +61,7 @@ impl<'a> EventWriter<'a> { EventWriter { context: extra_context, point: self.point.clone(), - output: self.output.clone(), + output: self.output, config: self.config, genesis: self.genesis, buffer: self.buffer, diff --git a/src/filters/mod.rs b/src/filters/mod.rs index 16f6f80c..89e2beba 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -1,7 +1,4 @@ -use gasket::{ - messaging::{RecvPort, SendPort}, - runtime::Tether, -}; +use gasket::runtime::Tether; use serde::Deserialize; use crate::framework::*; @@ -32,40 +29,40 @@ pub enum Bootstrapper { Deno(deno::Stage), } -impl StageBootstrapper for Bootstrapper { - fn connect_input(&mut self, adapter: InputAdapter) { +impl Bootstrapper { + pub fn borrow_input(&mut self) -> &mut FilterInputPort { match self { - Bootstrapper::Noop(p) => p.input.connect(adapter), - Bootstrapper::SplitBlock(p) => p.input.connect(adapter), - Bootstrapper::Dsl(p) => p.input.connect(adapter), - Bootstrapper::Json(p) => p.input.connect(adapter), - Bootstrapper::LegacyV1(p) => p.input.connect(adapter), - Bootstrapper::Wasm(p) => p.input.connect(adapter), - Bootstrapper::ParseCbor(p) => p.input.connect(adapter), - Bootstrapper::MatchPattern(p) => p.input.connect(adapter), + Bootstrapper::Noop(p) => &mut p.input, + Bootstrapper::SplitBlock(p) => &mut p.input, + Bootstrapper::Dsl(p) => &mut p.input, + Bootstrapper::Json(p) => &mut p.input, + Bootstrapper::LegacyV1(p) => &mut p.input, + Bootstrapper::Wasm(p) => &mut p.input, + Bootstrapper::ParseCbor(p) => &mut p.input, + Bootstrapper::MatchPattern(p) => &mut p.input, #[cfg(feature = "deno")] - Bootstrapper::Deno(p) => p.input.connect(adapter), + Bootstrapper::Deno(p) => &mut p.input, } } - fn connect_output(&mut self, adapter: OutputAdapter) { + pub fn borrow_output(&mut self) -> &mut FilterOutputPort { match self { - Bootstrapper::Noop(p) => p.output.connect(adapter), - Bootstrapper::SplitBlock(p) => p.output.connect(adapter), - Bootstrapper::Dsl(p) => p.output.connect(adapter), - Bootstrapper::Json(p) => p.output.connect(adapter), - Bootstrapper::LegacyV1(p) => p.output.connect(adapter), - Bootstrapper::Wasm(p) => p.output.connect(adapter), - Bootstrapper::ParseCbor(p) => p.output.connect(adapter), - Bootstrapper::MatchPattern(p) => p.output.connect(adapter), + Bootstrapper::Noop(p) => &mut p.output, + Bootstrapper::SplitBlock(p) => &mut p.output, + Bootstrapper::Dsl(p) => &mut p.output, + Bootstrapper::Json(p) => &mut p.output, + Bootstrapper::LegacyV1(p) => &mut p.output, + Bootstrapper::Wasm(p) => &mut p.output, + Bootstrapper::ParseCbor(p) => &mut p.output, + Bootstrapper::MatchPattern(p) => &mut p.output, #[cfg(feature = "deno")] - Bootstrapper::Deno(p) => p.output.connect(adapter), + Bootstrapper::Deno(p) => &mut p.output, } } - fn spawn(self, policy: gasket::runtime::Policy) -> Tether { + pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether { match self { Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy), diff --git a/src/framework/cursor.rs b/src/framework/cursor.rs deleted file mode 100644 index 6ad24b70..00000000 --- a/src/framework/cursor.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::{ - collections::VecDeque, - sync::{Arc, RwLock}, -}; - -use pallas::network::miniprotocols::Point; - -const HARDCODED_BREADCRUMBS: usize = 20; - -type State = VecDeque; - -// TODO: include exponential breadcrumbs logic here -#[derive(Clone)] -pub struct Cursor(Arc>); - -impl Cursor { - pub fn new(state: State) -> Self { - Self(Arc::new(RwLock::new(state))) - } - - pub fn is_empty(&self) -> bool { - let v = self.0.read().unwrap(); - v.is_empty() - } - - pub fn clone_state(&self) -> State { - let v = self.0.read().unwrap(); - v.clone() - } - - pub fn latest_known_point(&self) -> Option { - let state = self.0.read().unwrap(); - state.front().cloned() - } - - pub fn add_breadcrumb(&self, value: Point) { - let mut state = self.0.write().unwrap(); - - state.push_front(value); - - if state.len() > HARDCODED_BREADCRUMBS { - state.pop_back(); - } - } -} diff --git a/src/framework/mod.rs b/src/framework/mod.rs index 6e8c2c71..9dc85005 100644 --- a/src/framework/mod.rs +++ b/src/framework/mod.rs @@ -2,22 +2,68 @@ use pallas::network::miniprotocols::Point; use serde::Deserialize; +use serde_json::{json, Value as JsonValue}; +use std::collections::VecDeque; use std::fmt::Debug; use std::path::PathBuf; +pub use crate::cursor::Config as CursorConfig; + // we use UtxoRpc as our canonical representation of a parsed Tx pub use utxorpc::proto::cardano::v1::Tx as ParsedTx; // we use GenesisValues from Pallas as our ChainConfig pub use pallas::ledger::traverse::wellknown::GenesisValues; -pub mod cursor; pub mod errors; pub mod legacy_v1; -pub use cursor::*; pub use errors::*; +#[derive(Clone)] +pub struct Breadcrumbs { + state: VecDeque, + max: usize, +} + +impl Breadcrumbs { + pub fn new(max: usize) -> Self { + Self { + state: Default::default(), + max, + } + } + + pub fn from_points(points: Vec, max: usize) -> Self { + Self { + state: VecDeque::from_iter(points), + max, + } + } + + pub fn is_empty(&self) -> bool { + self.state.is_empty() + } + + pub fn track(&mut self, point: Point) { + // if we have a rollback, retain only older points + self.state + .retain(|p| p.slot_or_default() < point.slot_or_default()); + + // add the new point we're tracking + self.state.push_front(point); + + // if we have too many points, remove the older ones + if self.state.len() > self.max { + self.state.pop_back(); + } + } + + pub fn points(&self) -> Vec { + self.state.iter().map(Clone::clone).collect() + } +} + #[derive(Deserialize, Clone)] #[serde(tag = "type", rename_all = "lowercase")] pub enum ChainConfig { @@ -49,13 +95,11 @@ impl From for GenesisValues { pub struct Context { pub chain: ChainConfig, pub intersect: IntersectConfig, - pub cursor: Cursor, pub finalize: Option, pub current_dir: PathBuf, + pub breadcrumbs: Breadcrumbs, } -use serde_json::{json, Value as JsonValue}; - #[derive(Debug, Clone)] pub enum Record { CborBlock(Vec), @@ -193,21 +237,13 @@ impl From for JsonValue { } } -pub type SourceOutputPort = gasket::messaging::tokio::OutputPort; -pub type FilterInputPort = gasket::messaging::tokio::InputPort; -pub type FilterOutputPort = gasket::messaging::tokio::OutputPort; -pub type MapperInputPort = gasket::messaging::tokio::InputPort; -pub type MapperOutputPort = gasket::messaging::tokio::OutputPort; -pub type SinkInputPort = gasket::messaging::tokio::InputPort; - -pub type OutputAdapter = gasket::messaging::tokio::ChannelSendAdapter; -pub type InputAdapter = gasket::messaging::tokio::ChannelRecvAdapter; - -pub trait StageBootstrapper { - fn connect_output(&mut self, adapter: OutputAdapter); - fn connect_input(&mut self, adapter: InputAdapter); - fn spawn(self, policy: gasket::runtime::Policy) -> gasket::runtime::Tether; -} +pub type SourceOutputPort = gasket::messaging::OutputPort; +pub type FilterInputPort = gasket::messaging::InputPort; +pub type FilterOutputPort = gasket::messaging::OutputPort; +pub type MapperInputPort = gasket::messaging::InputPort; +pub type MapperOutputPort = gasket::messaging::OutputPort; +pub type SinkInputPort = gasket::messaging::InputPort; +pub type SinkCursorPort = gasket::messaging::OutputPort; #[derive(Debug, Deserialize, Clone)] #[serde(tag = "type", content = "value")] diff --git a/src/lib.rs b/src/lib.rs index 42aeeccb..36bfe792 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod cursor; pub mod filters; pub mod framework; pub mod sinks; diff --git a/src/sinks/assert/mod.rs b/src/sinks/assert/mod.rs index 1f03fc12..8a296d99 100644 --- a/src/sinks/assert/mod.rs +++ b/src/sinks/assert/mod.rs @@ -105,7 +105,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -115,9 +115,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-assert", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -136,13 +136,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/aws_lambda.rs b/src/sinks/aws_lambda.rs index 84adc04f..abee8b5a 100644 --- a/src/sinks/aws_lambda.rs +++ b/src/sinks/aws_lambda.rs @@ -50,7 +50,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -60,9 +60,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-aws-lambda", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -78,13 +78,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/aws_s3.rs b/src/sinks/aws_s3.rs index f3959d8a..d128cb5c 100644 --- a/src/sinks/aws_s3.rs +++ b/src/sinks/aws_s3.rs @@ -69,7 +69,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -79,9 +79,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-aws-s3", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -99,13 +99,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs index 7f154957..235ad707 100644 --- a/src/sinks/aws_sqs.rs +++ b/src/sinks/aws_sqs.rs @@ -68,7 +68,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -78,9 +78,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-aws-sqs", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -97,13 +97,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs index a2c42e6a..d61b8d7c 100644 --- a/src/sinks/elasticsearch.rs +++ b/src/sinks/elasticsearch.rs @@ -87,7 +87,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(slot as i64); - stage.cursor.add_breadcrumb(point); + stage.cursor.send(point.into()).await.or_panic()?; Ok(()) } @@ -98,9 +98,9 @@ impl gasket::framework::Worker for Worker { pub struct Stage { config: Config, genesis: GenesisValues, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -139,10 +139,10 @@ impl Config { let stage = Stage { config: self, genesis: ctx.chain.clone().into(), - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/file_rotate.rs b/src/sinks/file_rotate.rs index 8eb527d9..a157079f 100644 --- a/src/sinks/file_rotate.rs +++ b/src/sinks/file_rotate.rs @@ -20,7 +20,7 @@ pub struct Worker { impl gasket::framework::Worker for Worker { async fn bootstrap(stage: &Stage) -> Result { let output_path = match &stage.config.output_path { - Some(x) => PathBuf::try_from(x).map_err(Error::config).or_panic()?, + Some(x) => PathBuf::from(x), None => stage.current_dir.clone(), }; @@ -73,7 +73,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -84,9 +84,9 @@ impl gasket::framework::Worker for Worker { pub struct Stage { config: Config, current_dir: PathBuf, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -118,10 +118,10 @@ impl Config { let stage = Stage { config: self, current_dir: ctx.current_dir.clone(), - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/gcp_cloudfunction.rs b/src/sinks/gcp_cloudfunction.rs index f3483cbf..4dec2a2a 100644 --- a/src/sinks/gcp_cloudfunction.rs +++ b/src/sinks/gcp_cloudfunction.rs @@ -218,7 +218,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -232,9 +232,9 @@ impl gasket::framework::Worker for Worker { )] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -253,13 +253,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/gcp_pubsub.rs b/src/sinks/gcp_pubsub.rs index a85786c2..9ee3591b 100644 --- a/src/sinks/gcp_pubsub.rs +++ b/src/sinks/gcp_pubsub.rs @@ -52,7 +52,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -62,9 +62,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-gcp-pubsub", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -79,13 +79,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs index 88d0007c..14596871 100644 --- a/src/sinks/kafka.rs +++ b/src/sinks/kafka.rs @@ -69,7 +69,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -79,9 +79,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-kafka", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -105,13 +105,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index e3c84047..059a5fe0 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -1,4 +1,4 @@ -use gasket::{messaging::RecvPort, runtime::Tether}; +use gasket::runtime::Tether; use serde::Deserialize; use crate::framework::*; @@ -82,54 +82,92 @@ pub enum Bootstrapper { ElasticSearch(elasticsearch::Stage), } -impl StageBootstrapper for Bootstrapper { - fn connect_output(&mut self, _: OutputAdapter) { - panic!("attempted to use sink stage as sender"); +impl Bootstrapper { + pub fn borrow_input(&mut self) -> &mut SinkInputPort { + match self { + Bootstrapper::Terminal(p) => &mut p.input, + Bootstrapper::Stdout(p) => &mut p.input, + Bootstrapper::Noop(p) => &mut p.input, + Bootstrapper::Assert(p) => &mut p.input, + + #[cfg(feature = "sink-file-rotate")] + Bootstrapper::FileRotate(p) => &mut p.input, + + #[cfg(feature = "sink-webhook")] + Bootstrapper::WebHook(p) => &mut p.input, + + #[cfg(feature = "sink-rabbitmq")] + Bootstrapper::Rabbitmq(p) => &mut p.input, + + #[cfg(feature = "sink-kafka")] + Bootstrapper::Kafka(p) => &mut p.input, + + #[cfg(feature = "sink-aws-sqs")] + Bootstrapper::AwsSqs(p) => &mut p.input, + + #[cfg(feature = "sink-aws-lambda")] + Bootstrapper::AwsLambda(p) => &mut p.input, + + #[cfg(feature = "sink-aws-s3")] + Bootstrapper::AwsS3(p) => &mut p.input, + + #[cfg(feature = "sink-gcp-pubsub")] + Bootstrapper::GcpPubSub(p) => &mut p.input, + + #[cfg(feature = "sink-gcp-cloudfunction")] + Bootstrapper::GcpCloudFunction(p) => &mut p.input, + + #[cfg(feature = "sink-redis")] + Bootstrapper::Redis(p) => &mut p.input, + + #[cfg(feature = "sink-elasticsearch")] + Bootstrapper::ElasticSearch(p) => &mut p.input, + } } - fn connect_input(&mut self, adapter: InputAdapter) { + pub fn borrow_cursor(&mut self) -> &mut SinkCursorPort { match self { - Bootstrapper::Terminal(p) => p.input.connect(adapter), - Bootstrapper::Stdout(p) => p.input.connect(adapter), - Bootstrapper::Noop(p) => p.input.connect(adapter), - Bootstrapper::Assert(p) => p.input.connect(adapter), + Bootstrapper::Terminal(p) => &mut p.cursor, + Bootstrapper::Stdout(p) => &mut p.cursor, + Bootstrapper::Noop(p) => &mut p.cursor, + Bootstrapper::Assert(p) => &mut p.cursor, #[cfg(feature = "sink-file-rotate")] - Bootstrapper::FileRotate(p) => p.input.connect(adapter), + Bootstrapper::FileRotate(p) => &mut p.cursor, #[cfg(feature = "sink-webhook")] - Bootstrapper::WebHook(p) => p.input.connect(adapter), + Bootstrapper::WebHook(p) => &mut p.cursor, #[cfg(feature = "sink-rabbitmq")] - Bootstrapper::Rabbitmq(p) => p.input.connect(adapter), + Bootstrapper::Rabbitmq(p) => &mut p.cursor, #[cfg(feature = "sink-kafka")] - Bootstrapper::Kafka(p) => p.input.connect(adapter), + Bootstrapper::Kafka(p) => &mut p.cursor, #[cfg(feature = "sink-aws-sqs")] - Bootstrapper::AwsSqs(p) => p.input.connect(adapter), + Bootstrapper::AwsSqs(p) => &mut p.cursor, #[cfg(feature = "sink-aws-lambda")] - Bootstrapper::AwsLambda(p) => p.input.connect(adapter), + Bootstrapper::AwsLambda(p) => &mut p.cursor, #[cfg(feature = "sink-aws-s3")] - Bootstrapper::AwsS3(p) => p.input.connect(adapter), + Bootstrapper::AwsS3(p) => &mut p.cursor, #[cfg(feature = "sink-gcp-pubsub")] - Bootstrapper::GcpPubSub(p) => p.input.connect(adapter), + Bootstrapper::GcpPubSub(p) => &mut p.cursor, #[cfg(feature = "sink-gcp-cloudfunction")] - Bootstrapper::GcpCloudFunction(p) => p.input.connect(adapter), + Bootstrapper::GcpCloudFunction(p) => &mut p.cursor, #[cfg(feature = "sink-redis")] - Bootstrapper::Redis(p) => p.input.connect(adapter), + Bootstrapper::Redis(p) => &mut p.cursor, #[cfg(feature = "sink-elasticsearch")] - Bootstrapper::ElasticSearch(p) => p.input.connect(adapter), + Bootstrapper::ElasticSearch(p) => &mut p.cursor, } } - fn spawn(self, policy: gasket::runtime::Policy) -> Tether { + pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether { match self { Bootstrapper::Terminal(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::Stdout(x) => gasket::runtime::spawn_stage(x, policy), diff --git a/src/sinks/noop.rs b/src/sinks/noop.rs index be6785cc..7c628462 100644 --- a/src/sinks/noop.rs +++ b/src/sinks/noop.rs @@ -28,7 +28,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(unit.slot_or_default() as i64); - stage.cursor.add_breadcrumb(unit.clone()); + stage.cursor.send(unit.clone().into()).await.or_panic()?; Ok(()) } @@ -37,9 +37,8 @@ impl gasket::framework::Worker for Worker { #[derive(Stage)] #[stage(name = "sink-noop", unit = "Point", worker = "Worker")] pub struct Stage { - cursor: Cursor, - pub input: FilterInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -52,12 +51,12 @@ pub struct Stage { pub struct Config {} impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _: &Context) -> Result { let stage = Stage { - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 60bfef76..f77f67ec 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -53,7 +53,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -63,9 +63,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-rabbitmq", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -82,13 +82,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs index ed939b63..b4a9e77f 100644 --- a/src/sinks/redis.rs +++ b/src/sinks/redis.rs @@ -72,7 +72,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -82,9 +82,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-redis", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -106,13 +106,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/stdout.rs b/src/sinks/stdout.rs index 4cad7861..5ead4ee5 100644 --- a/src/sinks/stdout.rs +++ b/src/sinks/stdout.rs @@ -39,7 +39,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -48,9 +48,8 @@ impl gasket::framework::Worker for Worker { #[derive(Stage)] #[stage(name = "sink-stdout", unit = "ChainEvent", worker = "Worker")] pub struct Stage { - cursor: Cursor, - pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -63,12 +62,12 @@ pub struct Stage { pub struct Config; impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _: &Context) -> Result { let stage = Stage { - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/terminal/mod.rs b/src/sinks/terminal/mod.rs index 1ec7050c..607d764e 100644 --- a/src/sinks/terminal/mod.rs +++ b/src/sinks/terminal/mod.rs @@ -77,7 +77,7 @@ impl gasket::framework::Worker for Worker { self.stdout.execute(Print(line)).or_panic()?; stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point); + stage.cursor.send(point.into()).await.or_panic()?; Ok(()) } @@ -87,8 +87,8 @@ impl gasket::framework::Worker for Worker { #[stage(name = "filter", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, + pub cursor: SinkCursorPort, pub input: MapperInputPort, #[metric] @@ -106,13 +106,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _: &Context) -> Result { let stage = Stage { config: self, ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), - cursor: ctx.cursor.clone(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sinks/webhook.rs b/src/sinks/webhook.rs index 07cd3acd..d0a3aaf9 100644 --- a/src/sinks/webhook.rs +++ b/src/sinks/webhook.rs @@ -72,7 +72,7 @@ impl gasket::framework::Worker for Worker { stage.ops_count.inc(1); stage.latest_block.set(point.slot_or_default() as i64); - stage.cursor.add_breadcrumb(point.clone()); + stage.cursor.send(point.clone().into()).await.or_panic()?; Ok(()) } @@ -82,9 +82,9 @@ impl gasket::framework::Worker for Worker { #[stage(name = "sink-webhook", unit = "ChainEvent", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, pub input: MapperInputPort, + pub cursor: SinkCursorPort, #[metric] ops_count: gasket::metrics::Counter, @@ -109,13 +109,13 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), input: Default::default(), + cursor: Default::default(), }; Ok(stage) diff --git a/src/sources/mod.rs b/src/sources/mod.rs index a50dfca8..bed3d715 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -1,4 +1,4 @@ -use gasket::{messaging::SendPort, runtime::Tether}; +use gasket::runtime::Tether; use serde::Deserialize; use crate::framework::*; @@ -26,25 +26,21 @@ pub enum Bootstrapper { UtxoRPC(utxorpc::Stage), } -impl StageBootstrapper for Bootstrapper { - fn connect_output(&mut self, adapter: OutputAdapter) { +impl Bootstrapper { + pub fn borrow_output(&mut self) -> &mut SourceOutputPort { match self { - Bootstrapper::N2N(p) => p.output.connect(adapter), - Bootstrapper::N2C(p) => p.output.connect(adapter), + Bootstrapper::N2N(p) => &mut p.output, + Bootstrapper::N2C(p) => &mut p.output, #[cfg(feature = "aws")] - Bootstrapper::S3(p) => p.output.connect(adapter), + Bootstrapper::S3(p) => &mut p.output, #[cfg(feature = "source-utxorpc")] - Bootstrapper::UtxoRPC(p) => p.output.connect(adapter), + Bootstrapper::UtxoRPC(p) => &mut p.output, } } - fn connect_input(&mut self, _: InputAdapter) { - panic!("attempted to use source stage as receiver"); - } - - fn spawn(self, policy: gasket::runtime::Policy) -> Tether { + pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether { match self { Bootstrapper::N2N(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::N2C(x) => gasket::runtime::spawn_stage(x, policy), diff --git a/src/sources/n2c.rs b/src/sources/n2c.rs index d823e6fc..ade9ea78 100644 --- a/src/sources/n2c.rs +++ b/src/sources/n2c.rs @@ -24,7 +24,7 @@ pub struct Stage { intersect: IntersectConfig, - cursor: Cursor, + breadcrumbs: Breadcrumbs, pub output: SourceOutputPort, @@ -63,12 +63,13 @@ async fn intersect_from_config( Ok(()) } -async fn intersect_from_cursor(peer: &mut NodeClient, cursor: &Cursor) -> Result<(), WorkerError> { - let points = cursor.clone_state(); - +async fn intersect_from_breadcrumbs( + peer: &mut NodeClient, + breadcrumbs: &Breadcrumbs, +) -> Result<(), WorkerError> { let (intersect, _) = peer .chainsync() - .find_intersect(points.into()) + .find_intersect(breadcrumbs.points()) .await .or_restart()?; @@ -92,16 +93,16 @@ impl Worker { let block = MultiEraBlock::decode(cbor).or_panic()?; let slot = block.slot(); let hash = block.hash(); + let point = Point::Specific(slot, hash.to_vec()); debug!(slot, %hash, "chain sync roll forward"); - let evt = ChainEvent::Apply( - pallas::network::miniprotocols::Point::Specific(slot, hash.to_vec()), - Record::CborBlock(cbor.to_vec()), - ); + let evt = ChainEvent::Apply(point.clone(), Record::CborBlock(cbor.to_vec())); stage.output.send(evt.into()).await.or_panic()?; + stage.breadcrumbs.track(point.clone()); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) @@ -118,6 +119,8 @@ impl Worker { .await .or_panic()?; + stage.breadcrumbs.track(point.clone()); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) @@ -139,10 +142,10 @@ impl gasket::framework::Worker for Worker { .await .or_retry()?; - if stage.cursor.is_empty() { + if stage.breadcrumbs.is_empty() { intersect_from_config(&mut peer_session, &stage.intersect).await?; } else { - intersect_from_cursor(&mut peer_session, &stage.cursor).await?; + intersect_from_breadcrumbs(&mut peer_session, &stage.breadcrumbs).await?; } let worker = Self { peer_session }; @@ -194,9 +197,9 @@ impl Config { pub fn bootstrapper(self, ctx: &Context) -> Result { let stage = Stage { config: self, + breadcrumbs: ctx.breadcrumbs.clone(), chain: ctx.chain.clone().into(), intersect: ctx.intersect.clone(), - cursor: ctx.cursor.clone(), output: Default::default(), ops_count: Default::default(), chain_tip: Default::default(), diff --git a/src/sources/n2n.rs b/src/sources/n2n.rs index f46bb2f3..aa6e5d5a 100644 --- a/src/sources/n2n.rs +++ b/src/sources/n2n.rs @@ -22,7 +22,7 @@ pub struct Stage { intersect: IntersectConfig, - cursor: Cursor, + breadcrumbs: Breadcrumbs, pub output: SourceOutputPort, @@ -70,12 +70,13 @@ async fn intersect_from_config( Ok(()) } -async fn intersect_from_cursor(peer: &mut PeerClient, cursor: &Cursor) -> Result<(), WorkerError> { - let points = cursor.clone_state(); - +async fn intersect_from_breadcrumbs( + peer: &mut PeerClient, + breadcrumbs: &Breadcrumbs, +) -> Result<(), WorkerError> { let (intersect, _) = peer .chainsync() - .find_intersect(points.into()) + .find_intersect(breadcrumbs.points()) .await .or_restart()?; @@ -99,13 +100,14 @@ impl Worker { let header = to_traverse(header).or_panic()?; let slot = header.slot(); let hash = header.hash(); + let point = Point::Specific(slot, hash.to_vec()); debug!(slot, %hash, "chain sync roll forward"); let block = self .peer_session .blockfetch() - .fetch_single(Point::Specific(slot, hash.to_vec())) + .fetch_single(point.clone()) .await .or_retry()?; @@ -116,6 +118,8 @@ impl Worker { stage.output.send(evt.into()).await.or_panic()?; + stage.breadcrumbs.track(point); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) @@ -132,6 +136,8 @@ impl Worker { .await .or_panic()?; + stage.breadcrumbs.track(point.clone()); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) @@ -161,10 +167,10 @@ impl gasket::framework::Worker for Worker { .await .or_retry()?; - if stage.cursor.is_empty() { + if stage.breadcrumbs.is_empty() { intersect_from_config(&mut peer_session, &stage.intersect).await?; } else { - intersect_from_cursor(&mut peer_session, &stage.cursor).await?; + intersect_from_breadcrumbs(&mut peer_session, &stage.breadcrumbs).await?; } let worker = Self { peer_session }; @@ -216,9 +222,9 @@ impl Config { pub fn bootstrapper(self, ctx: &Context) -> Result { let stage = Stage { config: self, + breadcrumbs: ctx.breadcrumbs.clone(), chain: ctx.chain.clone().into(), intersect: ctx.intersect.clone(), - cursor: ctx.cursor.clone(), output: Default::default(), ops_count: Default::default(), chain_tip: Default::default(), diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index f72d913a..f5bffd96 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -15,10 +15,20 @@ use utxorpc::proto::sync::v1::{BlockRef, DumpHistoryRequest, FollowTipRequest, F use crate::framework::*; +fn point_to_blockref(point: Point) -> Option { + match point { + Point::Origin => None, + Point::Specific(slot, hash) => Some(BlockRef { + index: slot, + hash: hash.into(), + }), + } +} + pub struct Worker { client: ChainSyncServiceClient, stream: Option>, - block_ref: Option, + intersect: Option, max_items_per_page: u32, } @@ -143,7 +153,7 @@ impl Worker { async fn next_dump_history(&mut self) -> Result>, WorkerError> { let dump_history_request = DumpHistoryRequest { - start_token: self.block_ref.clone(), + start_token: self.intersect.clone(), max_items: self.max_items_per_page, ..Default::default() }; @@ -155,7 +165,7 @@ impl Worker { .or_restart()? .into_inner(); - self.block_ref = result.next_token; + self.intersect = result.next_token; if !result.block.is_empty() { let actions: Vec = result.block.into_iter().map(Action::Apply).collect(); @@ -175,22 +185,17 @@ impl gasket::framework::Worker for Worker { .await .or_panic()?; - let mut point: Option<(u64, Vec)> = match stage.intersect.clone() { - IntersectConfig::Point(slot, hash) => Some((slot, hash.into())), - _ => None, + let intersect: Vec<_> = if stage.breadcrumbs.is_empty() { + stage.intersect.points().unwrap_or_default() + } else { + stage.breadcrumbs.points() }; - if let Some(latest_point) = stage.cursor.latest_known_point() { - point = match latest_point { - Point::Specific(slot, hash) => Some((slot, hash)), - _ => None, - }; - } - - let block_ref = point.map(|(slot, hash)| BlockRef { - index: slot, - hash: hash.into(), - }); + let intersect = intersect + .into_iter() + .filter_map(point_to_blockref) + .collect::>() + .pop(); let max_items_per_page = stage.config.max_items_per_page.unwrap_or(20); @@ -198,12 +203,12 @@ impl gasket::framework::Worker for Worker { client, stream: None, max_items_per_page, - block_ref, + intersect, }) } async fn schedule(&mut self, _: &mut Stage) -> Result>, WorkerError> { - if self.block_ref.is_some() { + if self.intersect.is_some() { return self.next_dump_history().await; } @@ -223,7 +228,7 @@ impl gasket::framework::Worker for Worker { #[stage(name = "source-utxorpc", unit = "Vec", worker = "Worker")] pub struct Stage { config: Config, - cursor: Cursor, + breadcrumbs: Breadcrumbs, intersect: IntersectConfig, pub output: SourceOutputPort, #[metric] @@ -242,7 +247,7 @@ impl Config { pub fn bootstrapper(self, ctx: &Context) -> Result { let stage = Stage { config: self, - cursor: ctx.cursor.clone(), + breadcrumbs: ctx.breadcrumbs.clone(), intersect: ctx.intersect.clone(), output: Default::default(), ops_count: Default::default(),