From e3a8937f2a970eb355f765acf2979454cba126c3 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Mon, 5 May 2025 15:01:43 +0200 Subject: [PATCH 1/4] feat(txm): added txm callbacks and promises when tx status change --- packages/txm/lib/Transaction.ts | 43 +++++++++++++++++++++++++++++++++ packages/txm/test/txm.test.ts | 38 +++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/packages/txm/lib/Transaction.ts b/packages/txm/lib/Transaction.ts index 56a33c34b8..70a166d0f5 100644 --- a/packages/txm/lib/Transaction.ts +++ b/packages/txm/lib/Transaction.ts @@ -147,6 +147,21 @@ export class Transaction { */ readonly metadata: Record + /** + * Stores callback functions for each transaction status. + * These callbacks are triggered when the transaction status changes. + */ + private callbacks: Record void)[]> + + /** + * Stores promises that wait for the transaction to be finalized. + * These promises are resolved when the transaction status changes to a finalized state. + */ + private waitingPromises: { + resolve: (transaction: Transaction) => void + reject: (error: Error) => void + }[] + constructor( config: TransactionConstructorConfig & { intentId?: UUID @@ -192,6 +207,16 @@ export class Transaction { this.contractName = config.contractName this.args = config.args } + + this.callbacks = Object.values(TransactionStatus).reduce( + (acc, status) => { + acc[status] = [] + return acc + }, + {} as Record void)[]>, + ) + + this.waitingPromises = [] } addAttempt(attempt: Attempt): void { @@ -240,13 +265,31 @@ export class Transaction { if (!NotFinalizedStatuses.includes(status)) { TxmMetrics.getInstance().attemptsUntilFinalization.record(this.attempts.length) + + this.waitingPromises.forEach(({ resolve }) => { + resolve(this) + }) + + this.waitingPromises = [] } + this.callbacks[status].forEach((callback) => callback(this)) + eventBus.emit(Topics.TransactionStatusChanged, { transaction: this, }) } + on(status: TransactionStatus, callback: (transaction: Transaction) => void): void { + this.callbacks[status].push(callback) + } + + waitForFinalization(): Promise { + return new Promise((resolve: (transaction: Transaction) => void, reject: (error: Error) => void) => { + this.waitingPromises.push({ resolve, reject }) + }) + } + get attemptCount(): number { return this.attempts.length } diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 26da388285..38b6da3baa 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -521,6 +521,44 @@ test("Transaction failed for out of gas", async () => { expect(transactionReverted.collectionBlock).toBe(previousBlock.number! + 1n) }) +test("Use transaction.waitForFinalization() to wait for a transaction to be finalized", async () => { + let promiseResolved = false + const transaction = await createCounterTransaction() + + transaction.waitForFinalization().then((transaction) => { + promiseResolved = true + expect(transaction.status).toBe(TransactionStatus.Success) + }) + + transactionQueue.push(transaction) + + await mineBlock(2) + + expect(promiseResolved).toBe(true) +}) + +test("Use transaction.on() to register a callback which is triggered when the transaction changes to a specific status", async () => { + let promiseResolved = false + + const transaction = await txm.createTransaction({ + address: deployment.MockRevert, + functionName: "intentionalRevert", + contractName: "MockRevert", + args: [], + }) + + transaction.on(TransactionStatus.Failed, (transaction) => { + promiseResolved = true + expect(transaction.status).toBe(TransactionStatus.Failed) + }) + + transactionQueue.push(transaction) + + await mineBlock(2) + + expect(promiseResolved).toBe(true) +}) + test("Transaction cancelled due to deadline passing", async () => { const previousLivenessThreshold = txm.livenessThreshold Object.defineProperty(txm, "livenessThreshold", { From a61e32b12430280662c4898c1a912f1b770abdec Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Mon, 5 May 2025 15:19:32 +0200 Subject: [PATCH 2/4] feat(txm): implemented timeout mechanism for waitForFinalization --- packages/txm/lib/Transaction.ts | 28 ++++++++++++++++++---------- packages/txm/test/txm.test.ts | 7 +++++-- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/packages/txm/lib/Transaction.ts b/packages/txm/lib/Transaction.ts index 70a166d0f5..018035781c 100644 --- a/packages/txm/lib/Transaction.ts +++ b/packages/txm/lib/Transaction.ts @@ -9,6 +9,7 @@ import { Topics, eventBus } from "./EventBus.js" import type { TransactionTable } from "./db/types.js" import { TxmMetrics } from "./telemetry/metrics" import { TraceMethod } from "./telemetry/traces" +import { err, ok, type Result } from "neverthrow" export enum TransactionStatus { /** @@ -157,10 +158,7 @@ export class Transaction { * Stores promises that wait for the transaction to be finalized. * These promises are resolved when the transaction status changes to a finalized state. */ - private waitingPromises: { - resolve: (transaction: Transaction) => void - reject: (error: Error) => void - }[] + private waitingPromises: ((transaction: Result) => void)[] constructor( config: TransactionConstructorConfig & { @@ -266,8 +264,8 @@ export class Transaction { if (!NotFinalizedStatuses.includes(status)) { TxmMetrics.getInstance().attemptsUntilFinalization.record(this.attempts.length) - this.waitingPromises.forEach(({ resolve }) => { - resolve(this) + this.waitingPromises.forEach((resolve) => { + resolve(ok(this)) }) this.waitingPromises = [] @@ -284,10 +282,20 @@ export class Transaction { this.callbacks[status].push(callback) } - waitForFinalization(): Promise { - return new Promise((resolve: (transaction: Transaction) => void, reject: (error: Error) => void) => { - this.waitingPromises.push({ resolve, reject }) - }) + waitForFinalization(timeout?: number): Promise> { + return new Promise((resolve) => { + this.waitingPromises.push(resolve); + + if (timeout) { + setTimeout(() => { + this.waitingPromises = this.waitingPromises.filter( + p => p !== resolve + ); + + resolve(err(new Error(`Transaction finalization timed out after ${timeout}ms`))) + }, timeout); + } + }); } get attemptCount(): number { diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 38b6da3baa..f1fe5a4d9f 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -525,9 +525,12 @@ test("Use transaction.waitForFinalization() to wait for a transaction to be fina let promiseResolved = false const transaction = await createCounterTransaction() - transaction.waitForFinalization().then((transaction) => { + transaction.waitForFinalization().then((result) => { + if (result.isErr()) { + throw result.error + } promiseResolved = true - expect(transaction.status).toBe(TransactionStatus.Success) + expect(result.value.status).toBe(TransactionStatus.Success) }) transactionQueue.push(transaction) From 40a224e4087087b69e21fb59f65f58c409d92111 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Mon, 5 May 2025 16:11:16 +0200 Subject: [PATCH 3/4] chore: format --- packages/txm/lib/Transaction.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/txm/lib/Transaction.ts b/packages/txm/lib/Transaction.ts index 018035781c..9d59edd961 100644 --- a/packages/txm/lib/Transaction.ts +++ b/packages/txm/lib/Transaction.ts @@ -3,13 +3,13 @@ import type { Address, Hex, UUID } from "@happy.tech/common" import { context, trace } from "@opentelemetry/api" import type { Insertable, Selectable } from "kysely" import { type ContractFunctionArgs, type Hash, encodeFunctionData } from "viem" +import { type Result, err, ok } from "neverthrow" import type { ABIManager } from "./AbiManager" import type { LatestBlock } from "./BlockMonitor" import { Topics, eventBus } from "./EventBus.js" import type { TransactionTable } from "./db/types.js" import { TxmMetrics } from "./telemetry/metrics" import { TraceMethod } from "./telemetry/traces" -import { err, ok, type Result } from "neverthrow" export enum TransactionStatus { /** @@ -284,18 +284,16 @@ export class Transaction { waitForFinalization(timeout?: number): Promise> { return new Promise((resolve) => { - this.waitingPromises.push(resolve); - + this.waitingPromises.push(resolve) + if (timeout) { setTimeout(() => { - this.waitingPromises = this.waitingPromises.filter( - p => p !== resolve - ); + this.waitingPromises = this.waitingPromises.filter((p) => p !== resolve) resolve(err(new Error(`Transaction finalization timed out after ${timeout}ms`))) - }, timeout); + }, timeout) } - }); + }) } get attemptCount(): number { From e5b67a84b7a8eae38f14ae1e38e9a75c7d717dd3 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Wed, 21 May 2025 11:11:54 +0200 Subject: [PATCH 4/4] chore(txm): pr review --- packages/txm/lib/Transaction.ts | 44 +++++++++++++++------------ packages/txm/test/txm.test.ts | 54 ++++++++++++++++++++------------- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/packages/txm/lib/Transaction.ts b/packages/txm/lib/Transaction.ts index 9d59edd961..7510e16316 100644 --- a/packages/txm/lib/Transaction.ts +++ b/packages/txm/lib/Transaction.ts @@ -2,8 +2,8 @@ import { bigIntReplacer, bigIntReviver, bigIntToZeroPadded, createUUID } from "@ import type { Address, Hex, UUID } from "@happy.tech/common" import { context, trace } from "@opentelemetry/api" import type { Insertable, Selectable } from "kysely" -import { type ContractFunctionArgs, type Hash, encodeFunctionData } from "viem" import { type Result, err, ok } from "neverthrow" +import { type ContractFunctionArgs, type Hash, encodeFunctionData } from "viem" import type { ABIManager } from "./AbiManager" import type { LatestBlock } from "./BlockMonitor" import { Topics, eventBus } from "./EventBus.js" @@ -158,7 +158,7 @@ export class Transaction { * Stores promises that wait for the transaction to be finalized. * These promises are resolved when the transaction status changes to a finalized state. */ - private waitingPromises: ((transaction: Result) => void)[] + private finalizedPromiseResolvers: ((transaction: Result) => void)[] constructor( config: TransactionConstructorConfig & { @@ -206,15 +206,12 @@ export class Transaction { this.args = config.args } - this.callbacks = Object.values(TransactionStatus).reduce( - (acc, status) => { - acc[status] = [] - return acc - }, - {} as Record void)[]>, - ) + this.callbacks = {} as Record void)[]> + Object.values(TransactionStatus).forEach((status) => { + this.callbacks[status] = [] + }) - this.waitingPromises = [] + this.finalizedPromiseResolvers = [] } addAttempt(attempt: Attempt): void { @@ -264,14 +261,23 @@ export class Transaction { if (!NotFinalizedStatuses.includes(status)) { TxmMetrics.getInstance().attemptsUntilFinalization.record(this.attempts.length) - this.waitingPromises.forEach((resolve) => { + this.finalizedPromiseResolvers.forEach((resolve) => { resolve(ok(this)) }) - this.waitingPromises = [] + this.finalizedPromiseResolvers = [] } - this.callbacks[status].forEach((callback) => callback(this)) + this.callbacks[status].forEach((callback) => { + try { + callback(this) + } catch (error) { + console.error( + `Error in callback for transaction ${this.intentId} when status changed to ${status}:`, + error, + ) + } + }) eventBus.emit(Topics.TransactionStatusChanged, { transaction: this, @@ -282,16 +288,16 @@ export class Transaction { this.callbacks[status].push(callback) } - waitForFinalization(timeout?: number): Promise> { + waitForFinalization(timeoutMs?: number): Promise> { return new Promise((resolve) => { - this.waitingPromises.push(resolve) + this.finalizedPromiseResolvers.push(resolve) - if (timeout) { + if (timeoutMs) { setTimeout(() => { - this.waitingPromises = this.waitingPromises.filter((p) => p !== resolve) + this.finalizedPromiseResolvers = this.finalizedPromiseResolvers.filter((p) => p !== resolve) - resolve(err(new Error(`Transaction finalization timed out after ${timeout}ms`))) - }, timeout) + resolve(err(new Error(`Transaction finalization timed out after ${timeoutMs}ms`))) + }, timeoutMs) } }) } diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index f1fe5a4d9f..450856cab9 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -109,8 +109,8 @@ async function getCurrentBlock(): Promise { }) } -async function createCounterTransaction(deadline?: number): Promise { - return await txm.createTransaction({ +function createCounterTransaction(deadline?: number): Transaction { + return txm.createTransaction({ address: deployment.HappyCounter, functionName: "increment", contractName: "HappyCounter", @@ -195,7 +195,7 @@ test("NewBlock hook works correctly", async () => { test("onTransactionStatusChanged hook works correctly", async () => { let hookTriggered = false - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -225,7 +225,7 @@ test("TransactionSubmissionFailed hook works correctly", async () => { proxyServer.addBehavior(ProxyBehavior.Fail) - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -269,7 +269,7 @@ test("TransactionSaveFailed hook works correctly", async () => { methodSpy.mockResolvedValue(err(new Error("Test error"))) - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -288,7 +288,7 @@ test("TransactionSaveFailed hook works correctly", async () => { test("Simple transaction executed", async () => { const previousCount = await getCurrentCounterValue() - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -329,7 +329,7 @@ test("A transaction created using calldata is executed correctly", async () => { args: [], }) - const transaction = await txm.createTransaction({ + const transaction = txm.createTransaction({ address: deployment.HappyCounter, calldata, }) @@ -364,7 +364,7 @@ test("A transaction created using calldata is executed correctly", async () => { test("Transaction retried", async () => { const previousCount = await getCurrentCounterValue() - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() proxyServer.addBehavior(ProxyBehavior.NotAnswer) @@ -424,7 +424,7 @@ test("Transaction retried", async () => { test("Transaction failed", async () => { const previousCount = await getCurrentCounterValue() - const transaction = await txm.createTransaction({ + const transaction = txm.createTransaction({ address: deployment.MockRevert, functionName: "intentionalRevert", contractName: "MockRevert", @@ -477,7 +477,7 @@ test("Transaction failed", async () => { test("Transaction failed for out of gas", async () => { const previousCount = await getCurrentCounterValue() - const transaction = await txm.createTransaction({ + const transaction = txm.createTransaction({ address: deployment.MockRevert, functionName: "intentionalRevertDueToGasLimit", contractName: "MockRevert", @@ -523,7 +523,7 @@ test("Transaction failed for out of gas", async () => { test("Use transaction.waitForFinalization() to wait for a transaction to be finalized", async () => { let promiseResolved = false - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transaction.waitForFinalization().then((result) => { if (result.isErr()) { @@ -540,10 +540,22 @@ test("Use transaction.waitForFinalization() to wait for a transaction to be fina expect(promiseResolved).toBe(true) }) +test("Use transaction.waitForFinalization() to wait for a transaction to be finalized with a timeout", async () => { + const transaction = createCounterTransaction() + + const timeStart = Date.now() + const timeout = 1000 + const result = await transaction.waitForFinalization(timeout) + const timeEnd = Date.now() + + expect(result.isErr()).toBe(true) + expect(timeEnd - timeStart).toBeLessThan(timeout + 250) +}) + test("Use transaction.on() to register a callback which is triggered when the transaction changes to a specific status", async () => { let promiseResolved = false - const transaction = await txm.createTransaction({ + const transaction = txm.createTransaction({ address: deployment.MockRevert, functionName: "intentionalRevert", contractName: "MockRevert", @@ -573,7 +585,7 @@ test("Transaction cancelled due to deadline passing", async () => { const deadline = Math.round(Date.now() / 1000 + 2) - const transaction = await createCounterTransaction(deadline) + const transaction = createCounterTransaction(deadline) proxyServer.addBehavior(ProxyBehavior.NotAnswer) @@ -652,7 +664,7 @@ test("Transaction cancelled due to deadline passing", async () => { test("Execute a transaction with value", async () => { const value = BigInt(1000) - const transactionToSend = await txm.createTransaction({ + const transactionToSend = txm.createTransaction({ address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", calldata: "0x", value, @@ -712,7 +724,7 @@ test("Correctly calculates baseFeePerGas after a block with high gas usage", asy hash: transactionBurnerExecuted.attempts[0].hash, }) - const incrementerTransaction = await createCounterTransaction() + const incrementerTransaction = createCounterTransaction() transactionQueue.push(incrementerTransaction) @@ -866,7 +878,7 @@ test("Transaction manager successfully processes transactions despite random RPC const emittedTransactions: Transaction[] = [] const numTransactions = 5 for (let i = 0; i < numTransactions; i++) { - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) emittedTransactions.push(transaction) @@ -911,7 +923,7 @@ test("Transaction succeeds in congested blocks", async () => { await sendBurnGasTransactionWithSecondWallet(2) - const incrementerTransaction = await createCounterTransaction() + const incrementerTransaction = createCounterTransaction() transactionQueue.push(incrementerTransaction) @@ -966,7 +978,7 @@ test("Finalized transactions are automatically purged from db after finalizedTra configurable: true, }) - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -1006,7 +1018,7 @@ test("RPC liveness monitor works correctly", async () => { proxyServer.setMode(ProxyMode.Deterministic) while (!txm.rpcLivenessMonitor.isAlive) { - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -1033,7 +1045,7 @@ test("RPC liveness monitor works correctly", async () => { expect(txm.rpcLivenessMonitor.isAlive).toBe(true) while (txm.rpcLivenessMonitor.isAlive) { - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction) @@ -1046,7 +1058,7 @@ test("RPC liveness monitor works correctly", async () => { proxyServer.setMode(ProxyMode.Deterministic) while (!txm.rpcLivenessMonitor.isAlive) { - const transaction = await createCounterTransaction() + const transaction = createCounterTransaction() transactionQueue.push(transaction)