Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nomium/shares-logger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy_static = "1.4"
config = "0.13"
parking_lot = "0.12"
nomium-prometheus = { path = "../prometheus" }
rpc_sv2 = { version = "1.0.0", path = "../../roles/roles-utils/rpc" }

[dev-dependencies]
tokio = { version = "1.0", features = ["rt", "macros", "test-util", "time"] }
8 changes: 8 additions & 0 deletions nomium/shares-logger/src/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ max_retry_delay_secs = 1
[processing]
primary_channel_buffer_size = 10000
backup_check_interval_secs = 1
block_verification_max_retries = 5
block_verification_retry_delay_ms = 1000

[bitcoin_rpc]
url = ""
port = 0
user = ""
password = ""
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Хранение пароля в конфигурационном файле в открытом виде повышает риск утечки учетных данных

11 changes: 11 additions & 0 deletions nomium/shares-logger/src/config/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,23 @@ pub struct ClickhouseSettings {
pub struct ProcessingSettings {
pub primary_channel_buffer_size: usize,
pub backup_check_interval_secs: u64,
pub block_verification_max_retries: u8,
pub block_verification_retry_delay_ms: u64,
}

#[derive(Debug, Deserialize, Clone)]
pub struct BitcoinRpcSettings {
pub url: String,
pub port: u16,
pub user: String,
pub password: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
pub clickhouse: ClickhouseSettings,
pub processing: ProcessingSettings,
pub bitcoin_rpc: BitcoinRpcSettings,
}

impl Settings {
Expand Down
111 changes: 111 additions & 0 deletions nomium/shares-logger/src/services/bitcoin_rpc_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::config::SETTINGS;
use log::{error, info};
use rpc_sv2::mini_rpc_client::{Auth, MiniRpcClient, RpcError};

pub struct BitcoinRpcService {
client: MiniRpcClient,
}

impl BitcoinRpcService {
pub fn new() -> Result<Self, RpcError> {
let settings = &SETTINGS.bitcoin_rpc;
if settings.url.is_empty() || settings.port == 0 || settings.user.is_empty() || settings.password.is_empty() {
return Err(RpcError::Other(
"Bitcoin RPC settings are not properly configured. \
Make sure to set SHARES_LOGGER__BITCOIN_RPC__URL, \
SHARES_LOGGER__BITCOIN_RPC__PORT, \
SHARES_LOGGER__BITCOIN_RPC__USER, \
SHARES_LOGGER__BITCOIN_RPC__PASSWORD environment variables.".to_string()
));
}
let url = format!("{}:{}", settings.url, settings.port);
let auth = Auth::new(settings.user.clone(), settings.password.clone());
info!(target: "shares", "Initializing Bitcoin RPC client with URL: {}", url);
let client = MiniRpcClient::new(url, auth);
Ok(Self { client })
}

pub async fn is_block_in_blockchain(&self, block_hash: &str) -> Result<bool, RpcError> {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] Отсутствуют unit-тесты для обработки ошибок RPC и крайних случаев валидации блоков, снижая надёжность критической логики

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] Метод is_block_in_blockchain превышает 80 строк и содержит вложенную обработку ошибок, что снижает читаемость и усложняет тестирование

info!(target: "shares", "Checking existence of block {} in blockchain", block_hash);
info!(target: "shares", "Using RPC connection: URL={}, User={}",
format!("{}:{}", SETTINGS.bitcoin_rpc.url, SETTINGS.bitcoin_rpc.port),
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Логирование URL и пользователя Bitcoin RPC может помочь злоумышленнику в подборе учетных данных

SETTINGS.bitcoin_rpc.user);

if block_hash.len() != 64 {
error!(target: "shares", "Invalid block hash format: {}, expected 32-byte hex string (64 characters)", block_hash);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Отсутствует валидация формата хеша блока (проверяется только длина), возможна обработка некорректных/вредоносных данных

return Err(RpcError::Other(format!("Invalid block hash format: {}", block_hash)));
}
info!(target: "shares", "Sending RPC request getblockheader for block: {}", block_hash);
match self.client.send_json_rpc_request("getblockheader", serde_json::json!([block_hash])).await {
Ok(response) => {
info!(target: "shares", "Received response from RPC server for block {}", block_hash);
info!(target: "shares", "Full response: {}", response);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Логирование полного ответа от RPC-сервера может привести к утечке конфиденциальных данных (токенов, секретов) через логи

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] Избыточное логирование полного ответа RPC-сервера в production-среде создаёт ненужную нагрузку и потенциальные утечки данных

match serde_json::from_str::<serde_json::Value>(&response) {
Ok(parsed) => {
if parsed.get("result").is_some() {
info!(target: "shares", "Block {} confirmed in blockchain", block_hash);

if let Some(result) = parsed.get("result") {
if let Some(height) = result.get("height") {
info!(target: "shares", "Block {} height: {}", block_hash, height);
}
if let Some(confirmations) = result.get("confirmations") {
info!(target: "shares", "Number of confirmations for block {}: {}",
block_hash, confirmations);
}
}
Ok(true)
} else if let Some(error) = parsed.get("error") {
let error_code = error.get("code")
.and_then(|c| c.as_i64())
.unwrap_or(0);
let error_message = error.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
info!(target: "shares", "RPC returned error: code={}, message='{}'",
error_code, error_message);
// Code -5 means "Block not found"
if error_code == -5 || error_message.contains("Block not found") {
info!(target: "shares", "Block {} not found in blockchain", block_hash);
return Ok(false);
}
error!(target: "shares", "RPC error when checking block {}: code={}, message='{}'",
block_hash, error_code, error_message);
Err(RpcError::Other(
format!("RPC error: code={}, message='{}'", error_code, error_message)
))
} else {
error!(target: "shares", "Invalid RPC response format: neither result nor error present");
Err(RpcError::Other("Invalid RPC response format".into()))
}
},
Err(e) => {
error!(target: "shares", "Failed to deserialize RPC response: {}", e);
error!(target: "shares", "Raw response: {}", response);
Err(RpcError::Deserialization(format!("JSON parsing error: {}", e)))
}
}
},
Err(e) => {
error!(target: "shares", "Error executing RPC request for block {}: {:?}", block_hash, e);
match &e {
RpcError::JsonRpc(ref err) => {
error!(target: "shares", "RPC error details: {:?}", err);
if let Some(error) = &err.error {
info!(target: "shares", "RPC returned error: code={}, message='{}'",
error.code, error.message);
if error.code == -5 || error.message.contains("Block not found") {
info!(target: "shares", "Block {} not found in blockchain", block_hash);
return Ok(false);
}
}
},
_ => {
error!(target: "shares", "Error details: {:?}", e);
}
}
Err(e)
}
}
}
}
3 changes: 2 additions & 1 deletion nomium/shares-logger/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod share_processor;
pub mod difficulty;
pub mod difficulty;
pub mod bitcoin_rpc_service;
87 changes: 77 additions & 10 deletions nomium/shares-logger/src/storage/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::storage::clickhouse::ClickhouseConnectionPool;
use crate::traits::ShareStorage;
use async_trait::async_trait;
use clickhouse::Client;
use log::{info, error};
use log::{info, error, warn};
use std::sync::Arc;
use rpc_sv2::mini_rpc_client::RpcError;

use nomium_prometheus::SHALOG_BATCH_SIZE_CURRENT;

Expand Down Expand Up @@ -171,12 +172,82 @@ impl ShareStorage<BlockFound> for ClickhouseBlockStorage {
async fn init(&self) -> Result<(), ClickhouseError> {
self.ensure_blocks_table_exists().await
}

async fn store_share(&mut self, block: BlockFound) -> Result<(), ClickhouseError> {
self.batch.push(block);
let block_hash_hex = hex::encode(&block.block_hash);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] Нарушение принципа единственной ответственности: логика проверки блока через RPC внедрена в класс хранилища, связывая уровни приложения

info!(target: "shares", "Processing block {} for storage in DB", block_hash_hex);
let max_retries = SETTINGS.processing.block_verification_max_retries;
let retry_delay_ms = SETTINGS.processing.block_verification_retry_delay_ms;
let rpc_service = match crate::services::bitcoin_rpc_service::BitcoinRpcService::new() {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] При ошибке инициализации RPC-клиента блок немедленно отбрасывается без механизма повторной обработки или уведомления, что ведёт к потере данных

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code quality] Для каждого блока создаётся новый экземпляр BitcoinRpcService вместо переиспользования, что приводит к лишним аллокациям и накладным расходам на установку соединения

Ok(service) => service,
Err(e) => {
error!(target: "shares", "Failed to initialize Bitcoin RPC service: {:?}", e);
match &e {
RpcError::Other(msg) => {
error!(target: "shares", "Configuration error: {}", msg);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Детальное сообщение об ошибке конфигурации раскрывает структуру настроек, что помогает злоумышленнику в атаке

info!(target: "shares", "Check environment variables for Bitcoin RPC connection");
},
_ => {
error!(target: "shares", "Other error during initialization: {:?}", e);
}
}
// Block is not added to batch due to RPC initialization failure
warn!(target: "shares", "Block {} will not be saved to DB due to RPC initialization failure", block_hash_hex);
// TODO: Add custom logic for handling RPC initialization failures (e.g., fallback to alternative verification or notify monitoring)
// Placeholder for future extensions
return Ok(());
}
};

let mut retry_count = 0;
let mut block_exists = false;
let mut last_error = None;
while retry_count < max_retries {
match rpc_service.is_block_in_blockchain(&block_hash_hex).await {
Ok(exists) => {
if exists {
info!(target: "shares", "Block {} confirmed in blockchain on attempt {}, adding to batch",
block_hash_hex, retry_count + 1);
block_exists = true;
break;
} else if retry_count + 1 < max_retries {
info!(target: "shares", "Block {} not found in blockchain on attempt {}, retrying in {}ms",
block_hash_hex, retry_count + 1, retry_delay_ms);
} else {
info!(target: "shares", "Block {} not found in blockchain after {} attempts, skipping",
block_hash_hex, max_retries);
}
},
Err(e) => {
error!(target: "shares", "Error checking block {} via RPC on attempt {}: {:?}",
block_hash_hex, retry_count + 1, e);
last_error = Some(e);
}
}
retry_count += 1;
if retry_count < max_retries {
tokio::time::sleep(std::time::Duration::from_millis(retry_delay_ms)).await;
}
}
// Processing the result after all attempts
if block_exists {
self.batch.push(block);
} else if let Some(e) = last_error {
error!(target: "shares", "All attempts to check block {} failed with error: {:?}",
block_hash_hex, e);
// Block is not added to batch due to verification error
warn!(target: "shares", "Block {} will not be saved to DB due to verification failure", block_hash_hex);
// TODO: Add custom logic for handling failed verification (e.g., queue for later re-check or notify monitoring)
// Placeholder for future extensions
} else {
error!(target: "shares", "Block {} not found in blockchain after {} attempts",
block_hash_hex, max_retries);
// Block is not added to batch due to not being found in blockchain
warn!(target: "shares", "Block {} will not be saved to DB as it was not found in blockchain", block_hash_hex);
// TODO: Add custom logic for handling blocks not found in blockchain (e.g., queue for later re-check or alternative verification)
// Placeholder for future extensions
}
let should_flush = self.batch.len() >= SETTINGS.clickhouse.batch_size
|| self.last_flush.elapsed()
>= std::time::Duration::from_secs(SETTINGS.clickhouse.batch_flush_interval_secs);
|| self.last_flush.elapsed() >= std::time::Duration::from_secs(SETTINGS.clickhouse.batch_flush_interval_secs);
if should_flush {
self.flush().await?;
}
Expand All @@ -197,31 +268,27 @@ impl ShareStorage<BlockFound> for ClickhouseBlockStorage {
let batch_to_flush = self.batch.clone();
let batch_size = batch_to_flush.len();
info!(target: "shares", "Flushing batch of {} block records", batch_size);

let client = self.get_client().await?;
let mut batch_inserter = client
.insert("blocks")
.map_err(|e| ClickhouseError::BatchInsertError(e.to_string()))?;

for block in batch_to_flush.iter() {
let clickhouse_block = ClickhouseBlock::from(block.clone());
if let Err(e) = batch_inserter.write(&clickhouse_block).await {
error!("Failed to write block during flush: {}. Retrying later.", e);
return Err(ClickhouseError::BatchInsertError(e.to_string()));
}
}

if let Err(e) = batch_inserter.end().await {
error!(
"Failed to complete batch insert for blocks: {}. Retrying later.",
e
);
return Err(ClickhouseError::BatchInsertError(e.to_string()));
}

self.batch.clear();
self.last_flush = std::time::Instant::now();
info!(target: "shares", "Successfully flushed {} block records", batch_size);
Ok(())
}
}
}
1 change: 1 addition & 0 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions roles/env_example
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ SHARES_LOGGER__CLICKHOUSE__MAX_RETRY_DELAY_SECS=1
# Параметры каналов
SHARES_LOGGER__PROCESSING__PRIMARY_CHANNEL_BUFFER_SIZE=10000
SHARES_LOGGER__PROCESSING__BACKUP_CHECK_INTERVAL_SECS=1
# Проверка блока
SHARES_LOGGER__PROCESSING__BLOCK_VERIFICATION_MAX_RETRIES=5
SHARES_LOGGER__PROCESSING__BLOCK_VERIFICATION_RETRY_DELAY_MS=1000

# Bitcoin RPC connection settings
SHARES_LOGGER__BITCOIN_RPC__URL=http://127.0.0.1
SHARES_LOGGER__BITCOIN_RPC__PORT=28554
SHARES_LOGGER__BITCOIN_RPC__USER=user
SHARES_LOGGER__BITCOIN_RPC__PASSWORD=pass
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Information security] Пример файла окружения содержит захардкоженные учетные данные (user/pass), что провоцирует их использование в продакшене


###
TPROXY_CONFIG_UPSTREAM_ADDRESS=127.0.0.1
TPROXY_CONFIG_UPSTREAM_PORT=34254
TPROXY_CONFIG_UPSTREAM_AUTHORITY_PUBKEY=9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72
Expand Down
2 changes: 1 addition & 1 deletion roles/roles-utils/rpc/src/mini_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl MiniRpcClient {
}
}

async fn send_json_rpc_request(
pub async fn send_json_rpc_request(
&self,
method: &str,
params: serde_json::Value,
Expand Down