diff --git a/crates/tycho-common/src/models/mod.rs b/crates/tycho-common/src/models/mod.rs index e3c55f5020..e79bb4a07c 100644 --- a/crates/tycho-common/src/models/mod.rs +++ b/crates/tycho-common/src/models/mod.rs @@ -187,6 +187,20 @@ impl Chain { } } + /// Returns the block time in seconds for the chain. + pub fn block_time(&self) -> f64 { + match self { + Chain::Ethereum => 12.0, + Chain::ZkSync => 1.0, + Chain::Arbitrum => 0.25, + Chain::Base => 2.0, + Chain::Bsc => 0.45, + Chain::Unichain => 1.0, + Chain::Polygon => 2.0, + Chain::Starknet => 4.0, + } + } + /// Returns a default TVL threshold in native token units for the given tier. /// /// Values are approximate and target a USD-equivalent range, not a precise conversion. diff --git a/crates/tycho-indexer/src/extractor/factory.rs b/crates/tycho-indexer/src/extractor/factory.rs new file mode 100644 index 0000000000..973f6d1052 --- /dev/null +++ b/crates/tycho-indexer/src/extractor/factory.rs @@ -0,0 +1,584 @@ +use std::{collections::HashMap, path::Path, sync::Arc}; + +use anyhow::{format_err, Context}; +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::Client; +use prost::Message; +use serde::Deserialize; +use tokio::{ + runtime::Handle, + sync::{mpsc::Sender, oneshot, Mutex}, +}; +use tracing::info; +use tycho_common::{ + models::{Address, Chain, ExtractorIdentity, FinancialType, ImplementationType, ProtocolType}, + Bytes, +}; +use tycho_ethereum::{ + rpc::EthereumRpcClient, + services::{ + account_extractor::EVMAccountExtractor, entrypoint_tracer::tracer::EVMEntrypointService, + token_pre_processor::EthereumTokenPreProcessor, + }, +}; +use tycho_storage::postgres::cache::CachedGateway; + +use crate::{ + extractor::{ + chain_state::ChainState, + dynamic_contract_indexer::{ + dci::DynamicContractIndexer, hooks::hooks_dci_builder::UniswapV4HookDCIBuilder, + }, + post_processors::POST_PROCESSOR_REGISTRY, + protocol_cache::ProtocolMemoryCache, + protocol_extractor::{ExtractorPgGateway, ProtocolExtractor}, + runner::{compute_start_block, DCIPlugin, ExtractorRunner, SubscriptionsMap}, + DeltaCommand, ExtractionError, Extractor, + }, + pb::sf::substreams::v1::Package, + substreams::{stream::SubstreamsStream, SubstreamsEndpoint}, +}; + +#[derive(Debug, Deserialize, Clone)] +pub struct ProtocolTypeConfig { + name: String, + financial_type: FinancialType, +} + +impl ProtocolTypeConfig { + pub fn new(name: String, financial_type: FinancialType) -> Self { + Self { name, financial_type } + } +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum DCIType { + /// RPC DCI plugin — uses the RPC endpoint to fetch account data. + #[serde(rename = "rpc")] + RPC, + /// UniswapV4Hooks DCI plugin — wraps RPC DCI and generates hook entry points for tracing. + UniswapV4Hooks { pool_manager_address: String }, +} + +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ExtractorConfig { + name: String, + chain: Chain, + implementation_type: ImplementationType, + sync_batch_size: usize, + start_block: i64, + stop_block: Option, + protocol_types: Vec, + spkg: String, + module_name: String, + #[serde(default)] + pub initialized_accounts: Vec, + #[serde(default)] + pub initialized_accounts_block: u64, + #[serde(default)] + pub dci_plugin: Option, + #[serde(default)] + post_processor: Option, + #[serde(default)] + pub(super) max_restarts: Option, +} + +impl ExtractorConfig { + #[allow(clippy::too_many_arguments)] + pub fn new( + name: String, + chain: Chain, + implementation_type: ImplementationType, + sync_batch_size: usize, + start_block: i64, + stop_block: Option, + protocol_types: Vec, + spkg: String, + module_name: String, + initialized_accounts: Vec, + initialized_accounts_block: u64, + post_processor: Option, + dci_plugin: Option, + max_restarts: Option, + ) -> Self { + Self { + name, + chain, + implementation_type, + sync_batch_size, + start_block, + stop_block, + protocol_types, + spkg, + module_name, + initialized_accounts, + initialized_accounts_block, + post_processor, + dci_plugin, + max_restarts, + } + } + + pub fn name(&self) -> &str { + &self.name + } +} + +/// Holds the config and all dependencies needed to build an extractor from scratch. +/// +/// Designed for repeated use: each call to `build_runner` produces a fresh `ProtocolExtractor` +/// with a new `ReorgBuffer` and DCI plugin — suitable for restart after failure. +/// +/// Reused across restarts: +/// - `protocol_cache`: populated once at construction; Arc-based so cloning is cheap and all runs +/// share the same live cache. The TTL mechanism refreshes stale entries automatically. +/// - `chain_state`: estimated once at construction (block number via RPC); `Copy` so each run gets +/// its own copy at no cost. +/// - `cached_gw`: each `build_runner` call creates a fresh instance via `new_instance()` (fresh +/// `open_tx` and LRU cache; shared write channel to `DBCacheWriteExecutor`). +/// - `token_pre_processor`, `rpc_client`: stateless RPC wrappers. +pub struct ExtractorFactory { + pub(super) config: ExtractorConfig, + endpoint_url: String, + s3_bucket: Option, + token: String, + chain: Chain, + cached_gw: CachedGateway, + token_pre_processor: EthereumTokenPreProcessor, + rpc_client: EthereumRpcClient, + database_insert_batch_size: usize, + partial_blocks: bool, + runtime_handle: Option, + protocol_cache: ProtocolMemoryCache, + chain_state: ChainState, +} + +impl ExtractorFactory { + /// Creates the factory, making one RPC call to fetch the current block number and populating + /// the protocol cache from the database. + /// + /// Both `chain_state` and `protocol_cache` are reused across all subsequent restarts. + #[allow(clippy::too_many_arguments)] + pub async fn create( + config: ExtractorConfig, + endpoint_url: String, + s3_bucket: Option, + token: String, + chain: Chain, + cached_gw: CachedGateway, + token_pre_processor: EthereumTokenPreProcessor, + rpc_client: EthereumRpcClient, + database_insert_batch_size: usize, + partial_blocks: bool, + runtime_handle: Option, + ) -> Result { + let block_number = rpc_client + .get_block_number() + .await + .map_err(|e| ExtractionError::Setup(format!("Failed to get block number: {e}")))?; + let chain_state = ChainState::new( + chrono::Local::now().naive_utc(), + block_number, + chain.block_time().ceil() as i64, // round up + ); + + let protocol_cache = ProtocolMemoryCache::new( + config.chain, + chrono::Duration::seconds(900), + Arc::new(cached_gw.clone()), + ); + protocol_cache.populate().await?; + + Ok(Self { + config, + endpoint_url, + s3_bucket, + token, + chain, + cached_gw, + token_pre_processor, + rpc_client, + database_insert_batch_size, + partial_blocks, + runtime_handle, + protocol_cache, + chain_state, + }) + } + + /// Builds a fresh, ready-to-run [`ExtractorRunner`]. + /// + /// Creates a fresh gateway instance, constructs the DCI plugin if configured, and establishes + /// the Substreams stream from the last committed block (or the config start block on first + /// run). The protocol cache and chain state are reused from factory construction. + pub async fn build_runner( + &self, + ws_subscriptions: Arc>, + pending_deltas_tx: Option>, + stop_rx: oneshot::Receiver<()>, + ) -> Result { + let fresh_gw = self.cached_gw.new_instance(); + + // Protocol types from config. + let protocol_types: HashMap = self + .config + .protocol_types + .iter() + .map(|pt| { + ( + pt.name.clone(), + ProtocolType::new( + pt.name.clone(), + pt.financial_type.clone(), + None, + self.config.implementation_type.clone(), + ), + ) + }) + .collect(); + + // Storage gateway for this extractor. + let gw = ExtractorPgGateway::new( + &self.config.name, + self.config.chain, + self.config.sync_batch_size, + fresh_gw.clone(), + ); + + // Optional post-processor. + let post_processor = self + .config + .post_processor + .as_deref() + .map(|name| { + POST_PROCESSOR_REGISTRY + .get(name) + .cloned() + .ok_or_else(|| { + ExtractionError::Setup(format!( + "Post processor '{name}' not found in registry" + )) + }) + }) + .transpose()?; + + // Optional DCI plugin. + let dci_plugin = match self.config.dci_plugin.as_ref() { + None => None, + Some(DCIType::RPC) => { + let rpc_dci = Self::create_rpc_dci( + &self.rpc_client, + self.config.chain, + self.config.name.clone(), + &fresh_gw, + ) + .await?; + Some(DCIPlugin::Standard(rpc_dci)) + } + Some(DCIType::UniswapV4Hooks { pool_manager_address }) => { + // random address to deploy our mini router to + let router_address = Address::from("0x2e234DAe75C793f67A35089C9d99245E1C58470b"); + let pool_manager = Address::from(pool_manager_address.as_str()); + + let base_dci = Self::create_rpc_dci( + &self.rpc_client, + self.config.chain, + self.config.name.clone(), + &fresh_gw, + ) + .await?; + + let mut hooks_dci = UniswapV4HookDCIBuilder::new( + base_dci, + &self.rpc_client, + router_address, + pool_manager, + fresh_gw.clone(), + self.config.chain, + ) + .pause_after_retries(3) + .max_retries(5) + .build()?; + + hooks_dci.initialize().await?; + Some(DCIPlugin::UniswapV4Hooks(Box::new(hooks_dci))) + } + }; + + // Build the protocol extractor. + let extractor = Arc::new( + ProtocolExtractor::>::new( + gw, + self.database_insert_batch_size, + &self.config.name, + self.config.chain, + self.chain_state, + self.config.name.clone(), + self.protocol_cache.clone(), + protocol_types, + self.token_pre_processor.clone(), + post_processor, + dci_plugin, + ) + .await?, + ); + + // Ensure the spkg file is present (download from S3 if needed). + ensure_spkg(&self.config.spkg, self.s3_bucket.as_deref()).await?; + + let content = std::fs::read(&self.config.spkg) + .with_context(|| format_err!("read package from file '{}'", self.config.spkg)) + .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?; + let spkg = Package::decode(content.as_ref()) + .context("decode spkg") + .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?; + + let endpoint = Arc::new( + SubstreamsEndpoint::new(&self.endpoint_url, Some(self.token.clone())) + .await + .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?, + ); + + // Determine the start block. + // + // We resume from (last_committed + 1) rather than using a cursor so that a restarted + // extractor always replays at least from the last finalized block. The cursor is only + // maintained inside SubstreamsStream for hot reconnections within a single run. + let last_block = extractor + .get_last_processed_block() + .await; + let start_block = compute_start_block(last_block.as_ref(), self.config.start_block)?; + if let Some(block) = &last_block { + info!( + start_block, + last_committed_block = block.number, + config_start_block = self.config.start_block, + "Fresh start: resuming from block after last committed" + ); + } + + let stream = SubstreamsStream::new( + endpoint, + None, // No cursor on fresh start. + Some(spkg), + self.config.module_name.clone(), + start_block, + self.config.stop_block.unwrap_or(0) as u64, + false, // final_block_only: not exposed in config, always false. + extractor.get_id().to_string(), + self.partial_blocks, + ); + + Ok(ExtractorRunner::new( + extractor, + stream, + ws_subscriptions, + pending_deltas_tx, + stop_rx, + self.runtime_handle.clone(), + self.partial_blocks, + )) + } + + pub fn extractor_id(&self) -> ExtractorIdentity { + ExtractorIdentity::new(self.chain, &self.config.name) + } + + /// Creates a RPC-based `DynamicContractIndexer` with account extractor and tracer configured. + async fn create_rpc_dci( + rpc_client: &EthereumRpcClient, + chain: Chain, + extractor_name: String, + cached_gw: &CachedGateway, + ) -> Result< + DynamicContractIndexer, + ExtractionError, + > { + let account_extractor = EVMAccountExtractor::new(rpc_client, chain); + + let tracer_rpc_client = if let Ok(tracer_rpc_url) = std::env::var("TRACE_RPC_URL") { + EthereumRpcClient::new(&tracer_rpc_url).map_err(|err| { + ExtractionError::Setup(format!( + "Failed to create RPC client for {tracer_rpc_url}: {err}" + )) + })? + } else { + rpc_client.clone() + }; + + let max_retries = std::env::var("TRACE_MAX_RETRIES") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3); + + let retry_delay_ms = std::env::var("TRACE_RETRY_DELAY_MS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(200); + + let tracer = + EVMEntrypointService::new_with_config(&tracer_rpc_client, max_retries, retry_delay_ms); + + let mut rpc_dci = DynamicContractIndexer::new( + chain, + extractor_name, + cached_gw.clone(), + account_extractor, + tracer, + ); + rpc_dci.initialize().await?; + + Ok(rpc_dci) + } +} + +async fn ensure_spkg(spkg_path: &str, s3_bucket: Option<&str>) -> Result<(), ExtractionError> { + if !Path::new(spkg_path).exists() { + download_file_from_s3( + s3_bucket.ok_or_else(|| { + ExtractionError::Setup(format!("Missing spkg and s3 bucket config for {spkg_path}")) + })?, + spkg_path, + Path::new(spkg_path), + ) + .await + .map_err(|e| { + ExtractionError::Setup(format!("Failed to download {spkg_path} from s3. {e}")) + })?; + } + Ok(()) +} + +async fn download_file_from_s3( + bucket: &str, + key: &str, + download_path: &Path, +) -> anyhow::Result<()> { + info!("Downloading file from s3: {}/{} to {:?}", bucket, key, download_path); + + let region_provider = RegionProviderChain::default_provider().or_else("eu-central-1"); + let config = aws_config::from_env() + .region(region_provider) + .load() + .await; + let client = Client::new(&config); + + let resp = client + .get_object() + .bucket(bucket) + .key(key) + .send() + .await?; + + let data = resp + .body + .collect() + .await + .with_context(|| format!("Failed to read S3 response body for {key}"))?; + + if let Some(parent) = download_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Failed to create directories for {parent:?}"))?; + } + + std::fs::write(download_path, data.into_bytes()) + .with_context(|| format!("Failed to write {download_path:?}"))?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extractor_config_without_dci_plugin() { + let yaml = r#" +name: uniswap_v2 +chain: ethereum +implementation_type: Custom +sync_batch_size: 1000 +start_block: 10008300 +protocol_types: + - name: uniswap_v2_pool + financial_type: Swap +spkg: substreams/ethereum-uniswap-v2/ethereum-uniswap-v2-v0.3.0.spkg +module_name: map_pool_events +"#; + + let config: ExtractorConfig = + serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); + + assert_eq!(config.name, "uniswap_v2"); + assert!(config.dci_plugin.is_none()); + } + + #[test] + fn test_dci_extractor_config() { + let yaml = r#" +name: uniswap_v3 +chain: ethereum +implementation_type: Custom +sync_batch_size: 1000 +start_block: 12369621 +protocol_types: + - name: uniswap_v3_pool + financial_type: Swap +spkg: substreams/ethereum-uniswap-v3/ethereum-uniswap-v3-logs-only-0.1.1.spkg +module_name: map_protocol_changes +dci_plugin: + type: rpc +"#; + + let config: ExtractorConfig = + serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); + + assert_eq!(config.name, "uniswap_v3"); + assert!( + matches!(config.dci_plugin, Some(DCIType::RPC)), + "Expected RPC DCI plugin but got {:?}", + config.dci_plugin + ); + } + + #[test] + fn test_uniswap_v4_hooks_dci_extractor_config() { + let yaml = r#" +name: uniswap_v4 +chain: ethereum +implementation_type: Custom +sync_batch_size: 1000 +start_block: 21688329 +protocol_types: + - name: uniswap_v4_pool + financial_type: Swap +spkg: substreams/ethereum-uniswap-v4/ethereum-uniswap-v4-v0.2.1.spkg +module_name: map_protocol_changes +dci_plugin: + type: uniswap_v4_hooks + router_address: "0x2e234DAe75C793f67A35089C9d99245E1C58470b" + pool_manager_address: "0x000000000004444c5dc75cB358380D2e3dE08A90" +"#; + + let config: ExtractorConfig = + serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); + + assert_eq!(config.name, "uniswap_v4"); + assert_eq!(config.chain, Chain::Ethereum); + assert_eq!(config.sync_batch_size, 1000); + assert_eq!(config.start_block, 21688329); + assert_eq!(config.protocol_types.len(), 1); + assert_eq!(config.protocol_types[0].name, "uniswap_v4_pool"); + + let dci_plugin = config + .dci_plugin + .expect("Expected dci_plugin to be set"); + match dci_plugin { + DCIType::UniswapV4Hooks { pool_manager_address } => { + assert_eq!(pool_manager_address, "0x000000000004444c5dc75cB358380D2e3dE08A90"); + } + _ => panic!("Expected UniswapV4Hooks DCI plugin but got RPC"), + } + } +} diff --git a/crates/tycho-indexer/src/extractor/mod.rs b/crates/tycho-indexer/src/extractor/mod.rs index dad1c82f8b..c51a2fe084 100644 --- a/crates/tycho-indexer/src/extractor/mod.rs +++ b/crates/tycho-indexer/src/extractor/mod.rs @@ -33,6 +33,7 @@ use crate::{ pub mod chain_state; mod dynamic_contract_indexer; +pub mod factory; pub mod models; pub mod post_processors; pub mod protobuf_deserialisation; @@ -40,6 +41,7 @@ pub mod protocol_cache; pub mod protocol_extractor; pub mod reorg_buffer; pub mod runner; +pub mod supervisor; pub mod token_analysis_cron; mod u256_num; @@ -75,6 +77,28 @@ pub enum ExtractionError { DCICacheError(#[from] DCICacheError), } +impl ExtractionError { + /// Returns a static label for each variant, used as a Prometheus metric label. + pub fn variant_name(&self) -> &'static str { + match self { + Self::Setup(_) => "setup", + Self::DecodeError(_) => "decode", + Self::ProtobufError(_) => "protobuf", + Self::Empty => "empty", + Self::Unknown(_) => "unknown", + Self::Storage(_) => "storage", + Self::SubstreamsError(_) => "substreams", + Self::ServiceError(_) => "service", + Self::MergeError(_) => "merge", + Self::ReorgBufferError(_) => "reorg_buffer", + Self::PartialBlockBufferError(_) => "partial_block_buffer", + Self::TracingError(_) => "tracing", + Self::AccountExtractionError(_) => "account_extraction", + Self::DCICacheError(_) => "dci_cache", + } + } +} + #[derive(Error, Debug)] pub enum RPCError { #[error("RPC setup error: {0}")] @@ -85,6 +109,17 @@ pub enum RPCError { pub type ExtractorMsg = Arc; +/// Commands sent from an extractor's runner or supervisor to `PendingDeltas` over the +/// per-extractor channel. +/// +/// Using a single typed channel (rather than a separate reset side-channel) gives an ordering +/// guarantee: `ExtractorRestarted` always arrives after every `Block` message the runner sent +/// before it stopped. +pub enum DeltaCommand { + Block(ExtractorMsg), + ExtractorRestarted(String), +} + #[automock] #[async_trait] pub trait Extractor: Send + Sync { diff --git a/crates/tycho-indexer/src/extractor/runner.rs b/crates/tycho-indexer/src/extractor/runner.rs index 1d3301c9e8..5c4207e00a 100644 --- a/crates/tycho-indexer/src/extractor/runner.rs +++ b/crates/tycho-indexer/src/extractor/runner.rs @@ -1,17 +1,12 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; -use anyhow::{format_err, Context, Result}; use async_trait::async_trait; -use aws_config::meta::region::RegionProviderChain; -use aws_sdk_s3::Client; use metrics::gauge; -use prost::Message; -use serde::Deserialize; use tokio::{ runtime::Handle, sync::{ mpsc::{self, error::SendError, Receiver, Sender}, - Mutex, + oneshot, Mutex, }, task::JoinHandle, }; @@ -19,38 +14,23 @@ use tokio_stream::StreamExt; use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; use tycho_common::{ models::{ - blockchain::BlockAggregatedChanges, Address, Chain, ExtractorIdentity, FinancialType, - ImplementationType, ProtocolType, + blockchain::{Block, BlockAggregatedChanges}, + ExtractorIdentity, }, traits::AccountExtractor, - Bytes, -}; -use tycho_ethereum::{ - rpc::EthereumRpcClient, - services::{ - account_extractor::EVMAccountExtractor, entrypoint_tracer::tracer::EVMEntrypointService, - token_pre_processor::EthereumTokenPreProcessor, - }, }; +use tycho_ethereum::services::entrypoint_tracer::tracer::EVMEntrypointService; use tycho_storage::postgres::cache::CachedGateway; use crate::{ extractor::{ - chain_state::ChainState, dynamic_contract_indexer::{ - dci::DynamicContractIndexer, - hooks::{hook_dci::UniswapV4HookDCI, hooks_dci_builder::UniswapV4HookDCIBuilder}, + dci::DynamicContractIndexer, hooks::hook_dci::UniswapV4HookDCI, }, - post_processors::POST_PROCESSOR_REGISTRY, - protocol_cache::ProtocolMemoryCache, - protocol_extractor::{ExtractorPgGateway, ProtocolExtractor}, - ExtractionError, Extractor, ExtractorExtension, ExtractorMsg, - }, - pb::sf::substreams::{rpc::v2::BlockScopedData, v1::Package}, - substreams::{ - stream::{BlockResponse, SubstreamsStream}, - SubstreamsEndpoint, + DeltaCommand, ExtractionError, Extractor, ExtractorExtension, ExtractorMsg, }, + pb::sf::substreams::rpc::v2::BlockScopedData, + substreams::stream::{BlockResponse, SubstreamsStream}, }; /// Enum to handle both standard DCI and UniswapV4 Hook DCI @@ -107,14 +87,15 @@ impl ExtractorExtension for DCIPlugin { } } } + pub enum ControlMessage { Stop, Subscribe(Sender), } -/// A trait for a message sender that can be used to subscribe to messages +/// A trait for a message sender that can be used to subscribe to messages. /// -/// Extracted out of the [ExtractorHandle] to allow for easier testing +/// Extracted out of [`ExtractorHandle`] to allow for easier testing. #[async_trait] pub trait MessageSender: Send + Sync { async fn subscribe(&self) -> Result, SendError>; @@ -127,7 +108,7 @@ pub struct ExtractorHandle { } impl ExtractorHandle { - fn new(id: ExtractorIdentity, control_tx: Sender) -> Self { + pub fn new(id: ExtractorIdentity, control_tx: Sender) -> Self { Self { id, control_tx } } @@ -137,7 +118,6 @@ impl ExtractorHandle { #[instrument(skip(self))] pub async fn stop(&self) -> Result<(), ExtractionError> { - // TODO: send a oneshot along here and wait for it self.control_tx .send(ControlMessage::Stop) .await @@ -164,23 +144,26 @@ impl MessageSender for ExtractorHandle { match send_result { Ok(Ok(())) => Ok(rx), Ok(Err(e)) => Err(e), - // TODO: use a better error type that let's us return this as an error. + // TODO: use a better error type that lets us return this as an error. Err(_) => panic!("Subscription timed out!"), } } } // Define the SubscriptionsMap type alias -type SubscriptionsMap = HashMap>; +pub(crate) type SubscriptionsMap = HashMap>; pub struct ExtractorRunner { extractor: Arc, substreams: SubstreamsStream, - subscriptions: Arc>, - next_subscriber_id: u64, - control_rx: Receiver, + /// WS subscribers — managed by the supervisor, shared across restarts. + ws_subscriptions: Arc>, + /// Dedicated channel for PendingDeltasBuffer — survives restarts. + pending_deltas_tx: Option>, + /// Oneshot stop signal from the supervisor. + stop_rx: oneshot::Receiver<()>, /// Handle of the tokio runtime on which the extraction tasks will be run. - /// If 'None' the default runtime will be used. + /// If `None` the default runtime will be used. runtime_handle: Option, partial_blocks: bool, } @@ -189,17 +172,18 @@ impl ExtractorRunner { pub fn new( extractor: Arc, substreams: SubstreamsStream, - subscriptions: Arc>, - control_rx: Receiver, + ws_subscriptions: Arc>, + pending_deltas_tx: Option>, + stop_rx: oneshot::Receiver<()>, runtime_handle: Option, partial_blocks: bool, ) -> Self { ExtractorRunner { extractor, substreams, - subscriptions, - next_subscriber_id: 0, - control_rx, + ws_subscriptions, + pending_deltas_tx, + stop_rx, runtime_handle, partial_blocks, } @@ -230,16 +214,9 @@ impl ExtractorRunner { let should_continue = async { tokio::select! { - Some(ctrl) = self.control_rx.recv() => { - match ctrl { - ControlMessage::Stop => { - warn!("Stop signal received; exiting!"); - return Ok(false); - }, - ControlMessage::Subscribe(sender) => { - self.subscribe(sender).await; - }, - } + _ = &mut self.stop_rx => { + warn!("Stop signal received; exiting!"); + return Ok(false); } val = self.substreams.next().instrument(info_span!("substreams_waiting")) => { match val { @@ -287,7 +264,11 @@ impl ExtractorRunner { })?; for msg in msgs { trace!("Propagating block data message."); - Self::propagate_msg(&self.subscriptions, msg).await + Self::propagate_msg( + &self.ws_subscriptions, + self.pending_deltas_tx.as_ref(), + msg, + ).await; } let duration_ms = start_time.elapsed().as_millis() as f64; @@ -306,11 +287,15 @@ impl ExtractorRunner { } Some(Ok(BlockResponse::Undo(undo_signal))) => { partials_in_block = 0; - info!(block=?&undo_signal.last_valid_block, "Revert requested!"); + info!(block=?&undo_signal.last_valid_block, "Revert requested!"); match self.extractor.handle_revert(undo_signal.clone()).await { Ok(Some(msg)) => { trace!("Propagating block undo message."); - Self::propagate_msg(&self.subscriptions, msg).await + Self::propagate_msg( + &self.ws_subscriptions, + self.pending_deltas_tx.as_ref(), + msg, + ).await; } Ok(None) => { trace!("No message to propagate."); @@ -348,18 +333,6 @@ impl ExtractorRunner { }) } - #[instrument(skip_all)] - async fn subscribe(&mut self, sender: Sender) { - let subscriber_id = self.next_subscriber_id; - self.next_subscriber_id += 1; - tracing::Span::current().record("subscriber_id", subscriber_id); - info!(?subscriber_id, "New subscription"); - self.subscriptions - .lock() - .await - .insert(subscriber_id, sender); - } - /// Processes block-scoped data from the stream: always sends the input to the extractor, /// then optionally adds a partial copy of the message (for full blocks with partials enabled) /// and/or the result of collect_and_process_full_block (for final partials). @@ -419,8 +392,22 @@ impl ExtractorRunner { // TODO: add message tracing_id to the log #[instrument(skip_all, fields(subscriber_count))] - async fn propagate_msg(subscribers: &Arc>, message: ExtractorMsg) { + async fn propagate_msg( + subscribers: &Arc>, + pending_deltas_tx: Option<&Sender>, + message: ExtractorMsg, + ) { trace!(msg = %message, "Propagating message to subscribers."); + + if let Some(tx) = pending_deltas_tx { + if let Err(err) = tx + .send(DeltaCommand::Block(message.clone())) + .await + { + error!(error = %err, "Failed to send to PendingDeltas channel"); + } + } + // TODO: rename variable here instead let arced_message = message; @@ -452,504 +439,30 @@ impl ExtractorRunner { } } -#[derive(Debug, Deserialize, Clone)] -pub struct ProtocolTypeConfig { - name: String, - financial_type: FinancialType, -} - -impl ProtocolTypeConfig { - pub fn new(name: String, financial_type: FinancialType) -> Self { - Self { name, financial_type } - } -} - -#[derive(Debug, Deserialize, Clone, Default)] -pub struct ExtractorConfig { - name: String, - chain: Chain, - implementation_type: ImplementationType, - sync_batch_size: usize, - start_block: i64, - stop_block: Option, - protocol_types: Vec, - spkg: String, - module_name: String, - #[serde(default)] - pub initialized_accounts: Vec, - #[serde(default)] - pub initialized_accounts_block: u64, - #[serde(default)] - pub post_processor: Option, - #[serde(default)] - pub dci_plugin: Option, -} - -impl ExtractorConfig { - #[allow(clippy::too_many_arguments)] - pub fn new( - name: String, - chain: Chain, - implementation_type: ImplementationType, - sync_batch_size: usize, - start_block: i64, - stop_block: Option, - protocol_types: Vec, - spkg: String, - module_name: String, - initialized_accounts: Vec, - initialized_accounts_block: u64, - post_processor: Option, - dci_plugin: Option, - ) -> Self { - Self { - name, - chain, - implementation_type, - sync_batch_size, - start_block, - stop_block, - protocol_types, - spkg, - module_name, - initialized_accounts, - initialized_accounts_block, - post_processor, - dci_plugin, - } - } -} - -#[derive(Debug, Deserialize, Clone)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum DCIType { - /// RPC DCI plugin - uses the RPC endpoint to fetch the account data - #[serde(rename = "rpc")] - RPC, - /// UniswapV4Hooks DCI plugin - wrapper for the RPC DCI plugin that generates hook entrypoints - /// for tracing - UniswapV4Hooks { pool_manager_address: String }, -} - -pub struct ExtractorBuilder { - config: ExtractorConfig, - endpoint_url: String, - s3_bucket: Option, - token: String, - extractor: Option>, - database_insert_batch_size: Option, - final_block_only: bool, - partial_blocks: bool, - /// Handle of the tokio runtime on which the extraction tasks will be run. - /// If 'None' the default runtime will be used. - runtime_handle: Option, -} - -impl ExtractorBuilder { - pub fn new( - config: &ExtractorConfig, - endpoint_url: &str, - s3_bucket: Option<&str>, - substreams_api_token: &str, - ) -> Self { - Self { - config: config.clone(), - endpoint_url: endpoint_url.to_owned(), - s3_bucket: s3_bucket.map(ToString::to_string), - token: substreams_api_token.to_string(), - extractor: None, - database_insert_batch_size: None, - final_block_only: false, - partial_blocks: false, - runtime_handle: None, - } - } - - /// Set the substreams endpoint url - pub fn endpoint_url(mut self, val: &str) -> Self { - val.clone_into(&mut self.endpoint_url); - self - } - - pub fn module_name(mut self, val: &str) -> Self { - val.clone_into(&mut self.config.module_name); - self - } - - pub fn start_block(mut self, val: i64) -> Self { - self.config.start_block = val; - self - } - - pub fn token(mut self, val: &str) -> Self { - val.clone_into(&mut self.token); - self - } - - pub fn only_final_blocks(mut self) -> Self { - self.final_block_only = true; - self - } - - pub fn set_runtime(mut self, runtime: Handle) -> Self { - self.runtime_handle = Some(runtime); - self - } - - pub fn partial_blocks(mut self, val: bool) -> Self { - self.partial_blocks = val; - self - } - - /// Set the global database insert batch size - pub fn database_insert_batch_size(mut self, database_insert_batch_size: usize) -> Self { - self.database_insert_batch_size = Some(database_insert_batch_size); - self - } - - #[cfg(test)] - pub fn set_extractor(mut self, val: Arc) -> Self { - self.extractor = Some(val); - self - } - - async fn ensure_spkg(&self) -> Result<(), ExtractionError> { - // Pull spkg from s3 and copy it at `spkg_path` - if !Path::new(&self.config.spkg).exists() { - download_file_from_s3( - self.s3_bucket.as_ref().ok_or_else(|| { - ExtractionError::Setup(format!( - "Missing spkg and s3 bucket config for {}", - self.config.spkg - )) - })?, - &self.config.spkg, - Path::new(&self.config.spkg), - ) - .await - .map_err(|e| { - ExtractionError::Setup(format!( - "Failed to download {} from s3. {}", - self.config.spkg, e - )) - })?; - } - Ok(()) - } - - /// Creates a rpc DynamicContractIndexer with account extractor and tracer configured - async fn create_rpc_dci( - rpc_client: &EthereumRpcClient, - chain: Chain, - extractor_name: String, - cached_gw: &CachedGateway, - ) -> Result< - DynamicContractIndexer, - ExtractionError, - > { - let account_extractor = EVMAccountExtractor::new(rpc_client, chain); - - // Tracer uses dedicated TRACE_RPC_URL if available, and falls back to the main - // rpc client otherwise. - let tracer_rpc_client = if let Ok(tracer_rpc_url) = std::env::var("TRACE_RPC_URL") { - EthereumRpcClient::new(&tracer_rpc_url).map_err(|err| { - ExtractionError::Setup(format!( - "Failed to create RPC client for {tracer_rpc_url}: {err}" - )) - })? - } else { - rpc_client.clone() - }; - - let max_retries = std::env::var("TRACE_MAX_RETRIES") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(3); - - let retry_delay_ms = std::env::var("TRACE_RETRY_DELAY_MS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(200); - - let tracer = - EVMEntrypointService::new_with_config(&tracer_rpc_client, max_retries, retry_delay_ms); - - let mut rpc_dci = DynamicContractIndexer::new( - chain, - extractor_name, - cached_gw.clone(), - account_extractor, - tracer, - ); - rpc_dci.initialize().await?; - - Ok(rpc_dci) - } - - pub async fn build( - mut self, - chain_state: ChainState, - cached_gw: &CachedGateway, - token_pre_processor: &EthereumTokenPreProcessor, - protocol_cache: &ProtocolMemoryCache, - rpc_client: &EthereumRpcClient, - ) -> Result { - let protocol_types = self - .config - .protocol_types - .iter() - .map(|pt| { - ( - pt.name.clone(), - ProtocolType::new( - pt.name.clone(), - pt.financial_type.clone(), - None, - self.config.implementation_type.clone(), - ), - ) - }) - .collect(); - - let gw = ExtractorPgGateway::new( - &self.config.name, - self.config.chain, - self.config.sync_batch_size, - cached_gw.clone(), - ); - - let post_processor = self - .config - .post_processor - .as_ref() - .map(|name| { - POST_PROCESSOR_REGISTRY - .get(name) - .cloned() - .ok_or_else(|| { - ExtractionError::Setup(format!( - "Post processor '{name}' not found in registry" - )) - }) - }) - .transpose()?; - - let dci_plugin = if let Some(ref dci_type) = self.config.dci_plugin { - Some(match dci_type { - DCIType::RPC => { - let rpc_dci = Self::create_rpc_dci( - rpc_client, - self.config.chain, - self.config.name.clone(), - cached_gw, - ) - .await?; - - DCIPlugin::Standard(rpc_dci) - } - DCIType::UniswapV4Hooks { pool_manager_address } => { - // random address to deploy our mini router to - let router_address = - Address::from("0x2e234DAe75C793f67A35089C9d99245E1C58470b"); - let pool_manager = Address::from(pool_manager_address.as_str()); - - let base_dci = Self::create_rpc_dci( - rpc_client, - self.config.chain, - self.config.name.clone(), - cached_gw, - ) - .await?; - - let mut hooks_dci = UniswapV4HookDCIBuilder::new( - base_dci, - rpc_client, - router_address, - pool_manager, - cached_gw.clone(), - self.config.chain, - ) - .pause_after_retries(3) - .max_retries(5) - .build()?; - - hooks_dci.initialize().await?; - DCIPlugin::UniswapV4Hooks(Box::new(hooks_dci)) - } - }) - } else { - None - }; - - let database_insert_batch_size = self - .database_insert_batch_size - .unwrap_or_default(); - - self.extractor = Some(Arc::new( - ProtocolExtractor::>::new( - gw, - database_insert_batch_size, - &self.config.name, - self.config.chain, - chain_state, - self.config.name.clone(), - protocol_cache.clone(), - protocol_types, - token_pre_processor.clone(), - post_processor, - dci_plugin, - ) - .await?, - )); - - Ok(self) - } - - /// Converts this builder into a ready-to-run ExtractorRunner and its associated handle. - /// - /// This method completes the extractor setup process by: - /// - Ensuring the Substreams package (.spkg) file is available, downloading from S3 if - /// necessary - /// - Creating a Substreams endpoint connection with authentication - /// - Setting up the data stream with the configured module, block range, and cursor - /// - Initializing control channels for managing the extractor lifecycle - /// - /// # Returns - /// - /// A tuple containing: - /// - `ExtractorRunner`: The main component that processes blockchain data from the stream - /// - `ExtractorHandle`: A control interface for stopping the extractor and subscribing to its - /// output - /// - /// # Errors - /// - /// Returns `ExtractionError` if: - /// - The extractor was not properly configured - /// - The Substreams package file cannot be accessed or downloaded - /// - The Substreams endpoint connection cannot be established - /// - Package decoding fails due to corrupted or invalid data - #[instrument(name = "extractor_runner_build", skip(self), fields(extractor_id))] - pub async fn into_runner(self) -> Result<(ExtractorRunner, ExtractorHandle), ExtractionError> { - let extractor = self - .extractor - .clone() - .expect("Extractor not set"); - let extractor_id = extractor.get_id(); - - tracing::Span::current().record("id", format!("{extractor_id}")); - - self.ensure_spkg().await?; - - let content = std::fs::read(&self.config.spkg) - .context(format_err!("read package from file '{}'", self.config.spkg)) - .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?; - let spkg = Package::decode(content.as_ref()) - .context("decode command") - .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?; - let endpoint = Arc::new( - SubstreamsEndpoint::new(&self.endpoint_url, Some(self.token)) - .await - .map_err(|err| ExtractionError::SubstreamsError(err.to_string()))?, - ); - - // Determine the start block for the Substreams stream. - // - // We never pass a cursor on fresh start (process restart). Instead, we - // resume from the block after the last one committed to DB. This is safe - // because we only commit finalized block -1 to the DB. So we know last committed block + 1 - // is finalized. - let last_block = extractor - .get_last_processed_block() - .await; - // `None` means no blocks have been committed for this protocol yet (fresh - // indexing), so fall back to the configured start block. - let start_block = last_block - .as_ref() - .map(|b| { - let next = b - .number - .checked_add(1) - .ok_or_else(|| ExtractionError::Setup("block number overflow".to_string()))?; - i64::try_from(next) - .map_err(|_| ExtractionError::Setup("block number exceeds i64".to_string())) - }) - .transpose()? - .unwrap_or(self.config.start_block); - - if let Some(block) = &last_block { - info!( - start_block, - last_committed_block = block.number, - config_start_block = self.config.start_block, - "Fresh start: resuming from block after last committed" - ); +/// Returns the block number to start streaming from. +/// +/// If a block has already been committed to the DB, resumes from the next one. +/// Otherwise falls back to `config_start_block`. +pub(crate) fn compute_start_block( + last_block: Option<&Block>, + config_start_block: i64, +) -> Result { + match last_block { + None => Ok(config_start_block), + Some(block) => { + let next = block + .number + .checked_add(1) + .ok_or_else(|| ExtractionError::Setup("block number overflow".to_string()))?; + i64::try_from(next) + .map_err(|_| ExtractionError::Setup("block number exceeds i64".to_string())) } - - let stream = SubstreamsStream::new( - endpoint, - None, // No cursor on fresh start; stream tracks cursor for hot reconnections - Some(spkg), - self.config.module_name, - start_block, - self.config.stop_block.unwrap_or(0) as u64, - self.final_block_only, - extractor_id.to_string(), - self.partial_blocks, - ); - - let (ctrl_tx, ctrl_rx) = mpsc::channel(128); - let runner = ExtractorRunner::new( - extractor, - stream, - Arc::new(Mutex::new(HashMap::new())), - ctrl_rx, - self.runtime_handle, - self.partial_blocks, - ); - - Ok((runner, ExtractorHandle::new(extractor_id, ctrl_tx))) } } -async fn download_file_from_s3( - bucket: &str, - key: &str, - download_path: &Path, -) -> anyhow::Result<()> { - info!("Downloading file from s3: {}/{} to {:?}", bucket, key, download_path); - - let region_provider = RegionProviderChain::default_provider().or_else("eu-central-1"); - - let config = aws_config::from_env() - .region(region_provider) - .load() - .await; - - let client = Client::new(&config); - - let resp = client - .get_object() - .bucket(bucket) - .key(key) - .send() - .await?; - - let data = resp.body.collect().await.unwrap(); - - // Ensure the directory exists - if let Some(parent) = download_path.parent() { - std::fs::create_dir_all(parent) - .context(format!("Failed to create directories for {parent:?}"))?; - } - - std::fs::write(download_path, data.into_bytes()).unwrap(); - - Ok(()) -} - #[cfg(test)] mod test { - use tycho_common::models::blockchain::BlockAggregatedChanges; + use tycho_common::models::{blockchain::BlockAggregatedChanges, Chain}; use super::*; use crate::{extractor::MockExtractor, pb::sf::substreams::v1::Clock}; @@ -974,116 +487,12 @@ mod test { } } - #[test] - fn test_extractor_config_without_dci_plugin() { - let yaml = r#" -name: uniswap_v2 -chain: ethereum -implementation_type: Custom -sync_batch_size: 1000 -start_block: 10008300 -protocol_types: - - name: uniswap_v2_pool - financial_type: Swap -spkg: substreams/ethereum-uniswap-v2/ethereum-uniswap-v2-v0.3.0.spkg -module_name: map_pool_events -"#; - - let config: ExtractorConfig = - serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); - - // Verify basic fields - assert_eq!(config.name, "uniswap_v2"); - - // Verify DCI plugin is None (optional field) - assert!(config.dci_plugin.is_none()); - } - - #[test] - fn test_dci_extractor_config() { - let yaml = r#" -name: uniswap_v3 -chain: ethereum -implementation_type: Custom -sync_batch_size: 1000 -start_block: 12369621 -protocol_types: - - name: uniswap_v3_pool - financial_type: Swap -spkg: substreams/ethereum-uniswap-v3/ethereum-uniswap-v3-logs-only-0.1.1.spkg -module_name: map_protocol_changes -dci_plugin: - type: rpc -"#; - - let config: ExtractorConfig = - serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); - - // Verify basic fields - assert_eq!(config.name, "uniswap_v3"); - - // Verify DCI plugin is RPC - assert!( - matches!(config.dci_plugin, Some(DCIType::RPC)), - "Expected RPC DCI plugin but got {:?}", - config.dci_plugin - ); - } - - #[test] - fn test_uniswap_v4_hooks_dci_extractor_config() { - let yaml = r#" -name: uniswap_v4 -chain: ethereum -implementation_type: Custom -sync_batch_size: 1000 -start_block: 21688329 -protocol_types: - - name: uniswap_v4_pool - financial_type: Swap -spkg: substreams/ethereum-uniswap-v4/ethereum-uniswap-v4-v0.2.1.spkg -module_name: map_protocol_changes -dci_plugin: - type: uniswap_v4_hooks - router_address: "0x2e234DAe75C793f67A35089C9d99245E1C58470b" - pool_manager_address: "0x000000000004444c5dc75cB358380D2e3dE08A90" -"#; - - let config: ExtractorConfig = - serde_yaml::from_str(yaml).expect("Failed to deserialize YAML"); - - // Verify basic fields - assert_eq!(config.name, "uniswap_v4"); - assert_eq!(config.chain, Chain::Ethereum); - assert_eq!(config.sync_batch_size, 1000); - assert_eq!(config.start_block, 21688329); - - // Verify protocol types - assert_eq!(config.protocol_types.len(), 1); - assert_eq!(config.protocol_types[0].name, "uniswap_v4_pool"); - - // Verify DCI plugin configuration - let dci_plugin = config - .dci_plugin - .expect("Expected dci_plugin to be set"); - match dci_plugin { - DCIType::UniswapV4Hooks { pool_manager_address } => { - assert_eq!(pool_manager_address, "0x000000000004444c5dc75cB358380D2e3dE08A90"); - } - _ => { - panic!("Expected UniswapV4Hooks DCI plugin but got RPC"); - } - } - } - fn one_msg() -> ExtractorMsg { Arc::new(BlockAggregatedChanges::default()) } #[tokio::test] async fn test_process_block_data_partial_blocks_disabled() { - // When partial_blocks is false: handle_tick_scoped_data is called with data as-is; - // collect_and_process_full_block is not called. One message from handle_tick_scoped_data. let data = make_block_scoped_data(false, None, None); let mut mock = MockExtractor::new(); mock.expect_handle_tick_scoped_data() @@ -1102,8 +511,6 @@ dci_plugin: #[tokio::test] async fn test_process_block_data_final_partial() { - // When partial_blocks is true and is_last_partial == true: handle_tick_scoped_data with - // data, then collect_and_process_full_block. Two messages (one from each). let data = make_block_scoped_data(true, Some(2), Some(true)); let mut mock = MockExtractor::new(); mock.expect_handle_tick_scoped_data() @@ -1128,8 +535,6 @@ dci_plugin: #[tokio::test] async fn test_process_block_data_full_block() { - // When partial_blocks is true and message is full block: handle_tick_scoped_data - // receives data as-is; runner adds a partial copy of the returned message. let data = make_block_scoped_data(false, None, None); let mut mock = MockExtractor::new(); mock.expect_handle_tick_scoped_data() @@ -1150,8 +555,6 @@ dci_plugin: #[tokio::test] async fn test_process_block_data_middle_partial() { - // When partial_blocks is true and message is a non-final partial: only - // handle_tick_scoped_data; collect_and_process_full_block is not called. One message. let data = make_block_scoped_data(true, Some(1), Some(false)); let mut mock = MockExtractor::new(); mock.expect_handle_tick_scoped_data() @@ -1169,155 +572,24 @@ dci_plugin: assert_eq!(msgs.len(), 1); } - #[tokio::test] - async fn test_extractor_runner_builder_fresh_start_no_db_state() { - // No DB state: get_last_processed_block returns None, so the stream - // starts from the config start_block with no cursor. - let mut mock_extractor = MockExtractor::new(); - mock_extractor - .expect_get_last_processed_block() - .returning(|| None); - mock_extractor - .expect_get_id() - .returning(ExtractorIdentity::default); - - // Build the ExtractorRunnerBuilder - let extractor = Arc::new(mock_extractor); - let builder = ExtractorBuilder::new( - &ExtractorConfig { - name: "test_module".to_owned(), - implementation_type: ImplementationType::Vm, - protocol_types: vec![ProtocolTypeConfig { - name: "test_module_pool".to_owned(), - financial_type: FinancialType::Swap, - }], - spkg: "./test/spkg/substreams-ethereum-quickstart-v1.0.0.spkg".to_owned(), - module_name: "test_module".to_owned(), - ..Default::default() - }, - "https://mainnet.eth.streamingfast.io", - None, - "test_token", - ) - .token("test_token") - .set_extractor(extractor); - - // Run the builder - let (runner, _handle) = builder.into_runner().await.unwrap(); - - // Wait for the handle to complete - match runner.run().await { - Ok(_) => { - info!("ExtractorRunnerBuilder completed successfully"); - } - Err(err) => { - error!(error = %err, "ExtractorRunnerBuilder failed"); - panic!("ExtractorRunnerBuilder failed"); - } - } - } - - #[tokio::test] - async fn test_start_block_no_db_state() { - use crate::substreams::mock::start_mock_substreams; - - let (captured, addr) = start_mock_substreams().await; - - let mut mock_extractor = MockExtractor::new(); - mock_extractor - .expect_get_last_processed_block() - .returning(|| None); - mock_extractor - .expect_get_id() - .returning(ExtractorIdentity::default); - - let extractor = Arc::new(mock_extractor); - let builder = ExtractorBuilder::new( - &ExtractorConfig { - name: "test_module".to_owned(), - implementation_type: ImplementationType::Vm, - protocol_types: vec![ProtocolTypeConfig { - name: "test_module_pool".to_owned(), - financial_type: FinancialType::Swap, - }], - spkg: "./test/spkg/substreams-ethereum-quickstart-v1.0.0.spkg".to_owned(), - module_name: "test_module".to_owned(), - start_block: 42, - ..Default::default() - }, - &format!("http://{addr}"), - None, - "test_token", - ) - .token("test_token") - .set_extractor(extractor); - - let (runner, _handle) = builder.into_runner().await.unwrap(); - let handle = runner.run(); - handle.await.unwrap().unwrap(); - - let requests = captured.lock().unwrap(); - assert_eq!(requests.len(), 1, "expected exactly one gRPC request"); - assert_eq!(requests[0].start_block_num, 42); - assert!(requests[0].start_cursor.is_empty(), "fresh start should have no cursor"); + #[test] + fn test_compute_start_block_no_db_state() { + // No committed block: fall back to the config start block. + assert_eq!(compute_start_block(None, 42), Ok(42)); } - #[tokio::test] - async fn test_start_block_with_db_state() { + #[test] + fn test_compute_start_block_with_db_state() { use chrono::NaiveDateTime; - use tycho_common::models::blockchain::Block; - - use crate::substreams::mock::start_mock_substreams; - - let (captured, addr) = start_mock_substreams().await; - - let mut mock_extractor = MockExtractor::new(); - mock_extractor - .expect_get_last_processed_block() - .returning(|| { - Some(Block::new( - 1000, - Chain::Ethereum, - vec![0x01].into(), - vec![0x00].into(), - NaiveDateTime::default(), - )) - }); - mock_extractor - .expect_get_id() - .returning(ExtractorIdentity::default); - - let extractor = Arc::new(mock_extractor); - let builder = ExtractorBuilder::new( - &ExtractorConfig { - name: "test_module".to_owned(), - implementation_type: ImplementationType::Vm, - protocol_types: vec![ProtocolTypeConfig { - name: "test_module_pool".to_owned(), - financial_type: FinancialType::Swap, - }], - spkg: "./test/spkg/substreams-ethereum-quickstart-v1.0.0.spkg".to_owned(), - module_name: "test_module".to_owned(), - start_block: 500, - ..Default::default() - }, - &format!("http://{addr}"), - None, - "test_token", - ) - .token("test_token") - .set_extractor(extractor); - - let (runner, _handle) = builder.into_runner().await.unwrap(); - let handle = runner.run(); - handle.await.unwrap().unwrap(); - let requests = captured.lock().unwrap(); - assert_eq!(requests.len(), 1, "expected exactly one gRPC request"); - assert_eq!( - requests[0].start_block_num, 1001, - "should use last_committed + 1, not config's start_block" + let block = Block::new( + 1000, + Chain::Ethereum, + vec![0x01].into(), + vec![0x00].into(), + NaiveDateTime::default(), ); - assert!(requests[0].start_cursor.is_empty(), "fresh start should have no cursor"); + // Should resume from last_committed + 1, ignoring config start block. + assert_eq!(compute_start_block(Some(&block), 500), Ok(1001)); } } diff --git a/crates/tycho-indexer/src/extractor/supervisor.rs b/crates/tycho-indexer/src/extractor/supervisor.rs new file mode 100644 index 0000000000..92faa6f43c --- /dev/null +++ b/crates/tycho-indexer/src/extractor/supervisor.rs @@ -0,0 +1,240 @@ +use std::sync::Arc; + +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, Mutex, +}; +use tracing::{error, info, warn}; +use tycho_common::models::ExtractorIdentity; + +pub use crate::extractor::factory::{DCIType, ExtractorConfig, ProtocolTypeConfig}; +use crate::extractor::{ + factory::ExtractorFactory, + runner::{ControlMessage, ExtractorHandle, SubscriptionsMap}, + DeltaCommand, ExtractionError, +}; + +/// Long-lived per-extractor task that owns the factory and manages restart lifecycle. +/// +/// The supervisor: +/// - Builds an extractor and runner via its factory. +/// - Runs the runner and waits for it to exit. +/// - On failure: clears WS subscriptions, sends `DeltaCommand::ExtractorRestarted` to +/// `PendingDeltas` applies exponential backoff, then rebuilds from scratch. +/// - Forwards `ControlMessage::Subscribe` from the `ExtractorHandle` to the WS subscription map. +/// - Forwards `ControlMessage::Stop` by signalling the runner's stop channel. +pub struct ExtractorSupervisor { + factory: ExtractorFactory, + ctrl_tx: Sender, + control_rx: Receiver, + ws_subscriptions: Arc>, + pending_deltas_tx: Sender, + id: ExtractorIdentity, + max_restarts: Option, + next_subscriber_id: u64, +} + +impl ExtractorSupervisor { + pub fn new( + factory: ExtractorFactory, + ws_subscriptions: Arc>, + pending_deltas_tx: Sender, + ) -> Self { + let id = factory.extractor_id(); + let max_restarts: Option = factory.config.max_restarts; + let (ctrl_tx, control_rx) = mpsc::channel(128); + Self { + factory, + ctrl_tx, + control_rx, + ws_subscriptions, + pending_deltas_tx, + id, + max_restarts, + next_subscriber_id: 0, + } + } + + /// Returns an [`ExtractorHandle`] that can be used to subscribe or stop this extractor. + pub fn handle(&self) -> ExtractorHandle { + ExtractorHandle::new(self.id.clone(), self.ctrl_tx.clone()) + } + + /// Runs the supervision loop. Returns when the extractor has been stopped via + /// `ControlMessage::Stop` or has exhausted all restart attempts. + pub async fn run(mut self) -> Result<(), ExtractionError> { + let mut restart_count: u32 = 0; + + loop { + let (stop_tx, stop_rx) = oneshot::channel(); + let runner = match self + .factory + .build_runner( + self.ws_subscriptions.clone(), + Some(self.pending_deltas_tx.clone()), + stop_rx, + ) + .await + { + Ok(r) => r, + Err(err) => { + error!(extractor = %self.id, error = %err, "Failed to build extractor"); + metrics::counter!( + "extractor_restart_failed", + "extractor" => self.id.name.clone() + ) + .increment(1); + return Err(err); + } + }; + + let mut run_handle = runner.run(); + + // Drive the runner, handling control messages in parallel. + let runner_result = loop { + tokio::select! { + result = &mut run_handle => { + break result; + } + Some(ctrl) = self.control_rx.recv() => { + match ctrl { + ControlMessage::Stop => { + info!(extractor = %self.id, "Stop signal received by supervisor"); + let _ = stop_tx.send(()); + let result = run_handle.await; + return match result { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(e), + Err(join_err) => Err(ExtractionError::Unknown( + format!("Runner panicked: {join_err}") + )), + }; + } + ControlMessage::Subscribe(sender) => { + let subscriber_id = self.next_subscriber_id; + self.next_subscriber_id += 1; + info!( + extractor = %self.id, + subscriber_id, + "New WS subscription via supervisor" + ); + self.ws_subscriptions + .lock() + .await + .insert(subscriber_id, sender); + } + } + } + } + }; + + // Runner exited — classify the result. + match runner_result { + Ok(Ok(())) => { + info!(extractor = %self.id, "Extractor exited gracefully"); + metrics::counter!( + "extractor_stopped", + "extractor" => self.id.name.clone(), + "reason" => "graceful" + ) + .increment(1); + return Ok(()); + } + Ok(Err(ref err)) => { + error!( + extractor = %self.id, + error = %err, + restart_count, + "Extractor failed" + ); + metrics::counter!( + "extractor_stopped", + "extractor" => self.id.name.clone(), + "reason" => err.variant_name() + ) + .increment(1); + } + Err(ref join_err) => { + error!( + extractor = %self.id, + error = %join_err, + "Extractor task panicked" + ); + metrics::counter!( + "extractor_stopped", + "extractor" => self.id.name.clone(), + "reason" => "panic" + ) + .increment(1); + } + } + + if self + .max_restarts + .is_some_and(|max| restart_count >= max) + { + error!( + extractor = %self.id, + max_restarts = ?self.max_restarts, + "Extractor permanently stopped — restart limit reached" + ); + metrics::counter!( + "extractor_permanently_stopped", + "extractor" => self.id.name.clone() + ) + .increment(1); + return runner_result + .map_err(|e| ExtractionError::Unknown(format!("Runner panicked: {e}")))?; + } + + // Clear WS subscriptions — clients must reconnect after a restart. + // TODO: can we keep the ws connections alive and handle this on the client side? + { + let mut subs = self.ws_subscriptions.lock().await; + let count = subs.len(); + subs.clear(); + if count > 0 { + info!( + extractor = %self.id, + dropped_subscribers = count, + "Cleared WS subscriptions before restart" + ); + } + } + + // Signal PendingDeltas to reset its buffer for this extractor. + // Sent on the same per-extractor channel as block messages, so it is guaranteed to + // arrive after all blocks the runner emitted before failing. + if let Err(err) = self + .pending_deltas_tx + .send(DeltaCommand::ExtractorRestarted(self.id.name.clone())) + .await + { + warn!( + extractor = %self.id, + error = %err, + "Failed to send ExtractorRestarted to PendingDeltas" + ); + } + + // Exponential backoff: 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 14400s + // (capped at 4 hours). + let exp = restart_count.min(7); // 120 * 2^7 = 14400s = 4 hours, cap here to avoid overflow. + let backoff = std::time::Duration::from_secs(120 * 2u64.pow(exp)); + warn!( + extractor = %self.id, + ?backoff, + restart_count, + "Waiting for backoff before restarting extractor" + ); + tokio::time::sleep(backoff).await; + warn!( + extractor = %self.id, + ?backoff, + restart_count, + "Restarting extractor after backoff" + ); + restart_count += 1; + } + } +} diff --git a/crates/tycho-indexer/src/main.rs b/crates/tycho-indexer/src/main.rs index 4c8e6724fc..b689695463 100644 --- a/crates/tycho-indexer/src/main.rs +++ b/crates/tycho-indexer/src/main.rs @@ -53,12 +53,9 @@ use tycho_ethereum::{ use tycho_indexer::{ cli::{AnalyzeTokenArgs, Cli, Command, GlobalArgs, IndexArgs, RunSpkgArgs, SubstreamsArgs}, extractor::{ - chain_state::ChainState, - protocol_cache::ProtocolMemoryCache, - runner::{ - DCIType, ExtractorBuilder, ExtractorConfig, ExtractorHandle, ExtractorRunner, - ProtocolTypeConfig, - }, + factory::ExtractorFactory, + runner::ExtractorHandle, + supervisor::{DCIType, ExtractorConfig, ExtractorSupervisor, ProtocolTypeConfig}, token_analysis_cron::analyze_tokens, ExtractionError, }, @@ -271,12 +268,34 @@ fn run_indexer(global_args: GlobalArgs, index_args: IndexArgs) -> Result<(), Ext let extractor_ctrl_tx = control_tx.clone(); extraction_runtime.spawn(async move { - let (res, _, _) = select_all(extraction_tasks).await; + // Wait for ALL supervisors to complete (each manages its own restarts). + // Only signal the main thread once all extractors are permanently stopped. + let results = futures03::future::join_all(extraction_tasks).await; + let mut any_error = false; + for result in &results { + match result { + Ok(Ok(())) => {} + Ok(Err(err)) => { + error!(error = %err, "Supervisor exited with error"); + any_error = true; + } + Err(join_err) => { + error!(error = %join_err, "Supervisor task panicked"); + any_error = true; + } + } + } + + let res: Result<(), ExtractionError> = if any_error { + Err(ExtractionError::Unknown( + "All extractors have stopped — at least one with errors".into(), + )) + } else { + Ok(()) + }; if extractor_ctrl_tx.send(res).is_err() { - error!( - "Fatal execution task exited and failed trying to communicate with main thread. Exiting the process..." - ); + error!("Fatal: failed to communicate with main thread. Exiting the process..."); process::exit(1); } }); @@ -285,7 +304,7 @@ fn run_indexer(global_args: GlobalArgs, index_args: IndexArgs) -> Result<(), Ext main_runtime.spawn(async move { let (res, _, _) = select_all(other_tasks).await; - if services_ctrl_tx.send(res).is_err() { + if services_ctrl_tx.send(res.unwrap_or_else(|join_err| Err(ExtractionError::Unknown(format!("Task panicked: {join_err}"))))).is_err() { error!("Fatal service task exited and failed trying to communicate with main thread. Exiting the process..."); process::exit(1); } @@ -295,7 +314,13 @@ fn run_indexer(global_args: GlobalArgs, index_args: IndexArgs) -> Result<(), Ext .recv() .expect("Control channel unexpectedly closed"); - res.expect("A thread panicked. Shutting down Tycho.") + match res { + Ok(()) => Ok(()), + Err(err) => { + error!(error = %err, "Fatal error, shutting down"); + Err(err) + } + } } #[tokio::main] @@ -334,10 +359,11 @@ async fn run_spkg(global_args: GlobalArgs, run_args: RunSpkgArgs) -> Result<(), run_args.initialization_block, None, dci_plugin, + None, ), )])); - let (extraction_tasks, mut other_tasks) = create_indexing_tasks( + let (supervisor_tasks, mut other_tasks) = create_indexing_tasks( &global_args, &run_args.substreams_args, &[Chain::from_str(&run_args.chain).unwrap()], @@ -348,11 +374,12 @@ async fn run_spkg(global_args: GlobalArgs, run_args: RunSpkgArgs) -> Result<(), ) .await?; - let mut all_tasks = extraction_tasks; + let mut all_tasks: Vec>> = supervisor_tasks; all_tasks.append(&mut other_tasks); let (res, _, _) = select_all(all_tasks).await; - res.expect("Extractor- nor ServiceTasks should panic!") + res.expect("Tasks should not panic")?; + Ok(()) } #[tokio::main] @@ -399,13 +426,6 @@ async fn create_indexing_tasks( ) -> Result<(ExtractionTasks, ServerTasks), ExtractionError> { let rpc_client = global_args.rpc.build_client()?; - let block_number = rpc_client - .get_block_number() - .await - .expect("Error getting block number"); - - let chain_state = ChainState::new(chrono::Local::now().naive_utc(), block_number, 12); //TODO: remove hardcoded blocktime - let protocol_systems: Vec = extractors_config .extractors .keys() @@ -433,13 +453,21 @@ async fn create_indexing_tasks( settlement_contract, ); - let (runners, extractor_handles): (Vec<_>, Vec<_>) = - // TODO: accept substreams configuration from cli. - build_all_extractors(&extractors_config, chain_state, chains, &global_args.endpoint_url, global_args.s3_bucket.as_deref(), &substreams_args.substreams_api_token, &cached_gw, global_args.database_insert_batch_size, &token_processor, &rpc_client, extraction_runtime, substreams_args.enable_partial_blocks) - .await - .map_err(|e| ExtractionError::Setup(format!("Failed to create extractors: {e}")))? - .into_iter() - .unzip(); + let (supervisors, extractor_handles, pending_deltas_rxs) = build_all_extractors( + &extractors_config, + chains, + &global_args.endpoint_url, + global_args.s3_bucket.as_deref(), + &substreams_args.substreams_api_token, + &cached_gw, + global_args.database_insert_batch_size, + &token_processor, + &rpc_client, + extraction_runtime, + substreams_args.enable_partial_blocks, + ) + .await + .map_err(|e| ExtractionError::Setup(format!("Failed to create extractors: {e}")))?; let server_url = format!("http://{}:{}", global_args.server_ip, global_args.server_port); let api_key = env::var("AUTH_API_KEY").map_err(|_| { @@ -456,24 +484,28 @@ async fn create_indexing_tasks( .dci_protocols(dci_protocols) .protocol_systems(protocol_systems) .register_extractors(extractor_handles.clone()) + .pending_deltas(pending_deltas_rxs) .run()?; info!(server_url, "Http and Ws server started"); let shutdown_task = tokio::spawn(shutdown_handler(server_handle, extractor_handles, Some(gw_writer_handle))); - let extractor_tasks = runners + let runtime = extraction_runtime + .cloned() + .unwrap_or_else(|| tokio::runtime::Handle::current()); + + let supervisor_tasks = supervisors .into_iter() - .map(|runner| runner.run()) + .map(|supervisor| runtime.spawn(supervisor.run())) .collect::>(); - Ok((extractor_tasks, vec![server_task, shutdown_task])) + Ok((supervisor_tasks, vec![server_task, shutdown_task])) } #[allow(clippy::too_many_arguments)] async fn build_all_extractors( config: &ExtractorConfigs, - chain_state: ChainState, chains: &[Chain], endpoint_url: &str, s3_bucket: Option<&str>, @@ -484,21 +516,22 @@ async fn build_all_extractors( rpc_client: &EthereumRpcClient, runtime: Option<&tokio::runtime::Handle>, partial_blocks: bool, -) -> Result, ExtractionError> { +) -> Result< + ( + Vec, + Vec, + Vec>, + ), + ExtractionError, +> { + let mut supervisors = Vec::new(); let mut extractor_handles = Vec::new(); + let mut pending_deltas_rxs = Vec::new(); let chain = *chains .first() .expect("No chain provided"); - info!("Building protocol cache"); - let protocol_cache = ProtocolMemoryCache::new( - chain, - chrono::Duration::seconds(900), - Arc::new(cached_gw.clone()), - ); - protocol_cache.populate().await?; - for extractor_config in config.extractors.values() { initialize_accounts( extractor_config @@ -511,24 +544,40 @@ async fn build_all_extractors( ) .await; - let runtime = runtime + let runtime_handle = runtime .cloned() .unwrap_or_else(|| tokio::runtime::Handle::current()); - let (runner, handle) = - ExtractorBuilder::new(extractor_config, endpoint_url, s3_bucket, substreams_api_token) - .database_insert_batch_size(database_insert_batch_size) - .partial_blocks(partial_blocks) - .build(chain_state, cached_gw, token_pre_processor, &protocol_cache, rpc_client) - .await? - .set_runtime(runtime) - .into_runner() - .await?; - - extractor_handles.push((runner, handle)); + let factory = ExtractorFactory::create( + extractor_config.clone(), + endpoint_url.to_owned(), + s3_bucket.map(ToString::to_string), + substreams_api_token.to_owned(), + chain, + cached_gw.clone(), + token_pre_processor.clone(), + rpc_client.clone(), + database_insert_batch_size, + partial_blocks, + Some(runtime_handle), + ) + .await?; + + // Create dedicated PendingDeltas channel for this extractor + let (pd_tx, pd_rx) = tokio::sync::mpsc::channel(256); + + // WS subscription map — shared between supervisor and runner + let ws_subscriptions = Arc::new(tokio::sync::Mutex::new(HashMap::new())); + + let supervisor = ExtractorSupervisor::new(factory, ws_subscriptions, pd_tx); + let handle = supervisor.handle(); + + supervisors.push(supervisor); + extractor_handles.push(handle); + pending_deltas_rxs.push(pd_rx); } - Ok(extractor_handles) + Ok((supervisors, extractor_handles, pending_deltas_rxs)) } async fn with_transaction(gw: &CachedGateway, block: &Block, f: F) -> R @@ -662,7 +711,9 @@ async fn shutdown_handler( } for e in extractors.iter() { - e.stop().await.unwrap(); + if let Err(err) = e.stop().await { + warn!(error = %err, "Failed to stop extractor during shutdown"); + } } server_handle.stop(true).await; if let Some(handle) = db_write_executor_handle { diff --git a/crates/tycho-indexer/src/services/deltas_buffer.rs b/crates/tycho-indexer/src/services/deltas_buffer.rs index 13a27de246..7de3ca63d7 100644 --- a/crates/tycho-indexer/src/services/deltas_buffer.rs +++ b/crates/tycho-indexer/src/services/deltas_buffer.rs @@ -9,7 +9,7 @@ use futures03::{stream, StreamExt}; use metrics::gauge; use thiserror::Error; use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, error, instrument, trace, Level}; +use tracing::{debug, error, info, instrument, trace, warn, Level}; use tycho_common::{ models::{ blockchain::BlockAggregatedChanges, @@ -23,7 +23,7 @@ use tycho_common::{ use crate::extractor::{ reorg_buffer::{BlockNumberOrTimestamp, CommitStatus, ReorgBuffer}, - runner::MessageSender, + DeltaCommand, }; /// The `PendingDeltas` struct manages access to the reorg buffers maintained by each extractor. @@ -249,16 +249,19 @@ impl PendingDeltas { ))) } + /// Runs the PendingDeltas message loop. + /// + /// Accepts one receiver per extractor. Each receiver carries [`DeltaCommand`]s: block + /// messages and restart notifications on the same channel. pub async fn run( self, - extractors: impl IntoIterator>, + receivers: Vec>, start_tx: SyncSender<()>, ) -> anyhow::Result<()> { - let mut rxs = Vec::new(); - for extractor in extractors.into_iter() { - let res = ReceiverStream::new(extractor.subscribe().await?); - rxs.push(res); - } + let rxs: Vec> = receivers + .into_iter() + .map(ReceiverStream::new) + .collect(); // Send the start signal to the startup task start_tx @@ -275,18 +278,50 @@ impl PendingDeltas { } }); - let all_messages = stream::select_all(rxs); + let mut all_messages = stream::select_all(rxs); - // What happens if an extractor restarts - it might just end here and be dropped? - // Ideally the Runner should never restart. - all_messages - .for_each(|message| async { - // Skip partial messages - only full-block updates go to the reorg buffer. - if message.partial_block_index.is_none() { - self.insert(message).unwrap(); + loop { + match all_messages.next().await { + Some(DeltaCommand::Block(message)) => { + // Skip partial messages - only full-block updates go to the reorg buffer. + if message.partial_block_index.is_none() { + self.insert(message).map_err(|e| { + error!(error = %e, "Failed to insert into PendingDeltas buffer"); + e + })?; + } } - }) - .await; + Some(DeltaCommand::ExtractorRestarted(extractor_name)) => { + debug!(extractor = %extractor_name, "Resetting PendingDeltas buffer for extractor"); + if let Some(buffer) = self.buffers.get(&extractor_name) { + match buffer.lock() { + Ok(mut guard) => { + *guard = ReorgBuffer::new(); + debug!(extractor = %extractor_name, "PendingDeltas buffer reset"); + } + Err(err) => { + error!( + extractor = %extractor_name, + error = %err, + "Failed to lock buffer for reset" + ); + return Err(PendingDeltasError::LockError( + extractor_name, + err.to_string(), + ) + .into()); + } + } + } else { + warn!(extractor = %extractor_name, "No buffer found for reset — extractor unknown"); + } + } + None => { + info!("All PendingDeltas streams ended"); + break; + } + } + } Ok(()) } diff --git a/crates/tycho-indexer/src/services/mod.rs b/crates/tycho-indexer/src/services/mod.rs index 29bdc05049..672323325b 100644 --- a/crates/tycho-indexer/src/services/mod.rs +++ b/crates/tycho-indexer/src/services/mod.rs @@ -54,6 +54,8 @@ pub struct ServicesBuilder { dci_protocols: Vec, /// Active protocol systems derived from extractor config. protocol_systems: Vec, + /// Pre-built receivers for PendingDeltas (one per extractor). + pending_deltas_rxs: Vec>, } impl ServicesBuilder @@ -72,6 +74,7 @@ where plans_config: PlansConfig::default(), dci_protocols: Vec::new(), protocol_systems: Vec::new(), + pending_deltas_rxs: Vec::new(), } } @@ -120,6 +123,15 @@ where self } + /// Sets the pre-built receivers for PendingDeltas (one per extractor). + pub fn pending_deltas( + mut self, + rxs: Vec>, + ) -> Self { + self.pending_deltas_rxs = rxs; + self + } + /// Starts the Tycho server. Returns a tuple containing a handle for the server and a Tokio /// handle for the tasks. If no extractor tasks are registered, it starts the server without /// running the delta tasks. @@ -141,7 +153,7 @@ where /// Runs the server with both RPC and WebSocket services, and spawns tasks for handling /// pending delta processing. fn start_server_with_deltas( - self, + mut self, openapi: utoipa::openapi::OpenApi, ) -> Result<(ServerHandle, JoinHandle>), ExtractionError> { let pending_deltas = PendingDeltas::new( @@ -149,15 +161,14 @@ where .keys() .map(|e_id| e_id.name.as_str()), ); - let extractor_handles_clone = self - .extractor_handles - .clone() - .into_values(); + + let pending_deltas_rxs = std::mem::take(&mut self.pending_deltas_rxs); + let pending_deltas_clone = pending_deltas.clone(); let (start_tx, start_rx) = mpsc::sync_channel::<()>(1); let deltas_task = tokio::spawn(async move { pending_deltas_clone - .run(extractor_handles_clone, start_tx) + .run(pending_deltas_rxs, start_tx) .await .map_err(|err| ExtractionError::Unknown(err.to_string())) }); diff --git a/crates/tycho-indexer/src/services/ws.rs b/crates/tycho-indexer/src/services/ws.rs index 8c0dcd594a..d77d0ecb72 100644 --- a/crates/tycho-indexer/src/services/ws.rs +++ b/crates/tycho-indexer/src/services/ws.rs @@ -257,8 +257,10 @@ impl WsActor { item.drop_state().into() }; - yield Ok((subscription_id, result)); + yield ExtractorEvent::Message(subscription_id, Box::new(result)); } + // Channel closed: extractor restarted or stopped. + yield ExtractorEvent::ChannelClosed(subscription_id); }; Some((subscription_id, stream, extractor_id_for_future.clone())) @@ -418,19 +420,24 @@ impl Actor for WsActor { } } -/// Handle incoming messages from the extractor and forward them to the WS connection -impl StreamHandler> for WsActor { +enum ExtractorEvent { + Message(Uuid, Box), + ChannelClosed(Uuid), +} + +/// Handle incoming messages from the extractor and forward them to the WS connection. +/// +/// `ExtractorEvent::ChannelClosed` is emitted when the extractor's channel closes (e.g. after a +/// restart). It lets the actor send `SubscriptionEnded` to the client and clean up the +/// subscription without dropping the entire WebSocket connection. +impl StreamHandler for WsActor { #[instrument(skip_all, fields(WsActor.id = %self.id))] - fn handle( - &mut self, - msg: Result<(Uuid, BlockAggregatedChanges), ws::ProtocolError>, - ctx: &mut Self::Context, - ) { - trace!("Message received from extractor"); + fn handle(&mut self, msg: ExtractorEvent, ctx: &mut Self::Context) { match msg { - Ok((subscription_id, deltas)) => { + ExtractorEvent::Message(subscription_id, deltas) => { trace!("Forwarding message to client"); - let msg = WebSocketMessage::BlockAggregatedChanges { deltas, subscription_id }; + let msg = + WebSocketMessage::BlockAggregatedChanges { deltas: *deltas, subscription_id }; let json_str = serde_json::to_string(&msg).unwrap(); // Check if compression is enabled for this subscription @@ -472,11 +479,45 @@ impl StreamHandler> fo ctx.text(json_str); } } - Err(e) => { - error!(error = %e, "Failed to receive message from extractor"); + ExtractorEvent::ChannelClosed(subscription_id) => { + // Extractor channel closed — clean up this subscription and notify the client. + // Other subscriptions on this connection are unaffected. + warn!( + %subscription_id, + "Extractor channel closed; notifying client and ending subscription" + ); + if let Some(sub) = self + .subscriptions + .remove(&subscription_id) + { + self.compression_enabled + .remove(&subscription_id); + gauge!( + "websocket_subscriptions_active", + "user_identity" => self.user_identity.as_deref().unwrap_or("unknown").to_owned(), + "chain" => sub.chain, + "extractor" => sub.extractor, + "partial_blocks" => sub.partial_blocks.to_string(), + ) + .decrement(1); + } + let message = Response::SubscriptionEnded { subscription_id }; + ctx.text(serde_json::to_string(&message).unwrap()); } } } + + fn finished(&mut self, ctx: &mut ::Context) { + // If no subscriptions remain, close the connection. If other subscriptions are still + // active, keep the connection open; they were unaffected by this stream ending. + if self.subscriptions.is_empty() { + ctx.close(Some(ws::CloseReason { + code: ws::CloseCode::Normal, + description: Some("All subscriptions ended".into()), + })); + ctx.stop(); + } + } } /// Handle incoming messages from the WS connection @@ -1624,4 +1665,176 @@ mod tests { .await .expect("Failed to send close message"); } + + /// A sender that immediately closes the channel after subscribe, simulating an extractor + /// that fails or restarts right after the subscription is established. + pub struct ClosingMessageSender; + + #[async_trait] + impl MessageSender for ClosingMessageSender { + async fn subscribe(&self) -> Result, SendError> { + let (_tx, rx) = mpsc::channel::(1); + // _tx is dropped here, closing the channel immediately. + Ok(rx) + } + } + + /// Verifies that when an extractor's channel closes the client receives `SubscriptionEnded` + /// for that subscription and the WebSocket connection stays open. + #[actix_rt::test] + async fn test_subscription_ended_on_channel_close() -> Result<(), String> { + tracing_subscriber::fmt() + .with_test_writer() + .try_init() + .unwrap_or_else(|_| debug!("Subscriber already initialized")); + + let closing_id = ExtractorIdentity::new(Chain::Ethereum, "closing_extractor"); + let live_id = ExtractorIdentity::new(Chain::Ethereum, "live_extractor"); + + let mut subscribers_map = HashMap::new(); + subscribers_map.insert( + closing_id.clone(), + Arc::new(ClosingMessageSender) as Arc, + ); + subscribers_map.insert( + live_id.clone(), + Arc::new(MyMessageSender::new(live_id.clone())) as Arc, + ); + + let app_state = web::Data::new(WsData::new(subscribers_map)); + let server = start_with( + TestServerConfig::default().client_request_timeout(Duration::from_secs(5)), + move || { + App::new() + .wrap(RequestTracing::new()) + .app_data(app_state.clone()) + .service(web::resource("/ws/").route(web::get().to(WsActor::ws_index))) + }, + ); + + let url = server + .url("/ws/") + .to_string() + .replacen("http://", "ws://", 1); + let (mut connection, _) = tokio_tungstenite::connect_async(url) + .await + .expect("Failed to connect"); + + // Subscribe to the live extractor first so we can verify the connection survives. + connection + .send(Message::Text( + serde_json::to_string(&Command::Subscribe { + extractor_id: live_id.clone().into(), + include_state: false, + compression: false, + partial_blocks: false, + }) + .unwrap(), + )) + .await + .expect("Failed to send subscribe"); + wait_for_new_subscription(&mut connection) + .await + .expect("Failed to get live subscription"); + + // Subscribe to the closing extractor — its channel closes immediately. + connection + .send(Message::Text( + serde_json::to_string(&Command::Subscribe { + extractor_id: closing_id.clone().into(), + include_state: false, + compression: false, + partial_blocks: false, + }) + .unwrap(), + )) + .await + .expect("Failed to send subscribe"); + wait_for_new_subscription(&mut connection) + .await + .expect("Failed to get closing subscription"); + + // The channel was already closed, so we should receive SubscriptionEnded. + wait_for_subscription_ended(&mut connection) + .await + .expect("Expected SubscriptionEnded when extractor channel closes"); + + // The live subscription must still be delivering messages — the connection is intact. + wait_for_dummy_message(&mut connection, live_id) + .await + .expect("Live extractor should still send messages after peer subscription ended"); + + connection + .send(Message::Close(Some(CloseFrame { code: CloseCode::Normal, reason: "".into() }))) + .await + .ok(); + + Ok(()) + } + + /// Verifies that when the last subscription's channel closes, the WS connection itself + /// is closed — an idle connection with no subscriptions wastes a client connection slot. + #[actix_rt::test] + async fn test_connection_closed_when_last_subscription_ends() -> Result<(), String> { + tracing_subscriber::fmt() + .with_test_writer() + .try_init() + .unwrap_or_else(|_| debug!("Subscriber already initialized")); + + let closing_id = ExtractorIdentity::new(Chain::Ethereum, "only_extractor"); + let mut subscribers_map = HashMap::new(); + subscribers_map.insert( + closing_id.clone(), + Arc::new(ClosingMessageSender) as Arc, + ); + + let app_state = web::Data::new(WsData::new(subscribers_map)); + let server = start_with( + TestServerConfig::default().client_request_timeout(Duration::from_secs(5)), + move || { + App::new() + .wrap(RequestTracing::new()) + .app_data(app_state.clone()) + .service(web::resource("/ws/").route(web::get().to(WsActor::ws_index))) + }, + ); + + let url = server + .url("/ws/") + .to_string() + .replacen("http://", "ws://", 1); + let (mut connection, _) = tokio_tungstenite::connect_async(url) + .await + .expect("Failed to connect"); + + connection + .send(Message::Text( + serde_json::to_string(&Command::Subscribe { + extractor_id: closing_id.clone().into(), + include_state: false, + compression: false, + partial_blocks: false, + }) + .unwrap(), + )) + .await + .expect("Failed to send subscribe"); + wait_for_new_subscription(&mut connection) + .await + .expect("Failed to get subscription"); + + // The channel closes immediately, so we get SubscriptionEnded. + wait_for_subscription_ended(&mut connection) + .await + .expect("Expected SubscriptionEnded"); + + // With no subscriptions left the actor should stop, closing the connection. + let closed = timeout(Duration::from_secs(2), connection.next()).await; + match closed { + Ok(Some(Ok(Message::Close(_)))) | Ok(None) => {} + other => panic!("Expected connection close, got: {:?}", other), + } + + Ok(()) + } } diff --git a/crates/tycho-storage/src/postgres/cache.rs b/crates/tycho-storage/src/postgres/cache.rs index b304e1793d..7f553a9a3b 100644 --- a/crates/tycho-storage/src/postgres/cache.rs +++ b/crates/tycho-storage/src/postgres/cache.rs @@ -688,6 +688,21 @@ impl CachedGateway { } } + /// Creates a fresh gateway instance sharing the same write channel and connection pool, + /// but with its own `open_tx` and `lru_cache`. + /// + /// Use this instead of `clone()` when the intent is to create an independent writer + /// (e.g. for a restarted extractor) rather than sharing cache state. + pub fn new_instance(&self) -> Self { + CachedGateway { + tx: self.tx.clone(), + open_tx: Arc::new(Mutex::new(None)), + pool: self.pool.clone(), + state_gateway: self.state_gateway.clone(), + lru_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(5).unwrap()))), + } + } + pub async fn get_delta( &self, chain: &Chain,