From a60b574b884aaad7cd6fb85d6707aa56e30daedc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E9=94=A6=E4=BD=91?= Date: Thu, 4 Jul 2024 10:28:07 +0800 Subject: [PATCH] optimize api. (#23) * optimize api. --- indexer/Cargo.toml | 6 ++- indexer/config.toml | 10 ++-- indexer/src/contract.rs | 114 +++++++++++++++++++++++++--------------- indexer/src/error.rs | 44 +++++++++------- indexer/src/main.rs | 5 ++ scanner/src/db.rs | 2 +- scanner/src/scanner.rs | 7 ++- 7 files changed, 119 insertions(+), 69 deletions(-) diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 41d4834..9ec1d8a 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -7,17 +7,19 @@ edition = "2021" [dependencies] anyhow = "1.0.80" -axum = "0.7.4" +axum = "0.7.5" axum-macros = "0.4.1" env_logger = "0.11.2" ethers = { version = "2.0", features = ["abigen", "legacy"] } log = "0.4.20" +redis = { version = "0.25.4", features = ["json", "tokio-comp"] } rustc-hex = "2.1.0" serde = "1.0.197" serde_json = "1.0.114" -sqlx = { version = "0.7.3", features = ["bigdecimal", "runtime-tokio", "postgres", "chrono", "json"]} +sqlx = { version = "0.7.4", features = ["bigdecimal", "runtime-tokio", "postgres", "chrono", "json"]} tokio = { version = "1.36.0", features = ["full"]} toml = "0.8.11" tower-http = { version = "0.5.2",features = ["cors"] } tracing-subscriber = "0.3.18" url = "2.5.0" +rand = "0.8.5" diff --git a/indexer/config.toml b/indexer/config.toml index fbcb9d1..d1923b3 100644 --- a/indexer/config.toml +++ b/indexer/config.toml @@ -1,5 +1,7 @@ -evm_rpc = "web3 endopoint" -staking = "staking contract address" -reward = "reward contrat address" +evm_rpc = "https://rpc-mainnet.findora.org/" +staking = "0x7a598dEf738a01D771fF92Be33064D5c5E0BC12C" +reward = "0xEDA79C4dA47E9b27820Ef244aa2af7a50657e443" listen = "0.0.0.0:3000" -db_url = "postgres://postgres:12345678@localhost/postgres" +db_url = "postgres://postgres:csq2400306@localhost/postgres" +redis_url = "redis://127.0.0.1/11" + diff --git a/indexer/src/contract.rs b/indexer/src/contract.rs index 019a923..6d9d4f8 100644 --- a/indexer/src/contract.rs +++ b/indexer/src/contract.rs @@ -8,12 +8,17 @@ use axum::extract::{Query, State}; use axum::Json; use ethers::types::H160; use ethers::utils::hex; +use rand::Rng; +use redis::{Commands, Connection, RedisResult, SetExpiry, SetOptions}; use serde::{Deserialize, Serialize}; use sqlx::types::BigDecimal; use sqlx::Row; use std::str::FromStr; use std::sync::Arc; +const KEY_BOUND_PREFIX: &str = "E:BND"; +const KEY_REWARD_PREFIX: &str = "E:RWD"; + #[derive(Serialize, Deserialize)] pub struct ValidatorStatusParams { pub address: String, @@ -38,7 +43,7 @@ pub async fn get_validator_status( }; Ok(Json(res)) } - Err(e) => Err(IndexerError::Custom(e.to_string())), + Err(e) => Err(IndexerError::IndexerCustom(e.to_string())), } } @@ -67,7 +72,7 @@ pub async fn get_validator_data( }; Ok(Json(res)) } - Err(e) => Err(IndexerError::Custom(e.to_string())), + Err(e) => Err(IndexerError::IndexerCustom(e.to_string())), } } @@ -81,16 +86,31 @@ pub async fn get_delegator_bound( State(state): State>, params: Query, ) -> Result> { - let staking = state.staking.clone(); let validator = H160::from_str(¶ms.validator)?; let delegator = H160::from_str(¶ms.delegator)?; - - match staking.delegators(validator, delegator).call().await { - Ok(info) => Ok(Json(BoundResponse { - bound_amount: info.0.to_string(), - unbound_amount: info.1.to_string(), - })), - Err(e) => return Err(IndexerError::Custom(e.to_string())), + let mut conn = state.redis.clone().get_connection()?; + let key = format!("{}:{:?}:{:?}", KEY_BOUND_PREFIX, delegator, validator); + let r: RedisResult = conn.get(&key); + match r { + Ok(data) => { + let resp: BoundResponse = serde_json::from_str(&data).unwrap(); + Ok(Json(resp)) + } + Err(_) => { + let staking = state.staking.clone(); + match staking.delegators(validator, delegator).call().await { + Ok((bound, unbound)) => { + let resp = BoundResponse { + bound_amount: bound.to_string(), + unbound_amount: unbound.to_string(), + }; + let data = serde_json::to_string(&resp).unwrap(); + set_to_redis(&mut conn, &key, &data)?; + Ok(Json(resp)) + } + Err(e) => return Err(IndexerError::IndexerCustom(e.to_string())), + } + } } } @@ -105,15 +125,39 @@ pub async fn get_delegator_reward( ) -> Result> { let reward = state.reward.clone(); let delegator = H160::from_str(¶ms.0.address)?; - - match reward.rewards(delegator).call().await { - Ok(amount) => Ok(Json(RewardResponse { - reward: amount.to_string(), - })), - Err(e) => return Err(IndexerError::Custom(e.to_string())), + let mut conn = state.redis.clone().get_connection()?; + let key = format!("{}:{:?}", KEY_REWARD_PREFIX, delegator); + let r: RedisResult = conn.get(&key); + match r { + Ok(data) => { + let resp: RewardResponse = serde_json::from_str(&data).unwrap(); + Ok(Json(resp)) + } + Err(_) => match reward.rewards(delegator).call().await { + Ok(amount) => { + let resp = RewardResponse { + reward: amount.to_string(), + }; + let data = serde_json::to_string(&resp).unwrap(); + set_to_redis(&mut conn, &key, &data)?; + Ok(Json(resp)) + } + Err(e) => return Err(IndexerError::IndexerCustom(e.to_string())), + }, } } +fn set_to_redis(conn: &mut Connection, key: &str, value: &str) -> Result<()> { + let mut rng = rand::thread_rng(); + let r = rng.gen_range(0..10); + let ms = 10 * 60 * 1000 + r * 1000; // 10 min + r min + let opts = SetOptions::default() + .get(true) + .with_expiration(SetExpiry::PX(ms)); + conn.set_options(&key, value, opts)?; + Ok(()) +} + #[derive(Serialize, Deserialize)] pub struct DelegatorDebtParams { pub validator: String, @@ -131,7 +175,7 @@ pub async fn get_delegator_debt( Ok(amount) => Ok(Json(DebtResponse { debt: amount.to_string(), })), - Err(e) => Err(IndexerError::Custom(e.to_string())), + Err(e) => Err(IndexerError::IndexerCustom(e.to_string())), } } @@ -145,31 +189,17 @@ pub async fn get_delegator_sum( params: Query, ) -> Result> { let mut pool = state.pool.acquire().await?; - - let sql_delegate = r#"SELECT sum(amount) as s FROM evm_delegations WHERE delegator=$1"#; - let sql_undelegate = r#"SELECT sum(amount) as s FROM evm_undelegations WHERE delegator=$1"#; - let sql_claim = r#"SELECT sum(amount) as s FROM evm_coinbase_mint WHERE delegator=$1"#; - - let sum_delegate: BigDecimal = sqlx::query(sql_delegate) - .bind(¶ms.0.address) - .fetch_one(&mut *pool) - .await? - .try_get("s") - .unwrap_or_default(); - - let sum_undelegate: BigDecimal = sqlx::query(sql_undelegate) - .bind(¶ms.0.address) - .fetch_one(&mut *pool) - .await? - .try_get("s") - .unwrap_or_default(); - - let sum_claim: BigDecimal = sqlx::query(sql_claim) - .bind(¶ms.0.address) - .fetch_one(&mut *pool) - .await? - .try_get("s") - .unwrap_or_default(); + let address = params.0.address; + let sql_query = format!( + "SELECT (SELECT sum(amount) FROM evm_delegations WHERE delegator='{}') as sd, \ + (SELECT sum(amount) FROM evm_undelegations WHERE delegator='{}') as sund, \ + (SELECT sum(amount) FROM evm_coinbase_mint WHERE delegator='{}') as sc", + address, address, address + ); + let row = sqlx::query(&sql_query).fetch_one(&mut *pool).await?; + let sum_delegate: BigDecimal = row.try_get("sd").unwrap_or_default(); + let sum_undelegate: BigDecimal = row.try_get("sund").unwrap_or_default(); + let sum_claim: BigDecimal = row.try_get("sc").unwrap_or_default(); Ok(Json(DelegatorSumResponse { delegate: sum_delegate.to_string(), diff --git a/indexer/src/error.rs b/indexer/src/error.rs index b31843b..ad3fcd8 100644 --- a/indexer/src/error.rs +++ b/indexer/src/error.rs @@ -3,47 +3,54 @@ use axum::response::{IntoResponse, Response}; #[derive(Debug)] pub enum IndexerError { - Custom(String), - DBError(sqlx::Error), - IOError(std::io::Error), - TomlDeError(toml::de::Error), - HexError(rustc_hex::FromHexError), - ParseUrlError(url::ParseError), + IndexerCustom(String), + IndexerDBError(sqlx::Error), + IndexerIOError(std::io::Error), + IndexerTomlDeError(toml::de::Error), + IndexerHexError(rustc_hex::FromHexError), + IndexerParseUrlError(url::ParseError), + IndexerRedisError(redis::RedisError), +} + +impl From for IndexerError { + fn from(e: redis::RedisError) -> Self { + IndexerError::IndexerRedisError(e) + } } impl From for IndexerError { fn from(e: String) -> Self { - IndexerError::Custom(e) + IndexerError::IndexerCustom(e) } } impl From for IndexerError { fn from(e: url::ParseError) -> Self { - IndexerError::ParseUrlError(e) + IndexerError::IndexerParseUrlError(e) } } impl From for IndexerError { fn from(e: rustc_hex::FromHexError) -> Self { - IndexerError::HexError(e) + IndexerError::IndexerHexError(e) } } impl From for IndexerError { fn from(e: std::io::Error) -> Self { - IndexerError::IOError(e) + IndexerError::IndexerIOError(e) } } impl From for IndexerError { fn from(e: toml::de::Error) -> Self { - IndexerError::TomlDeError(e) + IndexerError::IndexerTomlDeError(e) } } impl From for IndexerError { fn from(e: sqlx::Error) -> Self { - IndexerError::DBError(e) + IndexerError::IndexerDBError(e) } } @@ -52,12 +59,13 @@ pub type Result = core::result::Result; impl IntoResponse for IndexerError { fn into_response(self) -> Response { let err_msg = match self { - IndexerError::Custom(e) => e, - IndexerError::DBError(e) => e.to_string(), - IndexerError::IOError(e) => e.to_string(), - IndexerError::TomlDeError(e) => e.to_string(), - IndexerError::HexError(e) => e.to_string(), - IndexerError::ParseUrlError(e) => e.to_string(), + IndexerError::IndexerCustom(e) => e, + IndexerError::IndexerDBError(e) => e.to_string(), + IndexerError::IndexerIOError(e) => e.to_string(), + IndexerError::IndexerTomlDeError(e) => e.to_string(), + IndexerError::IndexerHexError(e) => e.to_string(), + IndexerError::IndexerParseUrlError(e) => e.to_string(), + IndexerError::IndexerRedisError(e) => e.to_string(), }; (StatusCode::INTERNAL_SERVER_ERROR, err_msg).into_response() diff --git a/indexer/src/main.rs b/indexer/src/main.rs index 84d9a07..e3c2311 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -48,7 +48,9 @@ struct IndexerConfig { pub reward: String, pub listen: String, pub db_url: String, + pub redis_url: String, } + impl IndexerConfig { pub fn new(file_path: &str) -> Result { let mut f = File::open(file_path)?; @@ -61,6 +63,7 @@ impl IndexerConfig { struct AppState { pub pool: PgPool, + pub redis: redis::Client, pub staking: StakingContract>, pub reward: RewardContract>, } @@ -87,8 +90,10 @@ async fn main() -> Result<()> { .expect("can't connect to database"); info!("Connecting db...ok"); + let redis_client = redis::Client::open(config.redis_url)?; let app_state = Arc::new(AppState { pool, + redis: redis_client, staking, reward, }); diff --git a/scanner/src/db.rs b/scanner/src/db.rs index 057808a..094e9cb 100644 --- a/scanner/src/db.rs +++ b/scanner/src/db.rs @@ -9,7 +9,7 @@ use sqlx::{PgPool, Row}; pub struct Storage { pool: PgPool, } - +#[allow(dead_code)] impl Storage { pub fn new(pool: PgPool) -> Self { Self { pool } diff --git a/scanner/src/scanner.rs b/scanner/src/scanner.rs index c57fd62..b384f05 100644 --- a/scanner/src/scanner.rs +++ b/scanner/src/scanner.rs @@ -6,7 +6,7 @@ use ethers::contract::{parse_log, EthEvent}; use ethers::prelude::{Middleware, TransactionReceipt}; use ethers::providers::{Http, Provider}; use ethers::types::U256; -use ethers::types::{Address, Block, Transaction}; +use ethers::types::{Address, Block}; use ethers::types::{Bytes, TxHash}; use ethers::utils::hex::encode_prefixed; use log::{debug, error, info}; @@ -41,6 +41,7 @@ const EVENT_UPDATE_VALIDATOR_TOPIC: &str = const EVENT_COINBASE_MINT_TOPIC: &str = "0xb2cf206b70e745484dd39dc6b8e6166ce07246bd00baa4bd059f15733b2130e9"; +#[allow(dead_code)] pub struct FindoraRPC { pub url: Url, pub client: Client, @@ -54,6 +55,7 @@ struct EthRpcRequest { pub params: T, } +#[allow(dead_code)] impl EthRpcRequest { pub fn body(method: &str, params: T) -> Self { EthRpcRequest { @@ -72,6 +74,7 @@ struct GetBlockByNumberResponse { pub result: Option, } +#[allow(dead_code)] impl FindoraRPC { pub fn new(timeout: Duration, url: Url) -> Self { let client = ClientBuilder::new().timeout(timeout).build().unwrap(); @@ -445,7 +448,7 @@ impl Scanner { Ok(succeed_cnt.load(Ordering::Acquire)) } - pub async fn run(&self, start: u64, interval: Duration, single: bool) -> Result<()> { + pub async fn run(&self, start: u64, _interval: Duration, single: bool) -> Result<()> { match single { true => { info!("Single syncing...");