diff --git a/Cargo.lock b/Cargo.lock index 01b91880..0d0f609b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,13 +1290,16 @@ dependencies = [ "futures", "interface", "plerkle_serialization", + "serde_json", "solana-client", "solana-program", "solana-sdk", "solana-transaction-status", "tokio", + "tokio-retry", "tracing", "usecase", + "warp", ] [[package]] @@ -4700,6 +4703,24 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multer" version = "3.1.0" @@ -4800,7 +4821,7 @@ dependencies = [ "moka", "mpl-bubblegum", "mpl-token-metadata", - "multer", + "multer 3.1.0", "num-bigint 0.4.5", "num-traits", "plerkle_messenger", @@ -6777,6 +6798,12 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -7894,8 +7921,8 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", - "tungstenite", + "tokio-tungstenite 0.20.1", + "tungstenite 0.20.1", "url", ] @@ -9735,10 +9762,22 @@ dependencies = [ "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", - "tungstenite", + "tungstenite 0.20.1", "webpki-roots 0.25.4", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -10160,6 +10199,25 @@ dependencies = [ "webpki-roots 0.24.0", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1 0.10.6", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typed-builder" version = "0.5.1" @@ -10435,6 +10493,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper", + "log", + "mime", + "mime_guess", + "multer 2.1.0", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util 0.7.11", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/backfill_rpc/Cargo.toml b/backfill_rpc/Cargo.toml index 0fdcc7b6..225aa998 100644 --- a/backfill_rpc/Cargo.toml +++ b/backfill_rpc/Cargo.toml @@ -19,6 +19,11 @@ plerkle_serialization = { workspace = true } flatbuffers = { workspace = true } tracing = { workspace = true } usecase = { path = "../usecase" } +tokio-retry = { workspace = true } [features] rpc_tests = [] + +[dev-dependencies] +serde_json = { workspace = true } +warp = "0.3.7" diff --git a/backfill_rpc/src/block_producer.rs b/backfill_rpc/src/block_producer.rs index 5799d2af..20f6fe03 100644 --- a/backfill_rpc/src/block_producer.rs +++ b/backfill_rpc/src/block_producer.rs @@ -8,7 +8,7 @@ use solana_transaction_status::{TransactionDetails, UiConfirmedBlock}; use tracing::error; use usecase::bigtable::is_bubblegum_transaction_encoded; -use crate::rpc::{BackfillRPC, GET_TX_RETRIES}; +use crate::rpc::{BackfillRPC, MAX_RPC_RETRIES}; const SECONDS_TO_RETRY_GET_BLOCK: u64 = 5; @@ -19,7 +19,7 @@ impl BlockProducer for BackfillRPC { slot: u64, _backup_provider: Option>, ) -> Result { - let mut counter = GET_TX_RETRIES; + let mut counter = MAX_RPC_RETRIES; loop { let mut encoded_block = match self diff --git a/backfill_rpc/src/rpc.rs b/backfill_rpc/src/rpc.rs index c59d3bdf..b209ca93 100644 --- a/backfill_rpc/src/rpc.rs +++ b/backfill_rpc/src/rpc.rs @@ -1,4 +1,13 @@ -use std::{str::FromStr, sync::Arc, time::Duration}; +use std::{ + future::Future, + ops::Deref, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use async_trait::async_trait; use entities::models::{BufferedTransaction, SignatureWithSlot}; @@ -8,9 +17,14 @@ use interface::{ error::UsecaseError, slot_getter::FinalizedSlotGetter, solana_rpc::TransactionsGetter, }; use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; +#[cfg(test)] +use solana_client::rpc_sender::RpcSender; use solana_client::{ - nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, + client_error::{ClientError, ClientErrorKind}, + nonblocking::rpc_client::RpcClient, + rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, + rpc_request::RpcError, }; use solana_program::pubkey::Pubkey; use solana_sdk::{ @@ -18,16 +32,65 @@ use solana_sdk::{ signature::Signature, }; use solana_transaction_status::UiTransactionEncoding; +use tokio::sync::Notify; +use tokio_retry::{strategy::ExponentialBackoff, RetryIf}; const MAX_SIGNATURES_LIMIT: usize = 50_000_000; -pub(crate) const GET_TX_RETRIES: usize = 7; +const INITIAL_BACKOFF_DELAY_MS: u64 = 1_000; +pub(crate) const MAX_RPC_RETRIES: usize = 7; pub struct BackfillRPC { - pub(crate) client: Arc, + pub(crate) client: BackoffRpcClient, } impl BackfillRPC { pub fn connect(addr: String) -> Self { - Self { client: Arc::new(RpcClient::new(addr)) } + let client = RpcClient::new(addr); + Self { client: BackoffRpcClient::new_with_default_backoff(client) } + } + + pub(crate) async fn get_signatures_by_address( + &self, + until: Option, + before: Option, + address: &Pubkey, + ) -> Result, UsecaseError> { + let signatures = self + .client + .execute_with_backoff(|| async { + self.client + .get_signatures_for_address_with_config( + address, + GetConfirmedSignaturesForAddress2Config { + until, + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }), + before, + ..Default::default() + }, + ) + .await + }) + .await; + signatures + .map_err(Into::::into)? + .into_iter() + .map(|response| { + Ok(SignatureWithSlot { + signature: Signature::from_str(&response.signature)?, + slot: response.slot, + }) + }) + .collect::, UsecaseError>>() + } + + #[cfg(test)] + pub(crate) fn new_with_sender_and_constant_backoff( + sender: impl RpcSender + Send + Sync + 'static, + ) -> Self { + let client = RpcClient::new_sender(sender, Default::default()); + + Self { client: BackoffRpcClient::new_test(client) } } } @@ -75,7 +138,7 @@ impl TransactionsGetter for BackfillRPC { let client = self.client.clone(); async move { let mut response = Ok(BufferedTransaction::default()); - for _ in 0..GET_TX_RETRIES { + for _ in 0..MAX_RPC_RETRIES { response = client .get_transaction_with_config( &signature, @@ -123,43 +186,20 @@ impl TransactionsGetter for BackfillRPC { } } -impl BackfillRPC { - pub(crate) async fn get_signatures_by_address( - &self, - until: Option, - before: Option, - address: &Pubkey, - ) -> Result, UsecaseError> { - self.client - .get_signatures_for_address_with_config( - address, - GetConfirmedSignaturesForAddress2Config { - until, - commitment: Some(CommitmentConfig { commitment: CommitmentLevel::Finalized }), - before, - ..Default::default() - }, - ) - .await - .map_err(Into::::into)? - .into_iter() - .map(|response| { - Ok(SignatureWithSlot { - signature: Signature::from_str(&response.signature)?, - slot: response.slot, - }) - }) - .collect::, UsecaseError>>() - } -} - #[async_trait] impl FinalizedSlotGetter for BackfillRPC { async fn get_finalized_slot(&self) -> Result { - Ok(self + let slot = self .client - .get_slot_with_commitment(CommitmentConfig { commitment: CommitmentLevel::Finalized }) - .await?) + .execute_with_backoff(|| async move { + self.client + .get_slot_with_commitment(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }) + .await + }) + .await?; + Ok(slot) } async fn get_finalized_slot_no_error(&self) -> u64 { match self.get_finalized_slot().await { @@ -173,53 +213,192 @@ impl FinalizedSlotGetter for BackfillRPC { } } -#[cfg(feature = "rpc_tests")] -#[tokio::test] -async fn test_rpc_get_signatures_by_address() { - let client = BackfillRPC::connect("https://api.mainnet-beta.solana.com".to_string()); - let signatures = client - .get_signatures_by_address( - Some(Signature::default()), - None, - &Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap(), - ) - .await - .unwrap(); - - assert_eq!(signatures.len(), 1000) +#[derive(Clone)] +pub struct BackoffRpcClient { + client: Arc, + backoff: ExponentialBackoff, + is_in_backoff: Arc, + backoff_finish_notify: Arc, +} + +impl BackoffRpcClient { + pub fn new_with_default_backoff(client: RpcClient) -> Self { + // 2 seconds base delay + let backoff = ExponentialBackoff::from_millis(2 * INITIAL_BACKOFF_DELAY_MS / 1000) + .factor(1000) + .max_delay(Duration::from_secs(30)); + + Self { + client: Arc::new(client), + backoff, + is_in_backoff: Arc::new(AtomicBool::new(false)), + backoff_finish_notify: Arc::new(Notify::new()), + } + } + + #[cfg(test)] + pub(crate) fn new_test(client: RpcClient) -> Self { + // constant 1 second interval + let backoff = ExponentialBackoff::from_millis(1).factor(1000); + + Self { + client: Arc::new(client), + backoff, + is_in_backoff: Arc::new(AtomicBool::new(false)), + backoff_finish_notify: Arc::new(Notify::new()), + } + } + + pub async fn execute_with_backoff< + R, + T: Future>, + F: FnMut() -> T, + >( + &self, + call: F, + ) -> Result { + if self.is_in_backoff.load(Ordering::Relaxed) { + self.backoff_finish_notify.notified().await; + } + let result = + RetryIf::spawn(self.backoff.clone().take(MAX_RPC_RETRIES), call, |e: &ClientError| { + let kind = e.kind(); + match kind { + ClientErrorKind::RpcError(RpcError::RpcRequestError(s)) => { + let retry = s.contains("429 Too Many Requests"); + if retry { + self.is_in_backoff.store(true, Ordering::Relaxed); + } + retry + }, + _ => false, + } + }) + .await; + self.backoff_finish_notify.notify_one(); + result + } +} + +impl Deref for BackoffRpcClient { + type Target = RpcClient; + + fn deref(&self) -> &Self::Target { + &self.client + } } -#[cfg(feature = "rpc_tests")] -#[tokio::test] -async fn test_rpc_get_txs_by_signatures() { - let client = BackfillRPC::connect("https://docs-demo.solana-mainnet.quiknode.pro/".to_string()); - let signatures = vec![ +#[cfg(all(test, feature = "rpc_tests"))] +mod tests { + use solana_client::{ + client_error::reqwest, + rpc_request::RpcRequest, + rpc_sender::{RpcSender, RpcTransportStats}, + }; + use warp::Filter; + + use super::*; + + struct TooManyRequestsRpcSender(String); + + #[async_trait] + impl RpcSender for TooManyRequestsRpcSender { + async fn send( + &self, + request: RpcRequest, + _params: serde_json::Value, + ) -> Result { + let err = reqwest::get(format!("http://{}/", self.0)) + .await + .expect("request to test server to succeed"); + Err(ClientError::new_with_request( + ClientErrorKind::Reqwest( + err.error_for_status().expect_err("429 server must indeed return 429"), + ), + request, + )) + } + fn get_transport_stats(&self) -> RpcTransportStats { + Default::default() + } + fn url(&self) -> String { + Default::default() + } + } + + #[tokio::test] + async fn test_rpc_get_signatures_by_address() { + let client = BackfillRPC::connect("https://api.mainnet-beta.solana.com".to_string()); + let signatures = client + .get_signatures_by_address( + Some(Signature::default()), + None, + &Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap(), + ) + .await + .unwrap(); + + assert_eq!(signatures.len(), 1000) + } + + #[tokio::test] + async fn test_backoff_client_failure() { + // Create a route that matches requests to "/" (the root path). + let route = warp::path::end().map(|| { + warp::reply::with_status( + warp::reply::json(&serde_json::Value::Null), + warp::http::StatusCode::TOO_MANY_REQUESTS, + ) + }); + + let (addr, server) = warp::serve(route).bind_ephemeral(([127, 0, 0, 1], 0)); + let _server_handle = tokio::spawn(server); + let client = BackfillRPC::new_with_sender_and_constant_backoff(TooManyRequestsRpcSender( + addr.to_string(), + )); + let err = client + .get_signatures_by_address( + Some(Signature::default()), + None, + &Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap(), + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("429")); + } + + #[tokio::test] + async fn test_rpc_get_txs_by_signatures() { + let client = + BackfillRPC::connect("https://docs-demo.solana-mainnet.quiknode.pro/".to_string()); + let signatures = vec![ Signature::from_str("2H4c1LcgWG2VuxE4rb318spyiMe1Aet5AysQHAB3Pm3z9nadxJH4C1GZD8yMeAgjdzojmLZGQppuiZqG2oKrtwF2").unwrap(), Signature::from_str("265JP2HS6DwJPS4Htk2msUbxbrdeHLFVXUTFVSZ7CyMrHM8xXTxZJpLpt67kKHPAUVtEj7i3fWb5Z9vqMHREHmVm").unwrap(), ]; - let txs = client.get_txs_by_signatures(signatures, 0).await.unwrap(); + let txs = client.get_txs_by_signatures(signatures, 0).await.unwrap(); - let parsed_txs = txs - .iter() - .map(|tx| { - plerkle_serialization::root_as_transaction_info(tx.transaction.as_slice()).unwrap() - }) - .collect::>(); + let parsed_txs = txs + .iter() + .map(|tx| { + plerkle_serialization::root_as_transaction_info(tx.transaction.as_slice()).unwrap() + }) + .collect::>(); - assert_eq!(parsed_txs.len(), 2); - assert_eq!(parsed_txs[0].signature(), Some("2H4c1LcgWG2VuxE4rb318spyiMe1Aet5AysQHAB3Pm3z9nadxJH4C1GZD8yMeAgjdzojmLZGQppuiZqG2oKrtwF2")); - assert_eq!(parsed_txs[1].slot(), 240869063) -} + assert_eq!(parsed_txs.len(), 2); + assert_eq!(parsed_txs[0].signature(), Some("2H4c1LcgWG2VuxE4rb318spyiMe1Aet5AysQHAB3Pm3z9nadxJH4C1GZD8yMeAgjdzojmLZGQppuiZqG2oKrtwF2")); + assert_eq!(parsed_txs[1].slot(), 240869063) + } -#[cfg(feature = "rpc_tests")] -#[tokio::test] -#[should_panic] -async fn test_rpc_get_txs_by_signatures_error() { - let client = BackfillRPC::connect("https://docs-demo.solana-mainnet.quiknode.pro/".to_string()); - let signatures = vec![ + #[tokio::test] + #[should_panic] + async fn test_rpc_get_txs_by_signatures_error() { + let client = + BackfillRPC::connect("https://docs-demo.solana-mainnet.quiknode.pro/".to_string()); + let signatures = vec![ Signature::from_str("2H4c1LcgWG2VuxE4rb318spyiMe1Aet5AysQHAB3Pm3z9nadxJH4C1GZD8yMeAgjdzojmLZGQppuiZqG2oKrtwF3").unwrap(), // transaction that does not exists ]; - client.get_txs_by_signatures(signatures, 0).await.unwrap(); + client.get_txs_by_signatures(signatures, 0).await.unwrap(); + } }