Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration_tests/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl TestSetup {
None,
)),
"11111111111111111111111111111111".to_string(),
120
);

let message_parser = MessageParser::new();
Expand Down
16 changes: 16 additions & 0 deletions nft_ingester/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@







## Tips

To set a global limit on request execution for PG DB, you can use the URL parameter **statement_timeout**.

Example:
`postgres://user:password@localhost/dbname?statement_timeout=2000`

To limit only API requests use config option **api_query_max_statement_timeout_sec**

24 changes: 18 additions & 6 deletions nft_ingester/src/api/api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use interface::processing_possibility::ProcessingPossibilityChecker;
use interface::proofs::ProofChecker;
use postgre_client::PgClient;
use std::{sync::Arc, time::Instant};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};

Expand Down Expand Up @@ -32,6 +33,7 @@ use metrics_utils::ApiMetricsConfig;
use rocks_db::Storage;
use serde_json::{json, Value};
use solana_sdk::pubkey::Pubkey;
use tokio::time::timeout;
use usecase::validation::{validate_opt_pubkey, validate_pubkey};

const MAX_ITEMS_IN_BATCH_REQ: usize = 1000;
Expand Down Expand Up @@ -60,6 +62,7 @@ where
storage_service_base_path: Option<String>,
token_price_fetcher: Arc<TPF>,
native_mint_pubkey: String,
max_query_statement_timeout_sec: u64,
}

pub fn not_found() -> DasApiError {
Expand Down Expand Up @@ -90,6 +93,7 @@ where
storage_service_base_path: Option<String>,
token_price_fetcher: Arc<TPF>,
native_mint_pubkey: String,
max_query_statement_timeout_sec: u64,
) -> Self {
DasApi {
pg_client,
Expand All @@ -105,6 +109,7 @@ where
storage_service_base_path,
token_price_fetcher,
native_mint_pubkey,
max_query_statement_timeout_sec,
}
}

Expand All @@ -116,7 +121,7 @@ where
self.pg_client
.check_health()
.await
.map_err(|_| DasApiError::InternalDdError)?;
.map_err(|e| DasApiError::InternalDdError(e.to_string()))?;

self.metrics
.set_latency(label, latency_timer.elapsed().as_millis() as f64);
Expand Down Expand Up @@ -714,7 +719,7 @@ where
Self::validate_basic_pagination(&pagination, self.max_page_limit)?;
Self::validate_options(&options, &query)?;

let res = search_assets(
let res = timeout(Duration::from_secs(self.max_query_statement_timeout_sec), search_assets(
pg_client,
rocks_db,
query,
Expand All @@ -735,9 +740,16 @@ where
self.metrics.clone(),
&self.tree_gaps_checker,
&self.native_mint_pubkey,
)
.await?;

Ok(res)
)).await;

match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => {
Err(DasApiError::InternalDdError(e.to_string()))
},
Err(_) => {
Err(DasApiError::QueryTimedOut)
}
}
}
}
4 changes: 3 additions & 1 deletion nft_ingester/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ pub enum DasApiError {
#[error("Page number is too big. Up to {0} pages are supported with this kind of pagination. Please use a different pagination(before/after/cursor).")]
PageTooBig(usize),
#[error("Internal DB error")]
InternalDdError,
InternalDdError(String),
#[error("CannotServiceRequest")]
CannotServiceRequest,
#[error("MissingOwnerAddress")]
MissingOwnerAddress,
#[error("QueryTimedOut")]
QueryTimedOut,
}

impl From<DasApiError> for jsonrpc_core::Error {
Expand Down
2 changes: 2 additions & 0 deletions nft_ingester/src/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub async fn start_api(
account_balance_getter: Arc<AccountBalanceGetterImpl>,
storage_service_base_url: Option<String>,
native_mint_pubkey: String,
api_query_max_statement_timeout_sec: u64,
) -> Result<(), DasApiError> {
let response_middleware = RpcResponseMiddleware {};
let request_middleware = RpcRequestMiddleware::new(archives_dir);
Expand Down Expand Up @@ -127,6 +128,7 @@ pub async fn start_api(
red_metrics,
)),
native_mint_pubkey,
api_query_max_statement_timeout_sec
);

run_api(
Expand Down
1 change: 1 addition & 0 deletions nft_ingester/src/bin/api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub async fn main() -> Result<(), IngesterError> {
account_balance_getter,
args.storage_service_base_url,
args.native_mint_pubkey,
args.api_query_max_statement_timeout_sec
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ pub async fn main() -> Result<(), IngesterError> {
account_balance_getter,
args.storage_service_base_url,
args.native_mint_pubkey,
args.api_max_query_statement_timeout_sec
)
.await
{
Expand Down
7 changes: 7 additions & 0 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub struct IngesterClapArgs {
requires = "rocks_backup_archives_dir"
)]
pub is_restore_rocks_db: bool,

#[clap(long, env, help = "Rocks backup url")]
pub rocks_backup_url: Option<String>,
#[clap(long, env, help = "Rocks backup archives dir")]
Expand Down Expand Up @@ -251,6 +252,9 @@ pub struct IngesterClapArgs {
#[clap(long, env, default_value = "500", help = "#grpc retry interval millis")]
pub rpc_retry_interval_millis: u64,

#[clap(long, env, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")]
pub api_max_query_statement_timeout_sec: u64,

#[clap(
long,
env = "INGESTER_METRICS_PORT",
Expand Down Expand Up @@ -430,6 +434,9 @@ pub struct ApiClapArgs {
#[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")]
pub heap_path: String,

#[clap(long, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")]
pub api_query_max_statement_timeout_sec: u64,

#[clap(
long,
env = "API_METRICS_PORT",
Expand Down
Loading
Loading