Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 74 additions & 18 deletions src/data_sources/events/web3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,23 @@ export interface ContractCallInfo {
export class Web3Source {
private readonly _web3Wrapper: Web3Wrapper;
private readonly _web3: any;
private _failureCount: number = 0;
private _lastFailureTime: number = 0;
private readonly MAX_FAILURES = 5;
private readonly RESET_TIMEOUT = 5 * 60 * 1000; // 5 minute

constructor(provider: Web3ProviderEngine, wsProvider: string) {
this._web3Wrapper = new Web3Wrapper(provider);
const httpProvider = new Web3.providers.HttpProvider(wsProvider, {
timeout: 2 * 60 * 1000, // 2 minutes
timeout: 2 * 60 * 1000, // 2 minutes
keepAlive: true,
withCredentials: false,
headers: [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the contnent type from headers, we should allow for compression
Specifying json will make it so that plaintext is enforced

{
name: 'Content-Type',
value: 'application/json',
},
],
});
this._web3 = new Web3(httpProvider, wsProvider);

Expand Down Expand Up @@ -78,6 +91,26 @@ export class Web3Source {
});
}

private async _checkCircuitBreaker(): Promise<void> {
const now = Date.now();
if (this._failureCount >= this.MAX_FAILURES) {
if (now - this._lastFailureTime < this.RESET_TIMEOUT) {
throw new Error('Circuit breaker open - too many failures');
}
// Reset after timeout
this._failureCount = 0;
}
}

private _recordFailure(): void {
this._failureCount++;
this._lastFailureTime = Date.now();
}

private _recordSuccess(): void {
this._failureCount = 0;
}

public async getBatchBlockInfoForRangeAsync<B extends boolean>(
startBlock: number,
endBlock: number,
Expand All @@ -102,29 +135,52 @@ export class Web3Source {
blockNumbers: number[],
includeTransactions: boolean,
): Promise<(BlockWithoutTransactionData | BlockWithTransactionData)[]> {
await this._checkCircuitBreaker();
const batch = new this._web3.BatchRequest();
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1 second

const promises = blockNumbers.map((blockNumber) => {
return new Promise<BlockWithoutTransactionData | BlockWithTransactionData>((resolve, reject) => {
const req = this._web3.eth.getBlockByNumberN.request(
blockNumber,
includeTransactions,
(err: any, data: BlockWithTransactionData) => {
if (err) {
logger.error(`Blocks error: ${err}`);
reject(err);
} else resolve(data);
},
);
batch.add(req);
return new Promise<BlockWithoutTransactionData | BlockWithTransactionData>(async (resolve, reject) => {
let retryCount = 0;
const makeRequest = () => {
const req = this._web3.eth.getBlockByNumberN.request(
blockNumber,
includeTransactions,
(err: any, data: BlockWithTransactionData) => {
if (err) {
if (retryCount < MAX_RETRIES) {
retryCount++;
logger.warn(`Retry ${retryCount}/${MAX_RETRIES} for block ${blockNumber} due to error: ${err}`);
setTimeout(() => {
makeRequest();
}, RETRY_DELAY * retryCount);
} else {
this._recordFailure();
logger.error(`Blocks error after ${MAX_RETRIES} retries: ${err}`);
reject(err);
}
} else {
this._recordSuccess();
resolve(data);
}
},
);
batch.add(req);
};
makeRequest();
});
});

batch.execute();

const blocks = await Promise.all(promises);

return blocks;
try {
batch.execute();
const blocks = await Promise.all(promises);
return blocks;
} catch (err) {
this._recordFailure();
logger.error(`Batch execution failed: ${err}`);
throw err;
}
}

public async getBatchBlockReceiptsForRangeAsync(
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async function schedule(connection: Connection | null, producer: Producer | null
let wait: number;
if (duration > SECONDS_BETWEEN_RUNS * 1000) {
wait = 0;
logger.warn(`${funcName} is taking longer than desiered interval`);
logger.warn(`${funcName} is taking longer than desired interval`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lmao, I wonder how long that has been there

} else {
wait = SECONDS_BETWEEN_RUNS * 1000 - duration;
}
Expand Down
Loading