Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/randomness/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
},
"dependencies": {
"@happy.tech/common": "workspace:0.1.0",
"@happy.tech/txm": "workspace:0.1.0",
"@happy.tech/contracts": "workspace:0.1.0",
"@happy.tech/txm": "workspace:0.1.0",
"better-sqlite3": "^11.7.0",
"kysely": "^0.27.5",
"neverthrow": "^8.1.0",
Expand Down
80 changes: 80 additions & 0 deletions bun.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions packages/txm/lib/BlockMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { LogTag, Logger } from "@happy.tech/common"
import type { Block } from "viem"
import { Topics, eventBus } from "./EventBus.js"
import type { TransactionManager } from "./TransactionManager.js"
import { TxmMetrics } from "./telemetry/metrics"

/**
* A type alias for {@link Block} with the `blockTag` set to `"latest"`, ensuring type definitions correspond to the latest block.
*/
Expand Down Expand Up @@ -35,6 +37,10 @@ export class BlockMonitor {
private onNewBlock(block: LatestBlock) {
if (this.blockTimeout) clearTimeout(this.blockTimeout)
eventBus.emit(Topics.NewBlock, block)

TxmMetrics.getInstance().currentBlockGauge.record(Number(block.number))
TxmMetrics.getInstance().newBlockDelayHistogram.record(Date.now() - Number(block.timestamp) * 1000)

this.scheduleTimeout()
}

Expand All @@ -46,6 +52,7 @@ export class BlockMonitor {
}

private resetBlockSubscription() {
TxmMetrics.getInstance().resetBlockMonitorCounter.add(1)
if (this.unwatch) {
this.unwatch()
}
Expand Down
33 changes: 29 additions & 4 deletions packages/txm/lib/NonceManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { LogTag, Logger } from "@happy.tech/common"
import type { TransactionManager } from "./TransactionManager"
import { TxmMetrics } from "./telemetry/metrics"

/*
/*
* This class manages the nonce of the account that the transaction manager is using.
*
Expand Down Expand Up @@ -41,10 +44,19 @@ export class NonceManager {
public async start() {
const address = this.txmgr.viemWallet.account.address

const blockchainNonce = await this.txmgr.viemClient.getTransactionCount({
const blockchainNonceResult = await this.txmgr.viemClient.safeGetTransactionCount({
address: address,
})

if (blockchainNonceResult.isErr()) {
Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, {
error: blockchainNonceResult.error,
})
throw new Error("Failed to get transaction count for address")
}

const blockchainNonce = blockchainNonceResult.value

this.maxExecutedNonce = blockchainNonce

const highestDbNonce = this.txmgr.transactionRepository.getHighestNonce()
Expand All @@ -62,11 +74,14 @@ export class NonceManager {

public requestNonce(): number {
if (this.returnedNonceQueue.length > 0) {
return this.returnedNonceQueue.shift()!
const nonce = this.returnedNonceQueue.shift()!
TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length)
return nonce
}

const requestedNonce = this.nonce
this.nonce = this.nonce + 1
TxmMetrics.getInstance().nonceManagerGauge.record(this.nonce)
return requestedNonce
}

Expand All @@ -79,15 +94,25 @@ export class NonceManager {
} else {
this.returnedNonceQueue.splice(index, 0, nonce)
}

TxmMetrics.getInstance().returnedNonceCounter.add(1)
TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length)
}

public async resync() {
const address = this.txmgr.viemWallet.account.address

const blockchainNonce = await this.txmgr.viemClient.getTransactionCount({
const blockchainNonceResult = await this.txmgr.viemClient.safeGetTransactionCount({
address: address,
})

this.maxExecutedNonce = blockchainNonce
if (blockchainNonceResult.isErr()) {
Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, {
error: blockchainNonceResult.error,
})
return
}

this.maxExecutedNonce = blockchainNonceResult.value
}
}
10 changes: 10 additions & 0 deletions packages/txm/lib/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Address, ContractFunctionArgs, Hash } from "viem"
import type { LatestBlock } from "./BlockMonitor"
import { Topics, eventBus } from "./EventBus.js"
import type { TransactionTable } from "./db/types.js"
import { TxmMetrics } from "./telemetry/metrics"

export enum TransactionStatus {
/**
Expand Down Expand Up @@ -204,6 +205,15 @@ export class Transaction {
changeStatus(status: TransactionStatus): void {
this.status = status
this.markUpdated()

TxmMetrics.getInstance().transactionStatusChangeCounter.add(1, {
status: this.status,
})

if (!NotFinalizedStatuses.includes(status)) {
TxmMetrics.getInstance().attemptsUntilFinalization.record(this.attempts.length)
}

eventBus.emit(Topics.TransactionStatusChanged, {
transaction: this,
})
Expand Down
3 changes: 3 additions & 0 deletions packages/txm/lib/TransactionCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { LatestBlock } from "./BlockMonitor.js"
import { Topics, eventBus } from "./EventBus.js"
import { AttemptType, TransactionStatus } from "./Transaction.js"
import type { TransactionManager } from "./TransactionManager.js"
import { TxmMetrics } from "./telemetry/metrics"

/**
* This module is responsible for retrieving transactions from the originators when a new block is received.
Expand Down Expand Up @@ -41,6 +42,8 @@ export class TransactionCollector {
return
}

TxmMetrics.getInstance().transactionCollectedCounter.add(transactionsBatch.length)

await Promise.all(
transactionsBatch.map(async (transaction) => {
const nonce = this.txmgr.nonceManager.requestNonce()
Expand Down
46 changes: 45 additions & 1 deletion packages/txm/lib/TransactionManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { UUID } from "@happy.tech/common"
import type { MetricReader } from "@opentelemetry/sdk-metrics"
import type { Result } from "neverthrow"
import {
type Abi,
type Hex,
Expand All @@ -23,6 +25,8 @@ import { TransactionRepository } from "./TransactionRepository.js"
import { TransactionSubmitter } from "./TransactionSubmitter.js"
import { TxMonitor } from "./TxMonitor.js"
import { type EIP1559Parameters, opStackDefaultEIP1559Parameters } from "./eip1559.js"
import { initializeTelemetry } from "./telemetry/instrumentation"
import { TxmMetrics } from "./telemetry/metrics"
import { getUrlProtocol } from "./utils/getUrlProtocol"
import type { SafeViemPublicClient, SafeViemWalletClient } from "./utils/safeViemClients"
import { convertToSafeViemPublicClient, convertToSafeViemWalletClient } from "./utils/safeViemClients"
Expand Down Expand Up @@ -133,6 +137,30 @@ export type TransactionManagerConfig = {
* Default: {@link DefaultRetryPolicyManager}
*/
retryPolicyManager?: RetryPolicyManager

/**
* Transaction Manager metrics configuration.
*/
metrics?: {
/**
* Whether to enable metrics collection.
* Defaults to true.
*/
active?: boolean
/**
* Port number for the default Prometheus metrics endpoint.
* The default metric reader is a Prometheus reader that exposes metrics via this endpoint.
* This setting is only used when custom metricReaders are not provided.
* Defaults to 9090.
*/
port?: number
/**
* Custom metric readers to use instead of the default Prometheus reader.
* If provided, these readers will be used and the port setting will be ignored.
* If not provided, a default Prometheus reader will be configured using the specified port.
*/
metricReaders?: MetricReader[]
}
}

export type TransactionOriginator = (block: LatestBlock) => Promise<Transaction[]>
Expand Down Expand Up @@ -172,6 +200,12 @@ export class TransactionManager {
public readonly blockInactivityTimeout: number

constructor(_config: TransactionManagerConfig) {
initializeTelemetry({
active: _config.metrics?.active ?? true,
port: _config.metrics?.port ?? 9090,
metricReaders: _config.metrics?.metricReaders,
})

this.collectors = []

const protocol = getUrlProtocol(_config.rpc.url)
Expand Down Expand Up @@ -230,13 +264,23 @@ export class TransactionManager {
transport,
chain,
}),
{
rpcCounter: TxmMetrics.getInstance().rpcCounter,
rpcErrorCounter: TxmMetrics.getInstance().rpcErrorCounter,
rpcResponseTimeHistogram: TxmMetrics.getInstance().blockchainRpcResponseTimeHistogram,
},
)

this.viemClient = convertToSafeViemPublicClient(
createPublicClient({
transport,
chain,
}),
{
rpcCounter: TxmMetrics.getInstance().rpcCounter,
rpcErrorCounter: TxmMetrics.getInstance().rpcErrorCounter,
rpcResponseTimeHistogram: TxmMetrics.getInstance().blockchainRpcResponseTimeHistogram,
},
)

this.nonceManager = new NonceManager(this)
Expand Down Expand Up @@ -284,7 +328,7 @@ export class TransactionManager {
return this.hookManager.addHook(type, handler)
}

public async getTransaction(txIntentId: UUID): Promise<Transaction | undefined> {
public async getTransaction(txIntentId: UUID): Promise<Result<Transaction | undefined, Error>> {
return this.transactionRepository.getTransaction(txIntentId)
}

Expand Down
89 changes: 73 additions & 16 deletions packages/txm/lib/TransactionRepository.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { unknownToError } from "@happy.tech/common"
import type { UUID } from "@happy.tech/common"
import { type Result, ResultAsync } from "neverthrow"
import { type Result, ResultAsync, err, ok } from "neverthrow"
import { Topics, eventBus } from "./EventBus.js"
import { NotFinalizedStatuses, Transaction } from "./Transaction.js"
import type { TransactionManager } from "./TransactionManager.js"
import { db } from "./db/driver.js"
import { TxmMetrics } from "./telemetry/metrics"

/**
* This module acts as intermediate layer between the library and the database.
Expand Down Expand Up @@ -41,28 +42,54 @@ export class TransactionRepository {
return this.notFinalizedTransactions.filter((t) => t.collectionBlock && t.collectionBlock < blockNumber)
}

async getTransaction(intentId: UUID): Promise<Transaction | undefined> {
async getTransaction(intentId: UUID): Promise<Result<Transaction | undefined, Error>> {
const cachedTransaction = this.notFinalizedTransactions.find((t) => t.intentId === intentId)

if (cachedTransaction) {
return cachedTransaction
return ok(cachedTransaction)
}

const persistedTransaction = await db
.selectFrom("transaction")
.where("intentId", "=", intentId)
.where("from", "=", this.transactionManager.viemWallet.account.address)
.selectAll()
.executeTakeFirst()
TxmMetrics.getInstance().databaseOperationsCounter.add(1, {
operation: "getTransaction",
})
const start = Date.now()

const persistedTransactionResult = await ResultAsync.fromPromise(
db
.selectFrom("transaction")
.where("intentId", "=", intentId)
.where("from", "=", this.transactionManager.viemWallet.account.address)
.selectAll()
.executeTakeFirst(),
unknownToError,
)

TxmMetrics.getInstance().databaseOperationDurationHistogram.record(Date.now() - start, {
operation: "getTransaction",
})

return persistedTransaction ? Transaction.fromDbRow(persistedTransaction) : undefined
if (persistedTransactionResult.isErr()) {
TxmMetrics.getInstance().databaseErrorsCounter.add(1, {
operation: "getTransaction",
})
return err(persistedTransactionResult.error)
}

const persistedTransaction = persistedTransactionResult.value

return persistedTransaction ? ok(Transaction.fromDbRow(persistedTransaction)) : ok(undefined)
}

async saveTransactions(transactions: Transaction[]): Promise<Result<void, Error>> {
const transactionsToFlush = transactions.filter((t) => t.pendingFlush)

const notPersistedTransactions = transactions.filter((t) => t.notPersisted)

TxmMetrics.getInstance().databaseOperationsCounter.add(1, {
operation: "saveTransactions",
})
const start = Date.now()

const result = await ResultAsync.fromPromise(
db.transaction().execute(async (dbTransaction) => {
const promises = transactionsToFlush.map((t) => {
Expand All @@ -81,12 +108,22 @@ export class TransactionRepository {
unknownToError,
)

TxmMetrics.getInstance().databaseOperationDurationHistogram.record(Date.now() - start, {
operation: "saveTransactions",
})

if (result.isOk()) {
this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) =>
NotFinalizedStatuses.includes(transaction.status),
)
this.notFinalizedTransactions.push(...notPersistedTransactions)
transactions.forEach((t) => t.markFlushed())

TxmMetrics.getInstance().notFinalizedTransactionsGauge.record(this.notFinalizedTransactions.length)
} else {
TxmMetrics.getInstance().databaseErrorsCounter.add(1, {
operation: "saveTransactions",
})
}

return result
Expand All @@ -105,11 +142,31 @@ export class TransactionRepository {
}

async purgeFinalizedTransactions() {
await db
.deleteFrom("transaction")
.where("status", "not in", NotFinalizedStatuses)
.where("updatedAt", "<", Date.now() - this.transactionManager.finalizedTransactionPurgeTime)
.where("from", "=", this.transactionManager.viemWallet.account.address)
.execute()
TxmMetrics.getInstance().databaseOperationsCounter.add(1, {
operation: "purgeFinalizedTransactions",
})
const start = Date.now()

const result = await ResultAsync.fromPromise(
db
.deleteFrom("transaction")
.where("status", "not in", NotFinalizedStatuses)
.where("updatedAt", "<", Date.now() - this.transactionManager.finalizedTransactionPurgeTime)
.where("from", "=", this.transactionManager.viemWallet.account.address)
.execute(),
unknownToError,
)

TxmMetrics.getInstance().databaseOperationDurationHistogram.record(Date.now() - start, {
operation: "purgeFinalizedTransactions",
})

if (result.isErr()) {
TxmMetrics.getInstance().databaseErrorsCounter.add(1, {
operation: "purgeFinalizedTransactions",
})
}

return result
}
}
Loading
Loading