Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions packages/txm/lib/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { bigIntReplacer, bigIntReviver, bigIntToZeroPadded, createUUID } from "@
import type { Address, Hex, UUID } from "@happy.tech/common"
import { context, trace } from "@opentelemetry/api"
import type { Insertable, Selectable } from "kysely"
import { type Result, err, ok } from "neverthrow"
import { type ContractFunctionArgs, type Hash, encodeFunctionData } from "viem"
import type { ABIManager } from "./AbiManager"
import type { LatestBlock } from "./BlockMonitor"
Expand Down Expand Up @@ -147,6 +148,18 @@ export class Transaction {
*/
readonly metadata: Record<string, unknown>

/**
* Stores callback functions for each transaction status.
* These callbacks are triggered when the transaction status changes.
*/
private callbacks: Record<TransactionStatus, ((transaction: Transaction) => void)[]>

/**
* Stores promises that wait for the transaction to be finalized.
* These promises are resolved when the transaction status changes to a finalized state.
*/
private finalizedPromiseResolvers: ((transaction: Result<Transaction, Error>) => void)[]

constructor(
config: TransactionConstructorConfig & {
intentId?: UUID
Expand Down Expand Up @@ -192,6 +205,13 @@ export class Transaction {
this.contractName = config.contractName
this.args = config.args
}

this.callbacks = {} as Record<TransactionStatus, ((transaction: Transaction) => void)[]>
Object.values(TransactionStatus).forEach((status) => {
this.callbacks[status] = []
})

this.finalizedPromiseResolvers = []
}

addAttempt(attempt: Attempt): void {
Expand Down Expand Up @@ -240,13 +260,48 @@ export class Transaction {

if (!NotFinalizedStatuses.includes(status)) {
TxmMetrics.getInstance().attemptsUntilFinalization.record(this.attempts.length)

this.finalizedPromiseResolvers.forEach((resolve) => {
resolve(ok(this))
})

this.finalizedPromiseResolvers = []
}

this.callbacks[status].forEach((callback) => {
try {
callback(this)
} catch (error) {
console.error(
`Error in callback for transaction ${this.intentId} when status changed to ${status}:`,
error,
)
}
})

eventBus.emit(Topics.TransactionStatusChanged, {
transaction: this,
})
}

on(status: TransactionStatus, callback: (transaction: Transaction) => void): void {
this.callbacks[status].push(callback)
}

waitForFinalization(timeoutMs?: number): Promise<Result<Transaction, Error>> {
return new Promise((resolve) => {
this.finalizedPromiseResolvers.push(resolve)

if (timeoutMs) {
setTimeout(() => {
this.finalizedPromiseResolvers = this.finalizedPromiseResolvers.filter((p) => p !== resolve)

resolve(err(new Error(`Transaction finalization timed out after ${timeoutMs}ms`)))
}, timeoutMs)
}
})
}

get attemptCount(): number {
return this.attempts.length
}
Expand Down
91 changes: 72 additions & 19 deletions packages/txm/test/txm.test.ts
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran stats on the repo the other day and this is the biggest file in it :D
Not important, but we can probably consider splitting it up sometime in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ async function getCurrentBlock(): Promise<Block> {
})
}

async function createCounterTransaction(deadline?: number): Promise<Transaction> {
return await txm.createTransaction({
function createCounterTransaction(deadline?: number): Transaction {
return txm.createTransaction({
address: deployment.HappyCounter,
functionName: "increment",
contractName: "HappyCounter",
Expand Down Expand Up @@ -195,7 +195,7 @@ test("NewBlock hook works correctly", async () => {
test("onTransactionStatusChanged hook works correctly", async () => {
let hookTriggered = false

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand Down Expand Up @@ -225,7 +225,7 @@ test("TransactionSubmissionFailed hook works correctly", async () => {

proxyServer.addBehavior(ProxyBehavior.Fail)

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand Down Expand Up @@ -269,7 +269,7 @@ test("TransactionSaveFailed hook works correctly", async () => {

methodSpy.mockResolvedValue(err(new Error("Test error")))

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand All @@ -288,7 +288,7 @@ test("TransactionSaveFailed hook works correctly", async () => {
test("Simple transaction executed", async () => {
const previousCount = await getCurrentCounterValue()

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand Down Expand Up @@ -329,7 +329,7 @@ test("A transaction created using calldata is executed correctly", async () => {
args: [],
})

const transaction = await txm.createTransaction({
const transaction = txm.createTransaction({
address: deployment.HappyCounter,
calldata,
})
Expand Down Expand Up @@ -364,7 +364,7 @@ test("A transaction created using calldata is executed correctly", async () => {
test("Transaction retried", async () => {
const previousCount = await getCurrentCounterValue()

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

proxyServer.addBehavior(ProxyBehavior.NotAnswer)

Expand Down Expand Up @@ -424,7 +424,7 @@ test("Transaction retried", async () => {
test("Transaction failed", async () => {
const previousCount = await getCurrentCounterValue()

const transaction = await txm.createTransaction({
const transaction = txm.createTransaction({
address: deployment.MockRevert,
functionName: "intentionalRevert",
contractName: "MockRevert",
Expand Down Expand Up @@ -477,7 +477,7 @@ test("Transaction failed", async () => {
test("Transaction failed for out of gas", async () => {
const previousCount = await getCurrentCounterValue()

const transaction = await txm.createTransaction({
const transaction = txm.createTransaction({
address: deployment.MockRevert,
functionName: "intentionalRevertDueToGasLimit",
contractName: "MockRevert",
Expand Down Expand Up @@ -521,6 +521,59 @@ test("Transaction failed for out of gas", async () => {
expect(transactionReverted.collectionBlock).toBe(previousBlock.number! + 1n)
})

test("Use transaction.waitForFinalization() to wait for a transaction to be finalized", async () => {
let promiseResolved = false
const transaction = createCounterTransaction()

transaction.waitForFinalization().then((result) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a timeout test to check that that functionality works as intended?

if (result.isErr()) {
throw result.error
}
promiseResolved = true
expect(result.value.status).toBe(TransactionStatus.Success)
})

transactionQueue.push(transaction)

await mineBlock(2)

expect(promiseResolved).toBe(true)
})

test("Use transaction.waitForFinalization() to wait for a transaction to be finalized with a timeout", async () => {
const transaction = createCounterTransaction()

const timeStart = Date.now()
const timeout = 1000
const result = await transaction.waitForFinalization(timeout)
const timeEnd = Date.now()

expect(result.isErr()).toBe(true)
expect(timeEnd - timeStart).toBeLessThan(timeout + 250)
})

test("Use transaction.on() to register a callback which is triggered when the transaction changes to a specific status", async () => {
let promiseResolved = false

const transaction = txm.createTransaction({
address: deployment.MockRevert,
functionName: "intentionalRevert",
contractName: "MockRevert",
args: [],
})

transaction.on(TransactionStatus.Failed, (transaction) => {
promiseResolved = true
expect(transaction.status).toBe(TransactionStatus.Failed)
})

transactionQueue.push(transaction)

await mineBlock(2)

expect(promiseResolved).toBe(true)
})

test("Transaction cancelled due to deadline passing", async () => {
const previousLivenessThreshold = txm.livenessThreshold
Object.defineProperty(txm, "livenessThreshold", {
Expand All @@ -532,7 +585,7 @@ test("Transaction cancelled due to deadline passing", async () => {

const deadline = Math.round(Date.now() / 1000 + 2)

const transaction = await createCounterTransaction(deadline)
const transaction = createCounterTransaction(deadline)

proxyServer.addBehavior(ProxyBehavior.NotAnswer)

Expand Down Expand Up @@ -611,7 +664,7 @@ test("Transaction cancelled due to deadline passing", async () => {

test("Execute a transaction with value", async () => {
const value = BigInt(1000)
const transactionToSend = await txm.createTransaction({
const transactionToSend = txm.createTransaction({
address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
calldata: "0x",
value,
Expand Down Expand Up @@ -671,7 +724,7 @@ test("Correctly calculates baseFeePerGas after a block with high gas usage", asy
hash: transactionBurnerExecuted.attempts[0].hash,
})

const incrementerTransaction = await createCounterTransaction()
const incrementerTransaction = createCounterTransaction()

transactionQueue.push(incrementerTransaction)

Expand Down Expand Up @@ -825,7 +878,7 @@ test("Transaction manager successfully processes transactions despite random RPC
const emittedTransactions: Transaction[] = []
const numTransactions = 5
for (let i = 0; i < numTransactions; i++) {
const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()
transactionQueue.push(transaction)
emittedTransactions.push(transaction)

Expand Down Expand Up @@ -870,7 +923,7 @@ test("Transaction succeeds in congested blocks", async () => {

await sendBurnGasTransactionWithSecondWallet(2)

const incrementerTransaction = await createCounterTransaction()
const incrementerTransaction = createCounterTransaction()

transactionQueue.push(incrementerTransaction)

Expand Down Expand Up @@ -925,7 +978,7 @@ test("Finalized transactions are automatically purged from db after finalizedTra
configurable: true,
})

const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand Down Expand Up @@ -965,7 +1018,7 @@ test("RPC liveness monitor works correctly", async () => {
proxyServer.setMode(ProxyMode.Deterministic)

while (!txm.rpcLivenessMonitor.isAlive) {
const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand All @@ -992,7 +1045,7 @@ test("RPC liveness monitor works correctly", async () => {
expect(txm.rpcLivenessMonitor.isAlive).toBe(true)

while (txm.rpcLivenessMonitor.isAlive) {
const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand All @@ -1005,7 +1058,7 @@ test("RPC liveness monitor works correctly", async () => {
proxyServer.setMode(ProxyMode.Deterministic)

while (!txm.rpcLivenessMonitor.isAlive) {
const transaction = await createCounterTransaction()
const transaction = createCounterTransaction()

transactionQueue.push(transaction)

Expand Down