diff --git a/Cargo.lock b/Cargo.lock index 14c65d9f4..28c8a78be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1809,7 +1809,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2 1.0.101", "quote 1.0.41", "regex", @@ -1827,7 +1827,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2 1.0.101", @@ -2473,41 +2473,14 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "clickhouse" -version = "0.12.2" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3093f817c4f81c8bd174ed8dd30eac785821a8a7eef27a7dcb7f8cd0d0f6548" +checksum = "52d6ac02411e84914fdf4e0565bfe02fc4bebdf375bd1fc58168bad74b3707a2" dependencies = [ "bstr", "bytes", "cityhash-rs", - "clickhouse-derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures", - "futures-channel", - "http-body-util", - "hyper 1.7.0", - "hyper-tls 0.6.0", - "hyper-util", - "lz4_flex", - "replace_with", - "sealed 0.5.0", - "serde", - "static_assertions", - "thiserror 1.0.69", - "time", - "tokio", - "url", - "uuid", -] - -[[package]] -name = "clickhouse" -version = "0.13.3" -source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" -dependencies = [ - "bstr", - "bytes", - "cityhash-rs", - "clickhouse-derive 0.2.0 (git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2)", + "clickhouse-macros", "clickhouse-types", "futures-channel", "futures-util", @@ -2518,7 +2491,7 @@ dependencies = [ "lz4_flex", "quanta", "replace_with", - "sealed 0.6.0", + "sealed", "serde", "static_assertions", "thiserror 2.0.17", @@ -2541,9 +2514,10 @@ dependencies = [ ] [[package]] -name = "clickhouse-derive" -version = "0.2.0" -source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" dependencies = [ "proc-macro2 1.0.101", "quote 1.0.41", @@ -2554,7 +2528,8 @@ dependencies = [ [[package]] name = "clickhouse-types" version = "0.1.0" -source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235f72141cfbe1d2d930d8156a34814c8a3d60491febb9af64cc52a203444764" dependencies = [ "bytes", "thiserror 2.0.17", @@ -3001,7 +2976,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.10.1", + "phf 0.11.3", "smallvec", ] @@ -3429,7 +3404,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3803,7 +3778,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5326,7 +5301,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.57.0", + "windows-core 0.62.2", ] [[package]] @@ -7458,7 +7433,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7585,7 +7560,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.4.0", "proc-macro2 1.0.101", "quote 1.0.41", "syn 2.0.106", @@ -7984,9 +7959,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" dependencies = [ - "phf_macros 0.10.0", "phf_shared 0.10.0", - "proc-macro-hack", ] [[package]] @@ -7995,7 +7968,7 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" dependencies = [ - "phf_macros 0.11.3", + "phf_macros", "phf_shared 0.11.3", "serde", ] @@ -8030,20 +8003,6 @@ dependencies = [ "rand 0.8.5", ] -[[package]] -name = "phf_macros" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0" -dependencies = [ - "phf_generator 0.10.0", - "phf_shared 0.10.0", - "proc-macro-hack", - "proc-macro2 1.0.101", - "quote 1.0.41", - "syn 1.0.109", -] - [[package]] name = "phf_macros" version = "0.11.3" @@ -8650,12 +8609,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "0.4.30" @@ -8803,8 +8756,8 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ - "heck 0.4.1", - "itertools 0.11.0", + "heck 0.5.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -8824,7 +8777,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2 1.0.101", "quote 1.0.41", "syn 2.0.106", @@ -8950,7 +8903,7 @@ dependencies = [ "once_cell", "socket2 0.6.1", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -9412,7 +9365,7 @@ dependencies = [ "alloy-transport-http", "bid-scraper", "built", - "clickhouse 0.12.2", + "clickhouse", "ctor", "derivative", "exponential-backoff", @@ -9432,6 +9385,7 @@ dependencies = [ "rbuilder", "rbuilder-config", "rbuilder-primitives", + "rbuilder-utils", "redis", "reqwest 0.12.24", "reth-primitives", @@ -9529,8 +9483,8 @@ dependencies = [ "ahash", "alloy-primitives 1.4.1", "auto_impl", - "clickhouse 0.13.3", - "clickhouse-derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clickhouse", + "clickhouse-derive", "derivative", "derive_more 2.0.1", "dyn-clone", @@ -9544,6 +9498,7 @@ dependencies = [ "reqwest 0.12.24", "reth-tasks 1.8.2", "serde", + "serde_bytes", "serde_json", "serde_with", "sha2 0.10.9", @@ -12958,7 +12913,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -13225,18 +13180,6 @@ dependencies = [ "untrusted 0.9.0", ] -[[package]] -name = "sealed" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.101", - "quote 1.0.41", - "syn 2.0.106", -] - [[package]] name = "sealed" version = "0.6.0" @@ -13429,6 +13372,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" +dependencies = [ + "serde", + "serde_core", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -14553,7 +14506,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -15990,7 +15943,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/crates/rbuilder-operator/Cargo.toml b/crates/rbuilder-operator/Cargo.toml index 030c1b2de..995c6d1ee 100644 --- a/crates/rbuilder-operator/Cargo.toml +++ b/crates/rbuilder-operator/Cargo.toml @@ -25,6 +25,7 @@ systemd-units = { enable = false, start = false, unit-name = "rbuilder-operator" [dependencies] rbuilder-primitives.workspace = true +rbuilder-utils.workspace = true rbuilder-config.workspace = true rbuilder.workspace = true metrics_macros.workspace = true @@ -70,7 +71,7 @@ iceoryx2 = "0.7.0" iceoryx2-bb-container = "0.7.0" prometheus.workspace = true ctor.workspace = true -clickhouse = { version = "0.12.2", features = ["time", "uuid", "native-tls"] } +clickhouse = { version = "0.14.0", features = ["time", "uuid", "native-tls"] } futures-util.workspace = true parking_lot.workspace = true lazy_static.workspace = true diff --git a/crates/rbuilder-operator/src/clickhouse.rs b/crates/rbuilder-operator/src/clickhouse.rs new file mode 100644 index 000000000..d1fc0be87 --- /dev/null +++ b/crates/rbuilder-operator/src/clickhouse.rs @@ -0,0 +1,294 @@ +//! Clickhouse integration to save all the blocks we build and submit to relays. + +use std::{sync::Arc, time::Duration}; + +use alloy_primitives::{utils::format_ether, Address, U256}; +use alloy_rpc_types_beacon::relay::SubmitBlockRequest as AlloySubmitBlockRequest; +use clickhouse::{Client, Row}; +use rbuilder::{ + building::BuiltBlockTrace, + live_builder::{ + block_output::bidding_service_interface::BidObserver, payload_events::MevBoostSlotData, + }, +}; +use rbuilder_primitives::{Order, OrderId}; +use rbuilder_utils::clickhouse::{ + backup::{ + metrics::NullMetrics, + primitives::{ClickhouseIndexableData, ClickhouseRowExt}, + }, + serde::{option_u256, vec_u256}, + spawn_clickhouse_inserter_and_backup, +}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::error; + +use crate::flashbots_config::BuiltBlocksClickhouseConfig; + +/// BlockRow to insert in clickhouse and also as entry type for the indexer since the BlockRow is made from a few &objects so it makes no sense to have a Block type and copy all the fields. +#[derive(Debug, Clone, Serialize, Deserialize, Row)] +pub struct BlockRow { + pub block_number: u64, + pub profit: String, + pub slot: u64, + pub hash: String, + pub gas_limit: u64, + pub gas_used: u64, + pub base_fee: u64, + pub parent_hash: String, + pub proposer_pubkey: String, + pub proposer_fee_recipient: String, + pub builder_pubkey: String, + pub timestamp: u64, + pub timestamp_datetime: i64, + pub orders_closed_at: i64, + pub sealed_at: i64, + pub algorithm: String, + + #[serde(with = "option_u256")] + pub true_value: Option, + #[serde(with = "option_u256")] + pub best_relay_value: Option, + #[serde(with = "option_u256")] + pub block_value: Option, + + pub used_bundle_hashes: Vec, + pub used_bundle_uuids: Vec, + pub used_sbundles_hashes: Vec, + pub delayed_payment_sources: Vec, + + #[serde(with = "vec_u256")] + pub delayed_payment_values: Vec, + + pub delayed_payment_addresses: Vec, +} + +impl ClickhouseRowExt for BlockRow { + type TraceId = String; + const TABLE_NAME: &'static str = "blocks"; + + fn trace_id(&self) -> String { + self.hash.clone() + } + + fn to_row_ref(row: &Self) -> &::Value<'_> { + row + } +} + +impl ClickhouseIndexableData for BlockRow { + type ClickhouseRowType = BlockRow; + + const DATA_NAME: &'static str = ::TABLE_NAME; + + fn trace_id(&self) -> String { + self.hash.clone() + } + + fn to_row(self, _builder_name: String) -> Self::ClickhouseRowType { + self + } +} + +const KILO: u64 = 1024; +const MEGA: u64 = KILO * KILO; + +// Super worst scenario we submit 500 blocks per second so we have 10 seconds of buffer. +// After this having this queued blocks we will start to drop. BlockRow is small enough (in the order of 10K, only hashes/ids, not full orders) so 5K BlockRows is not too much memory. +const BUILT_BLOCKS_CHANNEL_SIZE: usize = 5 * 1024; +const BLOCKS_TABLE_NAME: &str = "blocks"; +const DEFAULT_MAX_DISK_SIZE_MB: u64 = 10 * KILO; +const DEFAULT_MAX_MEMORY_SIZE_MB: u64 = KILO; +#[derive(Debug)] +pub struct BuiltBlocksWriter { + blocks_tx: mpsc::Sender, +} + +impl BuiltBlocksWriter { + pub fn new(config: BuiltBlocksClickhouseConfig, cancellation_token: CancellationToken) -> Self { + let client = Client::default() + .with_url(config.host) + .with_database(config.database) + .with_user(config.username) + .with_password(config.password) + .with_validation(false); // CRITICAL for U256 serialization. + + let task_manager = rbuilder_utils::tasks::TaskManager::current(); + let task_executor = task_manager.executor(); + + let (block_tx, block_rx) = mpsc::channel::(BUILT_BLOCKS_CHANNEL_SIZE); + spawn_clickhouse_inserter_and_backup::( + &client, + block_rx, + &task_executor, + BLOCKS_TABLE_NAME.to_string(), + "".to_string(), // No buildername used in blocks table. + Some(config.disk_database_path), + Some(config.disk_max_size_mb.unwrap_or(DEFAULT_MAX_DISK_SIZE_MB) * MEGA), + config + .memory_max_size_mb + .unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB) + * MEGA, + BLOCKS_TABLE_NAME, + ); + // Task to forward the cancellation to the task_manager. + tokio::spawn(async move { + cancellation_token.cancelled().await; + // @Pending: Needed to avoid losing blocks but we should try to avoid this. + tokio::time::sleep(Duration::from_secs(1)).await; + task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5)); + }); + Self { + blocks_tx: block_tx, + } + } +} + +fn offset_date_to_clickhouse_timestamp(date: OffsetDateTime) -> i64 { + (date.unix_timestamp_nanos() / 1000) as i64 +} + +fn get_used_sbundles_hashes(built_block_trace: &BuiltBlockTrace) -> Vec { + built_block_trace + .included_orders + .iter() + .flat_map(|exec_result| { + if let Order::ShareBundle(sbundle) = &exec_result.order { + // don't like having special cases (merged vs not merged), can we improve this? + if sbundle.is_merged_order() { + exec_result + .original_order_ids + .iter() + .map(|id| id.to_string()) + .collect() + } else if exec_result.tx_infos.is_empty() { + // non merged empty execution sbundle + vec![] + } else { + // non merged non empty execution sbundle + vec![exec_result.order.id().to_string()] + } + } else { + Vec::new() + } + }) + .collect() +} + +const MEV_VIRTUAL_BLOCKER_SOURCE: &str = "mev_blocker"; +const MEV_VIRTUAL_ADDRESS: Address = Address::ZERO; + +/// (sources, values, addresses) +fn get_delayed_payments( + built_block_trace: &BuiltBlockTrace, +) -> (Vec, Vec, Vec
) { + let mut sources = Vec::new(); + let mut values = Vec::new(); + let mut addresses = Vec::new(); + for res in &built_block_trace.included_orders { + if let Some(delayed_kickback) = &res.delayed_kickback { + if !delayed_kickback.should_pay_in_block { + match res.order.id() { + OrderId::Bundle(uuid) => { + sources.push(uuid.to_string()); + values.push(delayed_kickback.payout_value); + addresses.push(delayed_kickback.recipient); + } + _ => { + error!(order = ?res.order.id(), "Delayed kickback is found for non-bundle"); + } + } + } + } + } + sources.push(MEV_VIRTUAL_BLOCKER_SOURCE.into()); + values.push(built_block_trace.mev_blocker_price); + addresses.push(MEV_VIRTUAL_ADDRESS); + (sources, values, addresses) +} + +impl BidObserver for BuiltBlocksWriter { + fn block_submitted( + &self, + slot_data: &MevBoostSlotData, + submit_block_request: Arc, + built_block_trace: Arc, + builder_name: String, + best_bid_value: U256, + ) { + let slot = slot_data.slot(); + let block_number = slot_data.block(); + let blocks_tx = self.blocks_tx.clone(); + tokio::spawn(async move { + let submit_trace = submit_block_request.bid_trace(); + let execution_payload_v1 = match submit_block_request.as_ref() { + AlloySubmitBlockRequest::Capella(request) => { + &request.execution_payload.payload_inner + } + AlloySubmitBlockRequest::Deneb(request) => { + &request.execution_payload.payload_inner.payload_inner + } + AlloySubmitBlockRequest::Electra(request) => { + &request.execution_payload.payload_inner.payload_inner + } + AlloySubmitBlockRequest::Fulu(request) => { + &request.execution_payload.payload_inner.payload_inner + } + }; + let mut used_bundle_hashes = Vec::new(); + let mut used_bundle_uuids = Vec::new(); + for res in &built_block_trace.included_orders { + if let Order::Bundle(bundle) = &res.order { + used_bundle_hashes + .push(bundle.external_hash.unwrap_or(bundle.hash).to_string()); + used_bundle_uuids.push(bundle.uuid.to_string()); + } + } + let used_sbundles_hashes = get_used_sbundles_hashes(&built_block_trace); + let (delayed_payment_sources, delayed_payment_values, delayed_payment_addresses) = + get_delayed_payments(&built_block_trace); + let delayed_payment_addresses = delayed_payment_addresses + .iter() + .map(|address| address.to_string()) + .collect(); + let block_row = BlockRow { + block_number, + profit: format_ether(built_block_trace.true_bid_value), + slot, + hash: execution_payload_v1.block_hash.to_string(), + gas_limit: submit_trace.gas_limit, + gas_used: submit_trace.gas_used, + base_fee: execution_payload_v1 + .base_fee_per_gas + .try_into() + .unwrap_or_default(), + parent_hash: submit_trace.parent_hash.to_string(), + proposer_pubkey: submit_trace.proposer_pubkey.to_string(), + proposer_fee_recipient: submit_trace.proposer_fee_recipient.to_string(), + builder_pubkey: submit_trace.builder_pubkey.to_string(), + timestamp: execution_payload_v1.timestamp, + timestamp_datetime: execution_payload_v1.timestamp as i64 * 1_000_000, + orders_closed_at: offset_date_to_clickhouse_timestamp( + built_block_trace.orders_closed_at, + ), + sealed_at: offset_date_to_clickhouse_timestamp(built_block_trace.orders_sealed_at), + algorithm: builder_name, + true_value: Some(built_block_trace.true_bid_value), + best_relay_value: Some(best_bid_value), + block_value: Some(submit_trace.value), + used_bundle_hashes, + used_bundle_uuids, + used_sbundles_hashes, + delayed_payment_sources, + delayed_payment_values, + delayed_payment_addresses, + }; + if let Err(err) = blocks_tx.try_send(block_row) { + error!(?err, "Failed to send block to clickhouse"); + } + }); + } +} diff --git a/crates/rbuilder-operator/src/flashbots_config.rs b/crates/rbuilder-operator/src/flashbots_config.rs index 4e245b6f1..6583abeb5 100644 --- a/crates/rbuilder-operator/src/flashbots_config.rs +++ b/crates/rbuilder-operator/src/flashbots_config.rs @@ -45,11 +45,12 @@ use crate::{ SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD, }, build_info::rbuilder_version, + clickhouse::BuiltBlocksWriter, true_block_value_push::best_true_value_observer::BestTrueValueObserver, }; use clickhouse::Client; -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; #[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)] pub struct ClickhouseConfig { @@ -72,6 +73,20 @@ struct TBVPushRedisConfig { pub channel: String, } +/// Config used to record built blocks to clickhouse using a local +/// storage on errors. +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)] +pub struct BuiltBlocksClickhouseConfig { + /// clickhouse host url (starts with http/https) + pub host: String, + pub database: String, + pub username: String, + pub password: String, + pub disk_database_path: PathBuf, + pub disk_max_size_mb: Option, + pub memory_max_size_mb: Option, +} + #[serde_as] #[derive(Debug, Clone, Deserialize, PartialEq, Derivative)] #[serde(default, deny_unknown_fields)] @@ -83,6 +98,8 @@ pub struct FlashbotsConfig { #[serde(flatten)] pub l1_config: L1Config, + /// Clickhouse config for fetching blocks from clickhouse for backtesting. + /// This should not be here.... #[serde(flatten)] clickhouse: ClickhouseConfig, @@ -113,6 +130,9 @@ pub struct FlashbotsConfig { /// For production we always need some tbv push (since it's used by smart-multiplexing.) so: /// !Some(key_registration_url) => Some(tbv_push_redis) tbv_push_redis: Option, + + /// Should always be set on buildernet. + built_blocks_clickhouse_config: Option, } impl LiveBuilderConfig for FlashbotsConfig { @@ -288,8 +308,17 @@ impl FlashbotsConfig { /// - Secure block processor client (using block_processor_key to sign) fn create_block_processor_client( &self, + cancellation_token: &CancellationToken, block_processor_key: Option, ) -> eyre::Result>> { + if let Some(built_blocks_clickhouse_config) = &self.built_blocks_clickhouse_config { + let writer = BuiltBlocksWriter::new( + built_blocks_clickhouse_config.clone(), + cancellation_token.clone(), + ); + return Ok(Some(Box::new(writer))); + } + if let Some(url) = &self.blocks_processor_url { let bid_observer: Box = if let Some( block_processor_key, @@ -335,7 +364,8 @@ impl FlashbotsConfig { }; let bid_observer = RbuilderOperatorBidObserver { - block_processor: self.create_block_processor_client(block_processor_key.clone())?, + block_processor: self + .create_block_processor_client(cancellation_token, block_processor_key.clone())?, tbv_pusher: self.create_tbv_pusher(block_processor_key, cancellation_token)?, }; Ok(Box::new(bid_observer)) diff --git a/crates/rbuilder-operator/src/lib.rs b/crates/rbuilder-operator/src/lib.rs index 180b9cb4b..6b3cd56c7 100644 --- a/crates/rbuilder-operator/src/lib.rs +++ b/crates/rbuilder-operator/src/lib.rs @@ -1,6 +1,7 @@ pub mod bidding_service_wrapper; pub mod blocks_processor; pub mod build_info; +pub mod clickhouse; pub mod flashbots_config; pub mod flashbots_signer; pub mod metrics; diff --git a/crates/rbuilder-utils/Cargo.toml b/crates/rbuilder-utils/Cargo.toml index af8576010..12e9ca869 100644 --- a/crates/rbuilder-utils/Cargo.toml +++ b/crates/rbuilder-utils/Cargo.toml @@ -20,6 +20,7 @@ governor = "0.6.3" ahash.workspace = true reqwest = { workspace = true, features = ["blocking"] } serde_with = { workspace = true, features = ["time_0_3"] } +serde_bytes = "0.11" toml.workspace = true tracing.workspace = true time.workspace = true @@ -43,7 +44,7 @@ tokio = { version = "1.40.0", default-features = false, features = [ "test-util" ] } -clickhouse = { git = "https://github.com/ClickHouse/clickhouse-rs", rev = "8cf3d2e138dd121367fa10e875d3f91374b075b2", features = [ +clickhouse = { version = "0.14.0", features = [ "inserter", "time", "uuid", diff --git a/crates/rbuilder-utils/src/clickhouse/backup/macros.rs b/crates/rbuilder-utils/src/clickhouse/backup/macros.rs deleted file mode 100644 index 5b08591c4..000000000 --- a/crates/rbuilder-utils/src/clickhouse/backup/macros.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! Helpful macros spawning clickhouse indexer tasks. - -// Rationale: a simple text-replacement macro was much more effective compared to fighting the -// compiler with additional trait bounds on the [`clickhouse::Row`] trait. - -#[macro_export] -macro_rules! spawn_clickhouse_inserter { - ($executor:ident, $runner:ident, $name:expr, $target:expr) => {{ - $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { - let mut shutdown_guard = None; - tokio::select! { - _ = $runner.run_loop() => { - tracing::info!(target: $target, "clickhouse {} indexer channel closed", $name); - } - guard = shutdown => { - tracing::info!(target: $target, "Received shutdown for {} indexer, performing cleanup", $name); - shutdown_guard = Some(guard); - }, - } - - match $runner.end().await { - Ok(quantities) => { - tracing::info!(target: $target, ?quantities, "finalized clickhouse {} inserter", $name); - } - Err(e) => { - tracing::error!(target: $target, ?e, "failed to write end insertion of {} to indexer", $name); - } - } - - drop(shutdown_guard); - }); - }}; -} - -#[macro_export] -macro_rules! spawn_clickhouse_backup { - ($executor:ident, $backup:ident, $name: expr, $target:expr) => {{ - $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { - let mut shutdown_guard = None; - tokio::select! { - _ = $backup.run() => { - tracing::info!(target: $target, "clickhouse {} backup channel closed", $name); - } - guard = shutdown => { - tracing::info!(target: $target, "Received shutdown for {} backup, performing cleanup", $name); - shutdown_guard = Some(guard); - }, - } - - $backup.end().await; - drop(shutdown_guard); - }); - }}; -} diff --git a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs index 67d800bd3..9d22006f2 100644 --- a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs +++ b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs @@ -1,10 +1,8 @@ -pub mod macros; pub mod metrics; pub mod primitives; use std::{ collections::VecDeque, - marker::PhantomData, path::PathBuf, sync::{Arc, RwLock}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, @@ -21,7 +19,7 @@ use crate::{ clickhouse::{ backup::{ metrics::Metrics, - primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}, + primitives::{ClickhouseIndexableData, ClickhouseRowExt}, }, indexer::{ default_disk_backup_database_path, MAX_DISK_BACKUP_SIZE_BYTES, @@ -72,7 +70,7 @@ impl FailedCommit { } } -impl Default for FailedCommit { +impl Default for FailedCommit { fn default() -> Self { Self { rows: Vec::new(), @@ -230,15 +228,13 @@ pub(crate) enum DiskBackupError { /// A disk backup for failed commits. This handle to a database allows to write only to one table /// for scoped access. If you want to write to another table, clone it using /// [`Self::clone_with_table`]. -#[derive(Debug)] -pub struct DiskBackup { +#[derive(Debug, Clone)] +pub struct DiskBackup { db: Arc>, config: DiskBackupConfig, - - _marker: PhantomData, } -impl DiskBackup { +impl DiskBackup { pub fn new( config: DiskBackupConfig, task_executor: &TaskExecutor, @@ -253,7 +249,6 @@ impl DiskBackup { let disk_backup = Self { db: Arc::new(RwLock::new(db)), config, - _marker: Default::default(), }; task_executor.spawn({ @@ -265,32 +260,16 @@ impl DiskBackup { Ok(disk_backup) } - - /// Like `clone`, but allows to change the type parameter `U`. - pub fn clone_to(&self) -> DiskBackup { - DiskBackup { - db: self.db.clone(), - config: self.config.clone(), - _marker: Default::default(), - } - } -} - -impl Clone for DiskBackup { - fn clone(&self) -> Self { - Self { - db: self.db.clone(), - config: self.config.clone(), - _marker: Default::default(), - } - } } -impl DiskBackup { +impl DiskBackup { /// Saves a new failed commit to disk. `commit_immediately` indicates whether to force /// durability on write. - fn save(&mut self, data: &FailedCommit) -> Result { - let table_def = Table::new(T::ORDER); + fn save( + &mut self, + data: &FailedCommit, + ) -> Result { + let table_def = Table::new(T::TABLE_NAME); // NOTE: not efficient, but we don't expect to store a lot of data here. let bytes = serde_json::to_vec(&data)?; @@ -314,10 +293,10 @@ impl DiskBackup { } /// Retrieves the oldest failed commit from disk, if any. - fn retrieve_oldest( + fn retrieve_oldest( &mut self, ) -> Result>>, DiskBackupError> { - let table_def = Table::new(T::ORDER); + let table_def = Table::new(T::TABLE_NAME); let reader = self.db.read().expect("not poisoned").begin_read()?; let table = match reader.open_table(table_def) { @@ -353,8 +332,11 @@ impl DiskBackup { } /// Deletes the failed commit with the given key from disk. - fn delete(&mut self, key: DiskBackupKey) -> Result { - let table_def = Table::new(T::ORDER); + fn delete( + &mut self, + key: DiskBackupKey, + ) -> Result { + let table_def = Table::new(T::TABLE_NAME); let mut writer = self.db.write().expect("not poisoned").begin_write()?; writer.set_durability(redb::Durability::Immediate)?; @@ -508,7 +490,7 @@ pub struct Backup { /// By sending backup data less often, we give time gaps for these operation to be performed. rx: mpsc::Receiver>, /// The disk cache of failed commits. - disk_backup: DiskBackup, + disk_backup: DiskBackup, /// The in-memory cache of failed commits. memory_backup: MemoryBackup, /// A clickhouse inserter for committing again the data. @@ -529,7 +511,7 @@ impl Backup { pub fn new( rx: mpsc::Receiver>, inserter: Inserter, - disk_backup: DiskBackup, + disk_backup: DiskBackup, ) -> Self { Self { rx, @@ -553,7 +535,7 @@ impl Backup { /// Backs up a failed commit, first trying to write to disk, then to memory. fn backup(&mut self, failed_commit: FailedCommit) { let quantities = failed_commit.quantities; - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); #[cfg(any(test, feature = "test-utils"))] if self.use_only_memory_backup { @@ -568,23 +550,27 @@ impl Backup { let start = Instant::now(); match self.disk_backup.save(&failed_commit) { Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); - MetricsType::set_disk_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); + MetricsType::set_disk_backup_size( + stats.size_bytes, + stats.total_batches, + T::TABLE_NAME, + ); return; } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit, trying in-memory"); - MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to write commit, trying in-memory"); + MetricsType::increment_backup_disk_errors(T::TABLE_NAME, e.as_ref()); } }; let stats = self.memory_backup.save(failed_commit); - MetricsType::set_memory_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); + MetricsType::set_memory_backup_size(stats.size_bytes, stats.total_batches, T::TABLE_NAME); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); if let Some((stats, oldest_quantities)) = self.memory_backup.drop_excess() { - tracing::warn!(target: TARGET, order = T::ORDER, ?stats, "failed commits exceeded max memory backup size, dropping oldest"); + tracing::warn!(target: TARGET, order = T::TABLE_NAME, ?stats, "failed commits exceeded max memory backup size, dropping oldest"); MetricsType::process_backup_data_lost_quantities(&oldest_quantities); // Clear the cached last commit if it was from memory and we just dropped it. self.last_cached = self @@ -597,12 +583,12 @@ impl Backup { /// Retrieves the oldest failed commit, first trying from memory, then from disk. fn retrieve_oldest(&mut self) -> Option> { if let Some(cached) = self.last_cached.take() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = cached.commit.rows.len(), "retrieved last cached failed commit"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = cached.commit.rows.len(), "retrieved last cached failed commit"); return Some(cached); } if let Some(commit) = self.memory_backup.retrieve_oldest() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); return Some(RetrievedFailedCommit { source: BackupSource::Memory, commit, @@ -612,7 +598,7 @@ impl Backup { match self.disk_backup.retrieve_oldest() { Ok(maybe_commit) => { maybe_commit.inspect(|data| { - tracing::debug!(target: TARGET, order = T::ORDER, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); }) .map(|data| RetrievedFailedCommit { source: BackupSource::Disk(data.key), @@ -620,8 +606,8 @@ impl Backup { }) } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to retrieve oldest failed commit from disk"); - MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to retrieve oldest failed commit from disk"); + MetricsType::increment_backup_disk_errors(T::TABLE_NAME, e.as_ref()); None } } @@ -634,7 +620,7 @@ impl Backup { if let Err(e) = self.inserter.write(value_ref).await { MetricsType::increment_write_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to write to backup inserter"); continue; } } @@ -644,32 +630,32 @@ impl Backup { async fn purge_commit(&mut self, retrieved: &RetrievedFailedCommit) { if let BackupSource::Disk(key) = retrieved.source { let start = Instant::now(); - match self.disk_backup.delete(key) { + match self.disk_backup.delete::(key) { Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); MetricsType::set_disk_backup_size( stats.size_bytes, stats.total_batches, - T::ORDER, + T::TABLE_NAME, ); } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to purge failed commit from disk"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to purge failed commit from disk"); } } - tracing::debug!(target: TARGET, order = T::ORDER, "purged committed failed commit from disk"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, "purged committed failed commit from disk"); } } /// Run the backup actor until it is possible to receive messages. /// /// If some data were stored on disk previously, they will be retried first. - pub async fn run(&mut self) { + async fn run(&mut self) { loop { tokio::select! { maybe_failed_commit = self.rx.recv() => { let Some(failed_commit) = maybe_failed_commit else { - tracing::error!(target: TARGET, order = T::ORDER, "backup channel closed"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, "backup channel closed"); break; }; @@ -678,7 +664,7 @@ impl Backup { _ = self.interval.tick() => { let Some(oldest) = self.retrieve_oldest() else { self.interval.reset(); - MetricsType::set_backup_empty_size(T::ORDER); + MetricsType::set_backup_empty_size(T::TABLE_NAME); continue // Nothing to do! }; @@ -687,14 +673,14 @@ impl Backup { let start = Instant::now(); match self.inserter.force_commit().await { Ok(quantities) => { - tracing::info!(target: TARGET, order = T::ORDER, ?quantities, "successfully backed up"); + tracing::info!(target: TARGET, order = T::TABLE_NAME, ?quantities, "successfully backed up"); MetricsType::process_backup_data_quantities(&quantities.into()); MetricsType::record_batch_commit_time(start.elapsed()); self.interval.reset(); self.purge_commit(&oldest).await; } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); MetricsType::increment_commit_failures(e.to_string()); self.last_cached = Some(oldest); continue; @@ -707,41 +693,63 @@ impl Backup { /// To call on shutdown, tries make a last-resort attempt to post back to Clickhouse all /// in-memory data. - pub async fn end(mut self) { + async fn end(mut self) { for failed_commit in self.memory_backup.failed_commits.drain(..) { for row in &failed_commit.rows { let value_ref = T::to_row_ref(row); if let Err(e) = self.inserter.write(value_ref).await { - tracing::error!( target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter during shutdown"); + tracing::error!( target: TARGET, order = T::TABLE_NAME, ?e, "failed to write to backup inserter during shutdown"); MetricsType::increment_write_failures(e.to_string()); continue; } } if let Err(e) = self.inserter.force_commit().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit backup to CH during shutdown, trying disk"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to commit backup to CH during shutdown, trying disk"); MetricsType::increment_commit_failures(e.to_string()); } if let Err(e) = self.disk_backup.save(&failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit to disk backup during shutdown"); - MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to write commit to disk backup during shutdown"); + MetricsType::increment_backup_disk_errors(T::TABLE_NAME, e.as_ref()); } } if let Err(e) = self.disk_backup.flush().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to flush disk backup during shutdown"); - MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to flush disk backup during shutdown"); + MetricsType::increment_backup_disk_errors(T::TABLE_NAME, e.as_ref()); } else { - tracing::info!(target: TARGET, order = T::ORDER, "flushed disk backup during shutdown"); + tracing::info!(target: TARGET, order = T::TABLE_NAME, "flushed disk backup during shutdown"); } if let Err(e) = self.inserter.end().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to end backup inserter during shutdown"); + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to end backup inserter during shutdown"); } else { - tracing::info!(target: TARGET, order = T::ORDER, "successfully ended backup inserter during shutdown"); + tracing::info!(target: TARGET, order = T::TABLE_NAME, "successfully ended backup inserter during shutdown"); } } + + /// Spawns the inserter runner on the given task executor. + pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str) + where + MetricsType: Send + Sync + 'static, + for<'a> ::Value<'a>: Sync, + { + task_executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { + let mut shutdown_guard = None; + tokio::select! { + _ = self.run() => { + tracing::info!(target, "clickhouse {} backup channel closed", name); + } + guard = shutdown => { + tracing::info!(target, "Received shutdown for {} backup, performing cleanup", name); + shutdown_guard = Some(guard); + }, + } + self.end().await; + drop(shutdown_guard); + }); + } } #[cfg(any(test, feature = "test-utils"))] @@ -749,9 +757,11 @@ impl Backup { pub fn new_test( rx: mpsc::Receiver>, inserter: Inserter, - disk_backup: DiskBackup, + disk_backup: DiskBackup, use_only_memory_backup: bool, ) -> Self { + use std::marker::PhantomData; + Self { rx, inserter, diff --git a/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs b/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs index 9bc53031b..bb1434a39 100644 --- a/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs +++ b/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs @@ -1,16 +1,18 @@ -use alloy_primitives::B256; use clickhouse::{Row, RowWrite}; use serde::{de::DeserializeOwned, Serialize}; pub trait ClickhouseRowExt: Row + RowWrite + Serialize + DeserializeOwned + Sync + Send + 'static { + /// Type of + type TraceId: std::fmt::Display + Send + Sync; + /// The type of such row, e.g. "bundles" or "bundle_receipts". Used as backup db table name and /// for informational purposes. - const ORDER: &'static str; + const TABLE_NAME: &'static str; /// An identifier of such row. - fn hash(&self) -> B256; + fn trace_id(&self) -> Self::TraceId; /// Internal function that takes the inner row types and extracts the reference needed for /// Clickhouse inserter functions like `Inserter::write`. While a default implementation is not @@ -19,15 +21,15 @@ pub trait ClickhouseRowExt: } /// An high-level order type that can be indexed in clickhouse. -pub trait ClickhouseIndexableOrder: Sized { +pub trait ClickhouseIndexableData: Sized { /// The associated inner row type that can be serialized into Clickhouse data. type ClickhouseRowType: ClickhouseRowExt; /// The type of such order, e.g. "bundles" or "transactions". For informational purposes. - const ORDER: &'static str; + const DATA_NAME: &'static str; - /// An identifier of such order. - fn hash(&self) -> B256; + /// An identifier of such element for when we need to trace it. + fn trace_id(&self) -> ::TraceId; /// Converts such order into the associated Clickhouse row type. fn to_row(self, builder_name: String) -> Self::ClickhouseRowType; diff --git a/crates/rbuilder-utils/src/clickhouse/indexer.rs b/crates/rbuilder-utils/src/clickhouse/indexer.rs index bfc732ed8..296da120d 100644 --- a/crates/rbuilder-utils/src/clickhouse/indexer.rs +++ b/crates/rbuilder-utils/src/clickhouse/indexer.rs @@ -11,13 +11,14 @@ const TARGET: &str = "indexer"; use clickhouse::{ error::Result as ClickhouseResult, inserter::Inserter, Client as ClickhouseClient, Row, }; +use reth_tasks::TaskExecutor; use tokio::sync::mpsc; use crate::{ clickhouse::{ backup::{ metrics::Metrics, - primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}, + primitives::{ClickhouseIndexableData, ClickhouseRowExt}, FailedCommit, }, Quantities, @@ -78,7 +79,9 @@ pub struct ClickhouseInserter { _metrics_phantom: std::marker::PhantomData, } -impl ClickhouseInserter { +impl + ClickhouseInserter +{ pub fn new(inner: Inserter, backup_tx: mpsc::Sender>) -> Self { let rows_backup = Vec::new(); Self { @@ -91,12 +94,12 @@ impl ClickhouseInserter ClickhouseInserter { if quantities == Quantities::ZERO.into() { - tracing::trace!(target: TARGET, order = T::ORDER, "committed to inserter"); + tracing::trace!(target: TARGET, table = T::TABLE_NAME, "committed to inserter"); } else { - tracing::debug!(target: TARGET, order = T::ORDER, ?quantities, "inserted batch to clickhouse"); + tracing::debug!(target: TARGET, table = T::TABLE_NAME, ?quantities, "inserted batch to clickhouse"); MetricsType::process_quantities(&quantities.into()); MetricsType::record_batch_commit_time(start.elapsed()); // Clear the backup rows. @@ -125,13 +128,13 @@ impl ClickhouseInserter { MetricsType::increment_commit_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit bundle to clickhouse"); + tracing::error!(target: TARGET, table = T::TABLE_NAME, ?e, "failed to commit bundle to clickhouse"); let rows = std::mem::take(&mut self.rows_backup); let failed_commit = FailedCommit::new(rows, pending); if let Err(e) = self.backup_tx.try_send(failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to send rows backup"); + tracing::error!(target: TARGET, table = T::TABLE_NAME, ?e, "failed to send rows backup"); } } } @@ -146,7 +149,7 @@ impl ClickhouseInserter std::fmt::Debug for ClickhouseInserter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ClickhouseInserter") - .field("inserter", &T::ORDER.to_string()) + .field("inserter", &T::TABLE_NAME.to_string()) .field("rows_backup_len", &self.rows_backup.len()) .finish() } @@ -154,7 +157,7 @@ impl std::fmt::Debug for ClickhouseInserter { +pub struct InserterRunner { /// The channel from which we can receive new orders to index. rx: mpsc::Receiver, /// The underlying Clickhouse inserter. @@ -163,18 +166,18 @@ pub struct InserterRunner { builder_name: String, } -impl std::fmt::Debug +impl std::fmt::Debug for InserterRunner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("InserterRunner") - .field("inserter", &T::ORDER.to_string()) + .field("inserter", &T::DATA_NAME.to_string()) .field("rx", &self.rx) .finish() } } -impl InserterRunner { +impl InserterRunner { pub fn new( rx: mpsc::Receiver, inserter: ClickhouseInserter, @@ -188,27 +191,59 @@ impl InserterRunner ClickhouseResult { + async fn end(self) -> ClickhouseResult { self.inserter.end().await } + + /// Spawns the inserter runner on the given task executor. + pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str) + where + T: Send + Sync + 'static, + MetricsType: Send + Sync + 'static, + for<'a> ::Value<'a>: Sync, + { + task_executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { + let mut shutdown_guard = None; + tokio::select! { + _ = self.run_loop() => { + tracing::info!(target, "clickhouse {name} indexer channel closed"); + } + guard = shutdown => { + tracing::info!(target, "Received shutdown for {name} indexer, performing cleanup"); + shutdown_guard = Some(guard); + }, + } + + match self.end().await { + Ok(quantities) => { + tracing::info!(target, ?quantities, "finalized clickhouse {} inserter", name); + } + Err(e) => { + tracing::error!(target, ?e, "failed to write end insertion of {} to indexer", name); + } + } + drop(shutdown_guard); + + }); + } } /// The configuration used in a [`ClickhouseClient`]. diff --git a/crates/rbuilder-utils/src/clickhouse/mod.rs b/crates/rbuilder-utils/src/clickhouse/mod.rs index 0176f1472..c4580d982 100644 --- a/crates/rbuilder-utils/src/clickhouse/mod.rs +++ b/crates/rbuilder-utils/src/clickhouse/mod.rs @@ -1,6 +1,21 @@ pub mod backup; pub mod indexer; -use serde::{Deserialize, Serialize}; +pub mod serde; +use std::{path::PathBuf, time::Duration}; + +use ::serde::{Deserialize, Serialize}; +use clickhouse::Client; +use reth_tasks::TaskExecutor; +use tokio::sync::mpsc; + +use crate::clickhouse::{ + backup::{ + metrics::Metrics, + primitives::{ClickhouseIndexableData, ClickhouseRowExt}, + Backup, DiskBackup, DiskBackupConfig, MemoryBackupConfig, + }, + indexer::{default_inserter, ClickhouseInserter, InserterRunner}, +}; /// Equilalent of `clickhouse::inserter::Quantities` with more traits derived. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] @@ -38,3 +53,55 @@ impl From for clickhouse::inserter::Quantities { } } } + +/// Size of the channel buffer for the backup input channel. +/// If we get more than this number of failed commits queued the inserter thread will block. +const BACKUP_INPUT_CHANNEL_BUFFER_SIZE: usize = 128; +const CLICKHOUSE_INSERT_TIMEOUT: Duration = Duration::from_secs(2); +const CLICKHOUSE_END_TIMEOUT: Duration = Duration::from_secs(4); + +/// Main func to spawn the clickhouse inserter and backup tasks. +#[allow(clippy::too_many_arguments)] +pub fn spawn_clickhouse_inserter_and_backup< + DataType: ClickhouseIndexableData + Send + Sync + 'static, + RowType: ClickhouseRowExt, + MetricsType: Metrics + Send + Sync + 'static, +>( + client: &Client, + data_rx: mpsc::Receiver, + task_executor: &TaskExecutor, + clickhouse_table_name: String, + builder_name: String, + disk_database_path: Option>, + disk_max_size_bytes: Option, + memory_max_size_bytes: u64, + tracing_target: &'static str, +) where + for<'a> ::Value<'a>: Sync, +{ + let backup_table_name = RowType::TABLE_NAME.to_string(); + let disk_backup = DiskBackup::new( + DiskBackupConfig::new() + .with_path(disk_database_path) + .with_max_size_bytes(disk_max_size_bytes), // 1 GiB + task_executor, + ) + .expect("could not create disk backup"); + let (failed_commit_tx, failed_commit_rx) = mpsc::channel(BACKUP_INPUT_CHANNEL_BUFFER_SIZE); + let inserter = default_inserter(client, &clickhouse_table_name); + let inserter = ClickhouseInserter::<_, MetricsType>::new(inserter, failed_commit_tx); + // Node name is not used for Blocks. + let inserter_runner = InserterRunner::new(data_rx, inserter, builder_name); + + let backup = Backup::<_, MetricsType>::new( + failed_commit_rx, + client.inserter(&clickhouse_table_name).with_timeouts( + Some(CLICKHOUSE_INSERT_TIMEOUT), + Some(CLICKHOUSE_END_TIMEOUT), + ), + disk_backup.clone(), + ) + .with_memory_backup_config(MemoryBackupConfig::new(memory_max_size_bytes)); + inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target); + backup.spawn(task_executor, backup_table_name, tracing_target); +} diff --git a/crates/rbuilder-utils/src/clickhouse/serde.rs b/crates/rbuilder-utils/src/clickhouse/serde.rs new file mode 100644 index 000000000..ab45efb98 --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/serde.rs @@ -0,0 +1,82 @@ +//! Serde helpers for Clickhouse. +pub mod u256 { + use alloy_primitives::U256; + use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize as _}; + + /// EVM U256 is represented in big-endian, but ClickHouse expects little-endian. + pub fn serialize(u256: &U256, serializer: S) -> Result { + let buf: [u8; 32] = u256.to_le_bytes(); + buf.serialize(serializer) + } + + /// Deserialize U256 following ClickHouse RowBinary format. + /// + /// ClickHouse stores U256 in little-endian, we have to convert it back to big-endian. + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let buf: [u8; 32] = Deserialize::deserialize(deserializer)?; + Ok(U256::from_le_bytes(buf)) + } +} + +pub mod option_u256 { + use alloy_primitives::U256; + use serde::{de::Deserializer, ser::Serializer, Deserialize}; + + pub fn serialize( + maybe_u256: &Option, + serializer: S, + ) -> Result { + if let Some(u256) = maybe_u256 { + let buf: [u8; 32] = u256.to_le_bytes(); + serializer.serialize_some(&buf) + } else { + serializer.serialize_none() + } + } + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let option: Option<[u8; 32]> = Deserialize::deserialize(deserializer)?; + Ok(option.map(U256::from_le_bytes)) + } +} + +pub mod vec_u256 { + use alloy_primitives::U256; + use serde::{ + de::Deserializer, + ser::{SerializeSeq, Serializer}, + Deserialize, + }; + + /// Serialize Vec following ClickHouse RowBinary format. + /// + /// EVM U256 is represented in big-endian, but ClickHouse expects little-endian. + pub fn serialize(u256es: &[U256], serializer: S) -> Result { + // It consists of a LEB128 length prefix followed by the raw bytes of each U256 in + // little-endian order. + + // + let mut seq = serializer.serialize_seq(Some(u256es.len()))?; + for u256 in u256es { + let buf: [u8; 32] = u256.to_le_bytes(); + seq.serialize_element(&buf)?; + } + seq.end() + } + + /// Deserialize Vec following ClickHouse RowBinary format. + /// + /// ClickHouse stores U256 in little-endian, we have to convert it back to big-endian. + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let vec: Vec<[u8; 32]> = Deserialize::deserialize(deserializer)?; + Ok(vec.into_iter().map(U256::from_le_bytes).collect()) + } +} diff --git a/crates/rbuilder-utils/src/lib.rs b/crates/rbuilder-utils/src/lib.rs index 3de0c1df0..7badcf364 100644 --- a/crates/rbuilder-utils/src/lib.rs +++ b/crates/rbuilder-utils/src/lib.rs @@ -2,6 +2,7 @@ pub mod backoff; pub mod clickhouse; pub mod format; pub mod metrics; +pub mod serde; pub mod tasks { pub use reth_tasks::*; } diff --git a/crates/rbuilder-utils/src/serde/mod.rs b/crates/rbuilder-utils/src/serde/mod.rs new file mode 100644 index 000000000..c25f0c207 --- /dev/null +++ b/crates/rbuilder-utils/src/serde/mod.rs @@ -0,0 +1 @@ +//! Non specific serde helpers.