From ae3815d1424cb7fe2aa53ef1a48241cde8ddc7cd Mon Sep 17 00:00:00 2001 From: Nicolas Alencar <55406040+nicoalencar@users.noreply.github.com> Date: Mon, 5 May 2025 18:57:52 -0300 Subject: [PATCH] Added retry logic on RPC requests --- src/data_sources/events/web3.ts | 92 ++++++++++++++++++++++++++------- src/index.ts | 2 +- 2 files changed, 75 insertions(+), 19 deletions(-) diff --git a/src/data_sources/events/web3.ts b/src/data_sources/events/web3.ts index f6b4ee3..c0a0583 100644 --- a/src/data_sources/events/web3.ts +++ b/src/data_sources/events/web3.ts @@ -31,10 +31,23 @@ export interface ContractCallInfo { export class Web3Source { private readonly _web3Wrapper: Web3Wrapper; private readonly _web3: any; + private _failureCount: number = 0; + private _lastFailureTime: number = 0; + private readonly MAX_FAILURES = 5; + private readonly RESET_TIMEOUT = 5 * 60 * 1000; // 5 minute + constructor(provider: Web3ProviderEngine, wsProvider: string) { this._web3Wrapper = new Web3Wrapper(provider); const httpProvider = new Web3.providers.HttpProvider(wsProvider, { - timeout: 2 * 60 * 1000, // 2 minutes + timeout: 2 * 60 * 1000, // 2 minutes + keepAlive: true, + withCredentials: false, + headers: [ + { + name: 'Content-Type', + value: 'application/json', + }, + ], }); this._web3 = new Web3(httpProvider, wsProvider); @@ -78,6 +91,26 @@ export class Web3Source { }); } + private async _checkCircuitBreaker(): Promise { + const now = Date.now(); + if (this._failureCount >= this.MAX_FAILURES) { + if (now - this._lastFailureTime < this.RESET_TIMEOUT) { + throw new Error('Circuit breaker open - too many failures'); + } + // Reset after timeout + this._failureCount = 0; + } + } + + private _recordFailure(): void { + this._failureCount++; + this._lastFailureTime = Date.now(); + } + + private _recordSuccess(): void { + this._failureCount = 0; + } + public async getBatchBlockInfoForRangeAsync( startBlock: number, endBlock: number, @@ -102,29 +135,52 @@ export class Web3Source { blockNumbers: number[], includeTransactions: boolean, ): Promise<(BlockWithoutTransactionData | BlockWithTransactionData)[]> { + await this._checkCircuitBreaker(); const batch = new this._web3.BatchRequest(); + const MAX_RETRIES = 3; + const RETRY_DELAY = 1000; // 1 second const promises = blockNumbers.map((blockNumber) => { - return new Promise((resolve, reject) => { - const req = this._web3.eth.getBlockByNumberN.request( - blockNumber, - includeTransactions, - (err: any, data: BlockWithTransactionData) => { - if (err) { - logger.error(`Blocks error: ${err}`); - reject(err); - } else resolve(data); - }, - ); - batch.add(req); + return new Promise(async (resolve, reject) => { + let retryCount = 0; + const makeRequest = () => { + const req = this._web3.eth.getBlockByNumberN.request( + blockNumber, + includeTransactions, + (err: any, data: BlockWithTransactionData) => { + if (err) { + if (retryCount < MAX_RETRIES) { + retryCount++; + logger.warn(`Retry ${retryCount}/${MAX_RETRIES} for block ${blockNumber} due to error: ${err}`); + setTimeout(() => { + makeRequest(); + }, RETRY_DELAY * retryCount); + } else { + this._recordFailure(); + logger.error(`Blocks error after ${MAX_RETRIES} retries: ${err}`); + reject(err); + } + } else { + this._recordSuccess(); + resolve(data); + } + }, + ); + batch.add(req); + }; + makeRequest(); }); }); - batch.execute(); - - const blocks = await Promise.all(promises); - - return blocks; + try { + batch.execute(); + const blocks = await Promise.all(promises); + return blocks; + } catch (err) { + this._recordFailure(); + logger.error(`Batch execution failed: ${err}`); + throw err; + } } public async getBatchBlockReceiptsForRangeAsync( diff --git a/src/index.ts b/src/index.ts index 61ac8a7..4ca3501 100644 --- a/src/index.ts +++ b/src/index.ts @@ -139,7 +139,7 @@ async function schedule(connection: Connection | null, producer: Producer | null let wait: number; if (duration > SECONDS_BETWEEN_RUNS * 1000) { wait = 0; - logger.warn(`${funcName} is taking longer than desiered interval`); + logger.warn(`${funcName} is taking longer than desired interval`); } else { wait = SECONDS_BETWEEN_RUNS * 1000 - duration; }