diff --git a/nomium/shares-logger/Cargo.toml b/nomium/shares-logger/Cargo.toml index 63237a1a80..53896bd226 100644 --- a/nomium/shares-logger/Cargo.toml +++ b/nomium/shares-logger/Cargo.toml @@ -18,6 +18,7 @@ lazy_static = "1.4" config = "0.13" parking_lot = "0.12" nomium-prometheus = { path = "../prometheus" } +rpc_sv2 = { version = "1.0.0", path = "../../roles/roles-utils/rpc" } [dev-dependencies] tokio = { version = "1.0", features = ["rt", "macros", "test-util", "time"] } \ No newline at end of file diff --git a/nomium/shares-logger/src/config/default_config.toml b/nomium/shares-logger/src/config/default_config.toml index d28d3066c6..6601666ee7 100644 --- a/nomium/shares-logger/src/config/default_config.toml +++ b/nomium/shares-logger/src/config/default_config.toml @@ -12,3 +12,11 @@ max_retry_delay_secs = 1 [processing] primary_channel_buffer_size = 10000 backup_check_interval_secs = 1 +block_verification_max_retries = 5 +block_verification_retry_delay_ms = 1000 + +[bitcoin_rpc] +url = "" +port = 0 +user = "" +password = "" diff --git a/nomium/shares-logger/src/config/settings.rs b/nomium/shares-logger/src/config/settings.rs index 58054c8342..172194da01 100644 --- a/nomium/shares-logger/src/config/settings.rs +++ b/nomium/shares-logger/src/config/settings.rs @@ -23,12 +23,23 @@ pub struct ClickhouseSettings { pub struct ProcessingSettings { pub primary_channel_buffer_size: usize, pub backup_check_interval_secs: u64, + pub block_verification_max_retries: u8, + pub block_verification_retry_delay_ms: u64, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct BitcoinRpcSettings { + pub url: String, + pub port: u16, + pub user: String, + pub password: String, } #[derive(Debug, Deserialize, Clone)] pub struct Settings { pub clickhouse: ClickhouseSettings, pub processing: ProcessingSettings, + pub bitcoin_rpc: BitcoinRpcSettings, } impl Settings { diff --git a/nomium/shares-logger/src/services/bitcoin_rpc_service.rs b/nomium/shares-logger/src/services/bitcoin_rpc_service.rs new file mode 100644 index 0000000000..477160d6fd --- /dev/null +++ b/nomium/shares-logger/src/services/bitcoin_rpc_service.rs @@ -0,0 +1,111 @@ +use crate::config::SETTINGS; +use log::{error, info}; +use rpc_sv2::mini_rpc_client::{Auth, MiniRpcClient, RpcError}; + +pub struct BitcoinRpcService { + client: MiniRpcClient, +} + +impl BitcoinRpcService { + pub fn new() -> Result { + let settings = &SETTINGS.bitcoin_rpc; + if settings.url.is_empty() || settings.port == 0 || settings.user.is_empty() || settings.password.is_empty() { + return Err(RpcError::Other( + "Bitcoin RPC settings are not properly configured. \ + Make sure to set SHARES_LOGGER__BITCOIN_RPC__URL, \ + SHARES_LOGGER__BITCOIN_RPC__PORT, \ + SHARES_LOGGER__BITCOIN_RPC__USER, \ + SHARES_LOGGER__BITCOIN_RPC__PASSWORD environment variables.".to_string() + )); + } + let url = format!("{}:{}", settings.url, settings.port); + let auth = Auth::new(settings.user.clone(), settings.password.clone()); + info!(target: "shares", "Initializing Bitcoin RPC client with URL: {}", url); + let client = MiniRpcClient::new(url, auth); + Ok(Self { client }) + } + + pub async fn is_block_in_blockchain(&self, block_hash: &str) -> Result { + info!(target: "shares", "Checking existence of block {} in blockchain", block_hash); + info!(target: "shares", "Using RPC connection: URL={}, User={}", + format!("{}:{}", SETTINGS.bitcoin_rpc.url, SETTINGS.bitcoin_rpc.port), + SETTINGS.bitcoin_rpc.user); + + if block_hash.len() != 64 { + error!(target: "shares", "Invalid block hash format: {}, expected 32-byte hex string (64 characters)", block_hash); + return Err(RpcError::Other(format!("Invalid block hash format: {}", block_hash))); + } + info!(target: "shares", "Sending RPC request getblockheader for block: {}", block_hash); + match self.client.send_json_rpc_request("getblockheader", serde_json::json!([block_hash])).await { + Ok(response) => { + info!(target: "shares", "Received response from RPC server for block {}", block_hash); + info!(target: "shares", "Full response: {}", response); + match serde_json::from_str::(&response) { + Ok(parsed) => { + if parsed.get("result").is_some() { + info!(target: "shares", "Block {} confirmed in blockchain", block_hash); + + if let Some(result) = parsed.get("result") { + if let Some(height) = result.get("height") { + info!(target: "shares", "Block {} height: {}", block_hash, height); + } + if let Some(confirmations) = result.get("confirmations") { + info!(target: "shares", "Number of confirmations for block {}: {}", + block_hash, confirmations); + } + } + Ok(true) + } else if let Some(error) = parsed.get("error") { + let error_code = error.get("code") + .and_then(|c| c.as_i64()) + .unwrap_or(0); + let error_message = error.get("message") + .and_then(|m| m.as_str()) + .unwrap_or("Unknown error"); + info!(target: "shares", "RPC returned error: code={}, message='{}'", + error_code, error_message); + // Code -5 means "Block not found" + if error_code == -5 || error_message.contains("Block not found") { + info!(target: "shares", "Block {} not found in blockchain", block_hash); + return Ok(false); + } + error!(target: "shares", "RPC error when checking block {}: code={}, message='{}'", + block_hash, error_code, error_message); + Err(RpcError::Other( + format!("RPC error: code={}, message='{}'", error_code, error_message) + )) + } else { + error!(target: "shares", "Invalid RPC response format: neither result nor error present"); + Err(RpcError::Other("Invalid RPC response format".into())) + } + }, + Err(e) => { + error!(target: "shares", "Failed to deserialize RPC response: {}", e); + error!(target: "shares", "Raw response: {}", response); + Err(RpcError::Deserialization(format!("JSON parsing error: {}", e))) + } + } + }, + Err(e) => { + error!(target: "shares", "Error executing RPC request for block {}: {:?}", block_hash, e); + match &e { + RpcError::JsonRpc(ref err) => { + error!(target: "shares", "RPC error details: {:?}", err); + if let Some(error) = &err.error { + info!(target: "shares", "RPC returned error: code={}, message='{}'", + error.code, error.message); + if error.code == -5 || error.message.contains("Block not found") { + info!(target: "shares", "Block {} not found in blockchain", block_hash); + return Ok(false); + } + } + }, + _ => { + error!(target: "shares", "Error details: {:?}", e); + } + } + Err(e) + } + } + } +} diff --git a/nomium/shares-logger/src/services/mod.rs b/nomium/shares-logger/src/services/mod.rs index d3aaf1957d..22fb40a8af 100644 --- a/nomium/shares-logger/src/services/mod.rs +++ b/nomium/shares-logger/src/services/mod.rs @@ -1,2 +1,3 @@ pub mod share_processor; -pub mod difficulty; \ No newline at end of file +pub mod difficulty; +pub mod bitcoin_rpc_service; \ No newline at end of file diff --git a/nomium/shares-logger/src/storage/clickhouse/service.rs b/nomium/shares-logger/src/storage/clickhouse/service.rs index 165e7670f5..2dbee30680 100644 --- a/nomium/shares-logger/src/storage/clickhouse/service.rs +++ b/nomium/shares-logger/src/storage/clickhouse/service.rs @@ -6,8 +6,9 @@ use crate::storage::clickhouse::ClickhouseConnectionPool; use crate::traits::ShareStorage; use async_trait::async_trait; use clickhouse::Client; -use log::{info, error}; +use log::{info, error, warn}; use std::sync::Arc; +use rpc_sv2::mini_rpc_client::RpcError; use nomium_prometheus::SHALOG_BATCH_SIZE_CURRENT; @@ -171,12 +172,82 @@ impl ShareStorage for ClickhouseBlockStorage { async fn init(&self) -> Result<(), ClickhouseError> { self.ensure_blocks_table_exists().await } - async fn store_share(&mut self, block: BlockFound) -> Result<(), ClickhouseError> { - self.batch.push(block); + let block_hash_hex = hex::encode(&block.block_hash); + info!(target: "shares", "Processing block {} for storage in DB", block_hash_hex); + let max_retries = SETTINGS.processing.block_verification_max_retries; + let retry_delay_ms = SETTINGS.processing.block_verification_retry_delay_ms; + let rpc_service = match crate::services::bitcoin_rpc_service::BitcoinRpcService::new() { + Ok(service) => service, + Err(e) => { + error!(target: "shares", "Failed to initialize Bitcoin RPC service: {:?}", e); + match &e { + RpcError::Other(msg) => { + error!(target: "shares", "Configuration error: {}", msg); + info!(target: "shares", "Check environment variables for Bitcoin RPC connection"); + }, + _ => { + error!(target: "shares", "Other error during initialization: {:?}", e); + } + } + // Block is not added to batch due to RPC initialization failure + warn!(target: "shares", "Block {} will not be saved to DB due to RPC initialization failure", block_hash_hex); + // TODO: Add custom logic for handling RPC initialization failures (e.g., fallback to alternative verification or notify monitoring) + // Placeholder for future extensions + return Ok(()); + } + }; + + let mut retry_count = 0; + let mut block_exists = false; + let mut last_error = None; + while retry_count < max_retries { + match rpc_service.is_block_in_blockchain(&block_hash_hex).await { + Ok(exists) => { + if exists { + info!(target: "shares", "Block {} confirmed in blockchain on attempt {}, adding to batch", + block_hash_hex, retry_count + 1); + block_exists = true; + break; + } else if retry_count + 1 < max_retries { + info!(target: "shares", "Block {} not found in blockchain on attempt {}, retrying in {}ms", + block_hash_hex, retry_count + 1, retry_delay_ms); + } else { + info!(target: "shares", "Block {} not found in blockchain after {} attempts, skipping", + block_hash_hex, max_retries); + } + }, + Err(e) => { + error!(target: "shares", "Error checking block {} via RPC on attempt {}: {:?}", + block_hash_hex, retry_count + 1, e); + last_error = Some(e); + } + } + retry_count += 1; + if retry_count < max_retries { + tokio::time::sleep(std::time::Duration::from_millis(retry_delay_ms)).await; + } + } + // Processing the result after all attempts + if block_exists { + self.batch.push(block); + } else if let Some(e) = last_error { + error!(target: "shares", "All attempts to check block {} failed with error: {:?}", + block_hash_hex, e); + // Block is not added to batch due to verification error + warn!(target: "shares", "Block {} will not be saved to DB due to verification failure", block_hash_hex); + // TODO: Add custom logic for handling failed verification (e.g., queue for later re-check or notify monitoring) + // Placeholder for future extensions + } else { + error!(target: "shares", "Block {} not found in blockchain after {} attempts", + block_hash_hex, max_retries); + // Block is not added to batch due to not being found in blockchain + warn!(target: "shares", "Block {} will not be saved to DB as it was not found in blockchain", block_hash_hex); + // TODO: Add custom logic for handling blocks not found in blockchain (e.g., queue for later re-check or alternative verification) + // Placeholder for future extensions + } let should_flush = self.batch.len() >= SETTINGS.clickhouse.batch_size - || self.last_flush.elapsed() - >= std::time::Duration::from_secs(SETTINGS.clickhouse.batch_flush_interval_secs); + || self.last_flush.elapsed() >= std::time::Duration::from_secs(SETTINGS.clickhouse.batch_flush_interval_secs); if should_flush { self.flush().await?; } @@ -197,12 +268,10 @@ impl ShareStorage for ClickhouseBlockStorage { let batch_to_flush = self.batch.clone(); let batch_size = batch_to_flush.len(); info!(target: "shares", "Flushing batch of {} block records", batch_size); - let client = self.get_client().await?; let mut batch_inserter = client .insert("blocks") .map_err(|e| ClickhouseError::BatchInsertError(e.to_string()))?; - for block in batch_to_flush.iter() { let clickhouse_block = ClickhouseBlock::from(block.clone()); if let Err(e) = batch_inserter.write(&clickhouse_block).await { @@ -210,7 +279,6 @@ impl ShareStorage for ClickhouseBlockStorage { return Err(ClickhouseError::BatchInsertError(e.to_string())); } } - if let Err(e) = batch_inserter.end().await { error!( "Failed to complete batch insert for blocks: {}. Retrying later.", @@ -218,10 +286,9 @@ impl ShareStorage for ClickhouseBlockStorage { ); return Err(ClickhouseError::BatchInsertError(e.to_string())); } - self.batch.clear(); self.last_flush = std::time::Instant::now(); info!(target: "shares", "Successfully flushed {} block records", batch_size); Ok(()) } -} +} \ No newline at end of file diff --git a/roles/Cargo.lock b/roles/Cargo.lock index 03eb88b09c..cc8a7bfda9 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -3043,6 +3043,7 @@ dependencies = [ "once_cell", "parking_lot", "primitive-types", + "rpc_sv2", "serde", "serde_json", "tokio", diff --git a/roles/env_example b/roles/env_example index 218cf0e507..64160ac006 100644 --- a/roles/env_example +++ b/roles/env_example @@ -34,6 +34,17 @@ SHARES_LOGGER__CLICKHOUSE__MAX_RETRY_DELAY_SECS=1 # Параметры каналов SHARES_LOGGER__PROCESSING__PRIMARY_CHANNEL_BUFFER_SIZE=10000 SHARES_LOGGER__PROCESSING__BACKUP_CHECK_INTERVAL_SECS=1 +# Проверка блока +SHARES_LOGGER__PROCESSING__BLOCK_VERIFICATION_MAX_RETRIES=5 +SHARES_LOGGER__PROCESSING__BLOCK_VERIFICATION_RETRY_DELAY_MS=1000 + +# Bitcoin RPC connection settings +SHARES_LOGGER__BITCOIN_RPC__URL=http://127.0.0.1 +SHARES_LOGGER__BITCOIN_RPC__PORT=28554 +SHARES_LOGGER__BITCOIN_RPC__USER=user +SHARES_LOGGER__BITCOIN_RPC__PASSWORD=pass + +### TPROXY_CONFIG_UPSTREAM_ADDRESS=127.0.0.1 TPROXY_CONFIG_UPSTREAM_PORT=34254 TPROXY_CONFIG_UPSTREAM_AUTHORITY_PUBKEY=9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72 diff --git a/roles/roles-utils/rpc/src/mini_rpc_client.rs b/roles/roles-utils/rpc/src/mini_rpc_client.rs index d0ce084351..a17ebd5217 100644 --- a/roles/roles-utils/rpc/src/mini_rpc_client.rs +++ b/roles/roles-utils/rpc/src/mini_rpc_client.rs @@ -87,7 +87,7 @@ impl MiniRpcClient { } } - async fn send_json_rpc_request( + pub async fn send_json_rpc_request( &self, method: &str, params: serde_json::Value,