diff --git a/apps/randomness/package.json b/apps/randomness/package.json index 235fb4c725..597373fd7c 100644 --- a/apps/randomness/package.json +++ b/apps/randomness/package.json @@ -13,6 +13,8 @@ "@happy.tech/common": "workspace:0.1.0", "@happy.tech/contracts": "workspace:0.1.0", "@happy.tech/txm": "workspace:0.1.0", + "@opentelemetry/sdk-trace-node": "^1.30.1", + "@opentelemetry/api": "^1.9.0", "better-sqlite3": "^11.7.0", "kysely": "^0.27.5", "neverthrow": "^8.1.0", diff --git a/apps/randomness/src/env.ts b/apps/randomness/src/env.ts index 29dd9ae807..9246770552 100644 --- a/apps/randomness/src/env.ts +++ b/apps/randomness/src/env.ts @@ -33,6 +33,7 @@ const envSchema = z.object({ EVM_DRAND_GENESIS_TIMESTAMP_SECONDS: z.string().transform((s) => BigInt(s)), EVM_DRAND_PERIOD_SECONDS: z.string().transform((s) => BigInt(s)), EVM_DRAND_MARGIN: z.string().transform((s) => BigInt(s)), + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().trim().optional(), }) const parsedEnv = envSchema.safeParse(process.env) diff --git a/apps/randomness/src/index.ts b/apps/randomness/src/index.ts index 8976cdc1bd..1b4b30f044 100644 --- a/apps/randomness/src/index.ts +++ b/apps/randomness/src/index.ts @@ -1,6 +1,7 @@ import { abis } from "@happy.tech/contracts/random/anvil" import { TransactionManager, TransactionStatus, TxmHookType } from "@happy.tech/txm" import type { LatestBlock, Transaction } from "@happy.tech/txm" +import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto" import { CustomGasEstimator } from "./CustomGasEstimator.js" import { DrandRepository } from "./DrandRepository" import { DrandService } from "./DrandService" @@ -33,6 +34,14 @@ class RandomnessService { gas: { minPriorityFeePerGas: 10n, }, + traces: { + active: true, + spanExporter: env.OTEL_EXPORTER_OTLP_ENDPOINT + ? new OTLPTraceExporter({ + url: env.OTEL_EXPORTER_OTLP_ENDPOINT, + }) + : undefined, + }, }) this.transactionFactory = new TransactionFactory(this.txm, env.RANDOM_CONTRACT_ADDRESS, env.PRECOMMIT_DELAY) this.drandService = new DrandService(this.drandRepository, this.transactionFactory) diff --git a/bun.lock b/bun.lock index 3ff0c87e9f..445c09b25d 100644 --- a/bun.lock +++ b/bun.lock @@ -100,6 +100,8 @@ "@happy.tech/common": "workspace:0.1.0", "@happy.tech/contracts": "workspace:0.1.0", "@happy.tech/txm": "workspace:0.1.0", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/sdk-trace-node": "^1.30.1", "better-sqlite3": "^11.7.0", "kysely": "^0.27.5", "neverthrow": "^8.1.0", diff --git a/packages/txm/lib/BlockMonitor.ts b/packages/txm/lib/BlockMonitor.ts index f3dc88fd08..4a738f0398 100644 --- a/packages/txm/lib/BlockMonitor.ts +++ b/packages/txm/lib/BlockMonitor.ts @@ -1,9 +1,10 @@ import { LogTag, Logger } from "@happy.tech/common" +import { ROOT_CONTEXT, SpanStatusCode, context, trace } from "@opentelemetry/api" import type { Block } from "viem" import { Topics, eventBus } from "./EventBus.js" import type { TransactionManager } from "./TransactionManager.js" import { TxmMetrics } from "./telemetry/metrics" - +import { TraceMethod } from "./telemetry/traces" /** * A type alias for {@link Block} with the `blockTag` set to `"latest"`, ensuring type definitions correspond to the latest block. */ @@ -31,17 +32,21 @@ export class BlockMonitor { }) } + @TraceMethod("txm.block-monitor.on-new-block") private onNewBlock(block: LatestBlock | undefined) { if (!block) { Logger.instance.error(LogTag.TXM, "Received undefined block") return } + const span = trace.getSpan(context.active())! + span.setAttribute("block.number", Number(block.number)) + if (this.latestProcessedBlockNumber && block.number <= this.latestProcessedBlockNumber) { - Logger.instance.warn( - LogTag.TXM, - "Received block number less than or equal to latest processed block number. Skipping.", - ) + const description = `Received block number less than or equal to latest processed block number. Skipping. Latest processed block number: ${this.latestProcessedBlockNumber}, received block number: ${block.number}` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.warn(LogTag.TXM, description) return } @@ -58,6 +63,7 @@ export class BlockMonitor { this.scheduleTimeout() } + @TraceMethod("txm.block-monitor.schedule-timeout") private scheduleTimeout() { this.blockTimeout = setTimeout(() => { Logger.instance.warn(LogTag.TXM, "Timeout reached. Resetting block subscription.") @@ -65,6 +71,7 @@ export class BlockMonitor { }, this.txmgr.blockInactivityTimeout) } + @TraceMethod("txm.block-monitor.reset-block-subscription") private resetBlockSubscription() { TxmMetrics.getInstance().resetBlockMonitorCounter.add(1) this.txmgr.rpcLivenessMonitor.trackError() @@ -79,6 +86,13 @@ export class BlockMonitor { this.unwatch = undefined } - setTimeout(() => this.start(), 500) + // Delay block monitor restart by 500ms + // This prevents an infinite loop of rapid restarts + // when the blockchain node is unresponsive or down + setTimeout(() => { + context.with(ROOT_CONTEXT, () => { + this.start() + }) + }, 500) } } diff --git a/packages/txm/lib/GasEstimator.ts b/packages/txm/lib/GasEstimator.ts index d6fb3c7163..ce30c6410f 100644 --- a/packages/txm/lib/GasEstimator.ts +++ b/packages/txm/lib/GasEstimator.ts @@ -1,7 +1,9 @@ +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { type Result, err, ok } from "neverthrow" import { encodeFunctionData } from "viem" import type { Transaction } from "./Transaction.js" import type { TransactionManager } from "./TransactionManager.js" +import { TraceMethod } from "./telemetry/traces" export enum EstimateGasErrorCause { EstimateGasABINotFound = "EstimateGasABINotFound", @@ -61,16 +63,22 @@ export class DefaultGasLimitEstimator implements GasEstimator { * to use the default case without needing to reimplement it. This is particularly * useful for services like the Randomness service. */ + @TraceMethod("txm.gas-estimator.simulate-transaction-for-gas") protected async simulateTransactionForGas( transactionManager: TransactionManager, transaction: Transaction, ): Promise> { + const span = trace.getSpan(context.active())! + const abi = transactionManager.abiManager.get(transaction.contractName) if (!abi) { + const description = `ABI not found for contract ${transaction.contractName}` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) return err({ cause: EstimateGasErrorCause.EstimateGasABINotFound, - description: `ABI not found for contract ${transaction.contractName}`, + description, }) } @@ -86,12 +94,20 @@ export class DefaultGasLimitEstimator implements GasEstimator { }) if (gasResult.isErr()) { + const description = `Failed to estimate gas for transaction ${transaction.intentId}. Details: ${gasResult.error}` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) return err({ cause: EstimateGasErrorCause.EstimateGasClientError, - description: `Failed to estimate gas for transaction ${transaction.intentId}. Details: ${gasResult.error}`, + description, }) } + span.addEvent("txm.gas-estimator.simulate-transaction-for-gas.succeeded", { + transactionIntentId: transaction.intentId, + gas: Number(gasResult.value), + }) + return ok(gasResult.value) } } diff --git a/packages/txm/lib/GasPriceOracle.ts b/packages/txm/lib/GasPriceOracle.ts index b77bedbbf6..f89aa5e372 100644 --- a/packages/txm/lib/GasPriceOracle.ts +++ b/packages/txm/lib/GasPriceOracle.ts @@ -1,8 +1,10 @@ import { bigIntMax } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { type Result, err, ok } from "neverthrow" import type { LatestBlock } from "./BlockMonitor.js" import { Topics, eventBus } from "./EventBus.js" import type { TransactionManager } from "./TransactionManager.js" +import { TraceMethod } from "./telemetry/traces" /** * This module estimates the gas price for transaction execution. It updates with each new block, basing its calculations on EIP-1559. @@ -39,11 +41,21 @@ export class GasPriceOracle { await this.onNewBlock(block) } + @TraceMethod("txm.gas-price-oracle.on-new-block") private async onNewBlock(block: LatestBlock) { + const span = trace.getSpan(context.active())! + const baseFeePerGas = block.baseFeePerGas const gasUsed = block.gasUsed const gasLimit = block.gasLimit + span.addEvent("txm.gas-price-oracle.on-new-block.started", { + blockNumber: Number(block.number), + baseFeePerGas: Number(baseFeePerGas), + gasUsed: Number(gasUsed), + gasLimit: Number(gasLimit), + }) + this.expectedNextBaseFeePerGas = this.calculateExpectedNextBaseFeePerGas( baseFeePerGas as bigint, gasUsed, @@ -51,14 +63,23 @@ export class GasPriceOracle { ) const targetPriorityFeeResult = await this.calculateTargetPriorityFee() if (targetPriorityFeeResult.isErr()) { + span.recordException(targetPriorityFeeResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) if (this.targetPriorityFee === undefined) { this.targetPriorityFee = this.txmgr.maxPriorityFeePerGas ?? 0n } return } this.targetPriorityFee = targetPriorityFeeResult.value + + span.addEvent("txm.gas-price-oracle.on-new-block.succeeded", { + blockNumber: Number(block.number), + expectedNextBaseFeePerGas: Number(this.expectedNextBaseFeePerGas), + targetPriorityFee: Number(this.targetPriorityFee), + }) } + @TraceMethod("txm.gas-price-oracle.calculate-target-priority-fee") private async calculateTargetPriorityFee(): Promise> { const feeHistory = await this.txmgr.viemClient.safeGetFeeHistory({ blockCount: this.txmgr.priorityFeeAnalysisBlocks, diff --git a/packages/txm/lib/HookManager.ts b/packages/txm/lib/HookManager.ts index f8e155f9dc..2f31a21d1d 100644 --- a/packages/txm/lib/HookManager.ts +++ b/packages/txm/lib/HookManager.ts @@ -2,6 +2,7 @@ import type { LatestBlock } from "./BlockMonitor" import { Topics, eventBus } from "./EventBus.js" import type { Transaction } from "./Transaction.js" import type { AttemptSubmissionErrorCause } from "./TransactionSubmitter" +import { TraceMethod } from "./telemetry/traces" export enum TxmHookType { All = "All", @@ -114,6 +115,7 @@ export class HookManager { } } + @TraceMethod("txm.hook-manager.on-transaction-status-changed") private async onTransactionStatusChanged(payload: { transaction: Transaction }): Promise { this.hooks[TxmHookType.TransactionStatusChanged].forEach((handler) => handler(payload.transaction)) @@ -125,6 +127,7 @@ export class HookManager { ) } + @TraceMethod("txm.hook-manager.on-transaction-save-failed") private async onTransactionSaveFailed(payload: { transaction: Transaction }): Promise { @@ -138,6 +141,7 @@ export class HookManager { ) } + @TraceMethod("txm.hook-manager.on-new-block") private async onNewBlock(block: LatestBlock): Promise { this.hooks[TxmHookType.NewBlock].forEach((handler) => handler(block)) @@ -149,6 +153,7 @@ export class HookManager { ) } + @TraceMethod("txm.hook-manager.on-transaction-submission-failed") private async onTransactionSubmissionFailed(payload: { transaction: Transaction description: string @@ -168,12 +173,14 @@ export class HookManager { ) } + @TraceMethod("txm.hook-manager.on-rpc-is-down") private async onRpcIsDown(): Promise { this.hooks[TxmHookType.RpcIsDown].forEach((handler) => handler()) this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsDown })) } + @TraceMethod("txm.hook-manager.on-rpc-is-up") private async onRpcIsUp(): Promise { this.hooks[TxmHookType.RpcIsUp].forEach((handler) => handler()) diff --git a/packages/txm/lib/NonceManager.ts b/packages/txm/lib/NonceManager.ts index 6181c9bf9d..fa766a97a5 100644 --- a/packages/txm/lib/NonceManager.ts +++ b/packages/txm/lib/NonceManager.ts @@ -1,6 +1,8 @@ import { LogTag, Logger } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import type { TransactionManager } from "./TransactionManager" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" /* /* @@ -72,21 +74,35 @@ export class NonceManager { } } + @TraceMethod("txm.nonce-manager.request-nonce") public requestNonce(): number { + const span = trace.getSpan(context.active())! + if (this.returnedNonceQueue.length > 0) { const nonce = this.returnedNonceQueue.shift()! + span.addEvent("txm.nonce-manager.request-nonce.from-queue", { + nonce, + }) TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length) return nonce } const requestedNonce = this.nonce this.nonce = this.nonce + 1 + + span.addEvent("txm.nonce-manager.request-nonce.new-nonce", { + nonce: requestedNonce, + }) + TxmMetrics.getInstance().nonceManagerGauge.record(this.nonce) return requestedNonce } // Only called when a transaction that has reserved a nonce ultimately doesn't reach the mempool + @TraceMethod("txm.nonce-manager.return-nonce") public returnNonce(nonce: number) { + const span = trace.getSpan(context.active())! + const index = this.returnedNonceQueue.findIndex((n) => nonce < n) if (index === -1) { @@ -95,11 +111,18 @@ export class NonceManager { this.returnedNonceQueue.splice(index, 0, nonce) } + span.addEvent("txm.nonce-manager.return-nonce.added-to-queue", { + nonce, + }) + TxmMetrics.getInstance().returnedNonceCounter.add(1) TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length) } + @TraceMethod("txm.nonce-manager.resync") public async resync() { + const span = trace.getSpan(context.active())! + const address = this.txmgr.viemWallet.account.address const blockchainNonceResult = await this.txmgr.viemClient.safeGetTransactionCount({ @@ -111,11 +134,17 @@ export class NonceManager { error: blockchainNonceResult.error, }) this.txmgr.rpcLivenessMonitor.trackError() + span.recordException(blockchainNonceResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) return } this.txmgr.rpcLivenessMonitor.trackSuccess() this.maxExecutedNonce = blockchainNonceResult.value - 1 + + span.addEvent("txm.nonce-manager.resync.updated-max-executed-nonce", { + maxExecutedNonce: this.maxExecutedNonce, + }) } } diff --git a/packages/txm/lib/RetryPolicyManager.ts b/packages/txm/lib/RetryPolicyManager.ts index 59b6b4daee..97617e6579 100644 --- a/packages/txm/lib/RetryPolicyManager.ts +++ b/packages/txm/lib/RetryPolicyManager.ts @@ -2,6 +2,7 @@ import { type Result, err, ok } from "neverthrow" import { type TransactionReceipt, encodeErrorResult } from "viem" import type { Attempt, Transaction } from "./Transaction" import type { TransactionManager } from "./TransactionManager" +import { TraceMethod } from "./telemetry/traces" export type RevertedTransactionReceipt = TransactionReceipt @@ -24,6 +25,7 @@ export interface RetryPolicyManager { * It will only retry if the transaction runs out of gas. */ export class DefaultRetryPolicyManager implements RetryPolicyManager { + @TraceMethod("txm.retry-policy-manager.should-retry") public async shouldRetry( transactionManager: TransactionManager, _: Transaction, @@ -40,6 +42,7 @@ export class DefaultRetryPolicyManager implements RetryPolicyManager { * @param attempt - The attempt * @returns The revert message or undefined if it cannot be retrieved or the rpc does not allow debug */ + @TraceMethod("txm.retry-policy-manager.get-revert-message-and-output") protected async getRevertMessageAndOutput( transactionManager: TransactionManager, attempt: Attempt, @@ -57,6 +60,7 @@ export class DefaultRetryPolicyManager implements RetryPolicyManager { return { message: traceResult.value.error, output: traceResult.value.output } } + @TraceMethod("txm.retry-policy-manager.is-out-of-gas") protected async isOutOfGas( transactionManager: TransactionManager, attempt: Attempt, @@ -71,6 +75,7 @@ export class DefaultRetryPolicyManager implements RetryPolicyManager { return message === "Out of Gas" } + @TraceMethod("txm.retry-policy-manager.is-reverted-with-message") protected async isRevertedWithMessage( transactionManager: TransactionManager, attempt: Attempt, @@ -81,6 +86,7 @@ export class DefaultRetryPolicyManager implements RetryPolicyManager { return _message === message } + @TraceMethod("txm.retry-policy-manager.is-custom-error") protected async isCustomError( transactionManager: TransactionManager, transaction: Transaction, diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts index c068f4b571..bbec70a479 100644 --- a/packages/txm/lib/RpcLivenessMonitor.ts +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -1,8 +1,10 @@ import { LogTag, Logger } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { Topics } from "./EventBus" import { eventBus } from "./EventBus" import type { TransactionManager } from "./TransactionManager" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" interface SecondCounters { successCount: number @@ -47,6 +49,7 @@ export class RpcLivenessMonitor { }) } + @TraceMethod("txm.rpc-liveness-monitor.track-success") trackSuccess() { const currentSecond = this.getCurrentSecond() @@ -58,6 +61,7 @@ export class RpcLivenessMonitor { this.checkIfDown() } + @TraceMethod("txm.rpc-liveness-monitor.track-error") trackError() { const currentSecond = this.getCurrentSecond() @@ -69,7 +73,10 @@ export class RpcLivenessMonitor { this.checkIfDown() } + @TraceMethod("txm.rpc-liveness-monitor.check-if-down") private checkIfDown() { + const span = trace.getSpan(context.active())! + if (this.isAlive && this.ratioOfSuccess() < this.txmgr.livenessThreshold) { this.isAlive = false this.isDownSince = new Date() @@ -78,13 +85,20 @@ export class RpcLivenessMonitor { eventBus.emit(Topics.RpcIsDown) Logger.instance.error(LogTag.TXM, "Detected that the RPC is not healthy") + span.addEvent("txm.rpc-liveness-monitor.check-if-down.is-down") + span.setStatus({ code: SpanStatusCode.ERROR }) + this.checkIfHealthyInterval = setInterval(() => { this.checkIfHealthy() }, this.txmgr.livenessCheckInterval) + } else { + span.addEvent("txm.rpc-liveness-monitor.check-if-down.is-up") } } + @TraceMethod("txm.rpc-liveness-monitor.check-if-healthy") private async checkIfHealthy() { + const span = trace.getSpan(context.active())! if (this.isDownSince && this.isDownSince.getTime() + this.txmgr.livenessDownDelay > new Date().getTime()) { return } @@ -92,13 +106,16 @@ export class RpcLivenessMonitor { const chainIdResult = await this.txmgr.viemClient.safeGetChainId() if (chainIdResult.isOk()) { + span.addEvent("txm.rpc-liveness-monitor.check-if-healthy.increment-success-count") this.consecutiveSuccessesWhileCheckingIfHealthy++ } else { + span.addEvent("txm.rpc-liveness-monitor.check-if-healthy.reset-success-count") this.consecutiveSuccessesWhileCheckingIfHealthy = 0 } if (this.consecutiveSuccessesWhileCheckingIfHealthy > this.txmgr.livenessSuccessCount) { Logger.instance.info(LogTag.TXM, "Detected that the RPC is healthy") + span.addEvent("txm.rpc-liveness-monitor.check-if-healthy.is-up") this.isAlive = true this.isDownSince = null this.consecutiveSuccessesWhileCheckingIfHealthy = 0 diff --git a/packages/txm/lib/Transaction.ts b/packages/txm/lib/Transaction.ts index 48ae0824f7..113654f755 100644 --- a/packages/txm/lib/Transaction.ts +++ b/packages/txm/lib/Transaction.ts @@ -1,10 +1,12 @@ import { type UUID, bigIntReplacer, bigIntReviver, createUUID } from "@happy.tech/common" +import { context, trace } from "@opentelemetry/api" import type { Insertable, Selectable } from "kysely" import type { Address, ContractFunctionArgs, Hash } from "viem" import type { LatestBlock } from "./BlockMonitor" import { Topics, eventBus } from "./EventBus.js" import type { TransactionTable } from "./db/types.js" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" export enum TransactionStatus { /** @@ -202,7 +204,15 @@ export class Transaction { return this.deadline ? block.timestamp + blockTime > BigInt(this.deadline) : false } + @TraceMethod("txm.transaction.change-status") changeStatus(status: TransactionStatus): void { + const span = trace.getSpan(context.active())! + + span.addEvent("transaction.change-status", { + transactionIntentId: this.intentId, + status, + }) + this.status = status this.markUpdated() @@ -275,4 +285,8 @@ export class Transaction { pendingFlush: false, }) } + + toJson(): string { + return JSON.stringify(this, bigIntReplacer) + } } diff --git a/packages/txm/lib/TransactionCollector.ts b/packages/txm/lib/TransactionCollector.ts index 466d452593..2a9eac9b9e 100644 --- a/packages/txm/lib/TransactionCollector.ts +++ b/packages/txm/lib/TransactionCollector.ts @@ -1,9 +1,11 @@ import { LogTag, Logger } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import type { LatestBlock } from "./BlockMonitor.js" import { Topics, eventBus } from "./EventBus.js" import { AttemptType, TransactionStatus } from "./Transaction.js" import type { TransactionManager } from "./TransactionManager.js" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" /** * This module is responsible for retrieving transactions from the originators when a new block is received. @@ -20,7 +22,10 @@ export class TransactionCollector { eventBus.on(Topics.NewBlock, this.onNewBlock.bind(this)) } + @TraceMethod("txm.transaction-collector.on-new-block") private async onNewBlock(block: LatestBlock) { + const span = trace.getSpan(context.active())! + const { maxFeePerGas, maxPriorityFeePerGas } = this.txmgr.gasPriceOracle.suggestGasForNextBlock() const transactionUnsorted = await Promise.all(this.txmgr.collectors.map((c) => c(block))) @@ -32,6 +37,14 @@ export class TransactionCollector { return transaction }) + for (const transaction of transactionsBatch) { + span.addEvent("txm.transaction-collector.on-new-block.collected-transaction", { + transactionIntentId: transaction.intentId, + transaction: transaction.toJson(), + blockNumber: Number(block.number), + }) + } + const saveResult = await this.txmgr.transactionRepository.saveTransactions(transactionsBatch) if (saveResult.isErr()) { @@ -39,6 +52,10 @@ export class TransactionCollector { eventBus.emit(Topics.TransactionSaveFailed, { transaction }) } Logger.instance.error(LogTag.TXM, "Error saving transactions", saveResult.error) + + span.addEvent("txm.transaction-collector.on-new-block.save-failed") + span.recordException(saveResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) return } @@ -46,6 +63,8 @@ export class TransactionCollector { if (!this.txmgr.rpcLivenessMonitor.isAlive) { Logger.instance.error(LogTag.TXM, "RPC is not alive, skipping attempt to submit transactions") + span.addEvent("txm.transaction-collector.on-new-block.rpc-not-alive") + span.setStatus({ code: SpanStatusCode.ERROR }) return } @@ -53,6 +72,11 @@ export class TransactionCollector { transactionsBatch.map(async (transaction) => { const nonce = this.txmgr.nonceManager.requestNonce() + span.addEvent("txm.transaction-collector.on-new-block.requested-nonce", { + transactionIntentId: transaction.intentId, + nonce, + }) + if (transaction.status === TransactionStatus.Interrupted) { transaction.changeStatus(TransactionStatus.Pending) } @@ -71,7 +95,18 @@ export class TransactionCollector { cause: submissionResult.error.cause, }) + span.addEvent("txm.transaction-collector.on-new-block.submission-failed", { + transactionIntentId: transaction.intentId, + description: submissionResult.error.description, + cause: submissionResult.error.cause, + }) + span.setStatus({ code: SpanStatusCode.ERROR }) + if (!submissionResult.error.flushed) { + span.addEvent("txm.transaction-collector.on-new-block.returned-nonce", { + transactionIntentId: transaction.intentId, + nonce, + }) this.txmgr.nonceManager.returnNonce(nonce) } } diff --git a/packages/txm/lib/TransactionManager.ts b/packages/txm/lib/TransactionManager.ts index d32b73e11e..2bd8d4cb7f 100644 --- a/packages/txm/lib/TransactionManager.ts +++ b/packages/txm/lib/TransactionManager.ts @@ -1,5 +1,7 @@ import type { UUID } from "@happy.tech/common" +import { trace } from "@opentelemetry/api" import type { MetricReader } from "@opentelemetry/sdk-metrics" +import type { SpanExporter } from "@opentelemetry/sdk-trace-node" import type { Result } from "neverthrow" import { type Abi, @@ -229,11 +231,28 @@ export type TransactionManagerConfig = { */ port?: number /** - * Custom metric readers to use instead of the default Prometheus reader. - * If provided, these readers will be used and the port setting will be ignored. + * Custom metric reader to use instead of the default Prometheus reader. + * If provided, this reader will be used and the port setting will be ignored. * If not provided, a default Prometheus reader will be configured using the specified port. */ - metricReaders?: MetricReader[] + metricReader?: MetricReader + } + + /** + * The traces configuration. + */ + traces?: { + /** + * Whether to enable traces collection. + * Defaults to false. + */ + active?: boolean + + /** + * The span exporter to use. + * Defaults to a console span exporter. + */ + spanExporter?: SpanExporter } } @@ -284,9 +303,11 @@ export class TransactionManager { constructor(_config: TransactionManagerConfig) { initializeTelemetry({ - active: _config.metrics?.active ?? true, - port: _config.metrics?.port ?? 9090, - metricReaders: _config.metrics?.metricReaders, + metricsActive: _config.metrics?.active ?? true, + prometheusPort: _config.metrics?.port ?? 9090, + userMetricReader: _config.metrics?.metricReader, + tracesActive: _config.traces?.active ?? false, + userTraceExporter: _config.traces?.spanExporter, }) this.collectors = [] @@ -352,6 +373,7 @@ export class TransactionManager { rpcErrorCounter: TxmMetrics.getInstance().rpcErrorCounter, rpcResponseTimeHistogram: TxmMetrics.getInstance().blockchainRpcResponseTimeHistogram, }, + trace.getTracer("txm"), ) this.viemClient = convertToSafeViemPublicClient( @@ -364,6 +386,7 @@ export class TransactionManager { rpcErrorCounter: TxmMetrics.getInstance().rpcErrorCounter, rpcResponseTimeHistogram: TxmMetrics.getInstance().blockchainRpcResponseTimeHistogram, }, + trace.getTracer("txm"), ) this.nonceManager = new NonceManager(this) diff --git a/packages/txm/lib/TransactionRepository.ts b/packages/txm/lib/TransactionRepository.ts index 8fa8af5cba..f6dd6499cd 100644 --- a/packages/txm/lib/TransactionRepository.ts +++ b/packages/txm/lib/TransactionRepository.ts @@ -1,11 +1,13 @@ import { unknownToError } from "@happy.tech/common" import type { UUID } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { type Result, ResultAsync, err, ok } from "neverthrow" import { Topics, eventBus } from "./EventBus.js" import { NotFinalizedStatuses, Transaction } from "./Transaction.js" import type { TransactionManager } from "./TransactionManager.js" import { db } from "./db/driver.js" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" /** * This module acts as intermediate layer between the library and the database. @@ -38,11 +40,15 @@ export class TransactionRepository { } } + @TraceMethod("txm.transaction-repository.get-not-finalized-transactions-older-than") getNotFinalizedTransactionsOlderThan(blockNumber: bigint): Transaction[] { return this.notFinalizedTransactions.filter((t) => t.collectionBlock && t.collectionBlock < blockNumber) } + @TraceMethod("txm.transaction-repository.get-transaction") async getTransaction(intentId: UUID): Promise> { + const span = trace.getSpan(context.active())! + const cachedTransaction = this.notFinalizedTransactions.find((t) => t.intentId === intentId) if (cachedTransaction) { @@ -72,6 +78,8 @@ export class TransactionRepository { TxmMetrics.getInstance().databaseErrorsCounter.add(1, { operation: "getTransaction", }) + span.recordException(persistedTransactionResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) return err(persistedTransactionResult.error) } @@ -80,7 +88,10 @@ export class TransactionRepository { return persistedTransaction ? ok(Transaction.fromDbRow(persistedTransaction)) : ok(undefined) } + @TraceMethod("txm.transaction-repository.save-transactions") async saveTransactions(transactions: Transaction[]): Promise> { + const span = trace.getSpan(context.active())! + const transactionsToFlush = transactions.filter((t) => t.pendingFlush) const notPersistedTransactions = transactions.filter((t) => t.notPersisted) @@ -121,6 +132,8 @@ export class TransactionRepository { TxmMetrics.getInstance().notFinalizedTransactionsGauge.record(this.notFinalizedTransactions.length) } else { + span.recordException(result.error) + span.setStatus({ code: SpanStatusCode.ERROR }) TxmMetrics.getInstance().databaseErrorsCounter.add(1, { operation: "saveTransactions", }) @@ -129,19 +142,24 @@ export class TransactionRepository { return result } + @TraceMethod("txm.transaction-repository.get-highest-nonce") getHighestNonce(): number | undefined { return this.notFinalizedTransactions.length > 0 ? Math.max(...this.notFinalizedTransactions.flatMap((t) => t.attempts.map((a) => a.nonce))) : undefined } + @TraceMethod("txm.transaction-repository.get-not-reserved-nonces-in-range") getNotReservedNoncesInRange(from: number, to: number): number[] { return Array.from({ length: to - from + 1 }, (_, i) => from + i).filter( (n) => !this.notFinalizedTransactions.some((t) => t.attempts.some((a) => a.nonce === n)), ) } + @TraceMethod("txm.transaction-repository.purge-finalized-transactions") async purgeFinalizedTransactions() { + const span = trace.getSpan(context.active())! + TxmMetrics.getInstance().databaseOperationsCounter.add(1, { operation: "purgeFinalizedTransactions", }) @@ -165,6 +183,8 @@ export class TransactionRepository { TxmMetrics.getInstance().databaseErrorsCounter.add(1, { operation: "purgeFinalizedTransactions", }) + span.recordException(result.error) + span.setStatus({ code: SpanStatusCode.ERROR }) } return result diff --git a/packages/txm/lib/TransactionSubmitter.ts b/packages/txm/lib/TransactionSubmitter.ts index 74374003bd..36fbf6ba6f 100644 --- a/packages/txm/lib/TransactionSubmitter.ts +++ b/packages/txm/lib/TransactionSubmitter.ts @@ -1,9 +1,12 @@ +import { LogTag, Logger, bigIntReplacer } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { type Result, err, ok } from "neverthrow" import type { TransactionRequestEIP1559 } from "viem" import { TransactionRejectedRpcError, encodeFunctionData, keccak256 } from "viem" import type { EstimateGasErrorCause } from "./GasEstimator.js" import { type Attempt, AttemptType, type Transaction } from "./Transaction.js" import type { TransactionManager } from "./TransactionManager.js" +import { TraceMethod } from "./telemetry/traces" export type AttemptSubmissionParameters = Omit @@ -44,6 +47,7 @@ export class TransactionSubmitter { this.txmgr = txmgr } + @TraceMethod("txm.transaction-submitter.submit-new-attempt") public async submitNewAttempt( transaction: Transaction, payload: AttemptSubmissionParameters, @@ -62,15 +66,25 @@ export class TransactionSubmitter { ) } + @TraceMethod("txm.transaction-submitter.resubmit-attempt") async resubmitAttempt(transaction: Transaction, attempt: Attempt): Promise { return await this.sendAttempt(transaction, attempt, false) } + @TraceMethod("txm.transaction-submitter.send-attempt") private async sendAttempt( transaction: Transaction, attempt: Omit & Partial>, saveAttempt = false, ): Promise { + const span = trace.getSpan(context.active())! + + span.addEvent("txm.transaction-submitter.send-attempt.started", { + transactionIntentId: transaction.intentId, + transaction: transaction.toJson(), + payload: JSON.stringify(attempt, bigIntReplacer), + }) + let transactionRequest: TransactionRequestEIP1559 & { gas: bigint } if (attempt.type === AttemptType.Cancellation) { @@ -89,6 +103,12 @@ export class TransactionSubmitter { const abi = this.txmgr.abiManager.get(transaction.contractName) if (!abi) { + span.addEvent("txm.transaction-submitter.send-attempt.abi-not-found", { + transactionIntentId: transaction.intentId, + contractName: transaction.contractName, + }) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.error(LogTag.TXM, `ABI not found for contract ${transaction.contractName}`) return err({ cause: AttemptSubmissionErrorCause.ABINotFound, description: `ABI not found for contract ${transaction.contractName}`, @@ -105,6 +125,11 @@ export class TransactionSubmitter { const gasResult = await this.txmgr.gasEstimator.estimateGas(this.txmgr, transaction) if (gasResult.isErr()) { + span.addEvent("txm.transaction-submitter.send-attempt.gas-estimation-failed", { + transactionIntentId: transaction.intentId, + description: gasResult.error.description, + }) + span.setStatus({ code: SpanStatusCode.ERROR }) return err({ cause: gasResult.error.cause, description: gasResult.error.description, @@ -133,9 +158,16 @@ export class TransactionSubmitter { const signedTransactionResult = await this.txmgr.viemWallet.safeSignTransaction(transactionRequest) if (signedTransactionResult.isErr()) { + const description = `Failed to sign transaction ${transaction.intentId}. Details: ${signedTransactionResult.error}` + span.addEvent("txm.transaction-submitter.attempt-submission.failed-to-sign-transaction", { + transactionIntentId: transaction.intentId, + description, + }) + span.recordException(signedTransactionResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) return err({ cause: AttemptSubmissionErrorCause.FailedToSignTransaction, - description: `Failed to sign transaction ${transaction.intentId} for retry. Details: ${signedTransactionResult.error}`, + description, flushed: false, }) } @@ -172,10 +204,21 @@ export class TransactionSubmitter { }) if (sendRawTransactionResult.isErr()) { + const description = `Failed to send raw transaction ${transaction.intentId}. Details: ${sendRawTransactionResult.error}` + span.addEvent("txm.transaction-submitter.attempt-submission.failed-to-send-raw-transaction", { + transactionIntentId: transaction.intentId, + description, + }) + span.recordException(sendRawTransactionResult.error) + span.setStatus({ code: SpanStatusCode.ERROR }) if ( sendRawTransactionResult.error instanceof TransactionRejectedRpcError && sendRawTransactionResult.error.message.includes("nonce too low") ) { + span.addEvent("txm.transaction-submitter.attempt-submission.nonce-too-low", { + transactionIntentId: transaction.intentId, + nonce: attempt.nonce, + }) this.txmgr.nonceManager.resync() } diff --git a/packages/txm/lib/TxMonitor.ts b/packages/txm/lib/TxMonitor.ts index d15faca8b1..0638310305 100644 --- a/packages/txm/lib/TxMonitor.ts +++ b/packages/txm/lib/TxMonitor.ts @@ -1,4 +1,5 @@ -import { LogTag, Logger, bigIntMax, promiseWithResolvers, unknownToError } from "@happy.tech/common" +import { LogTag, Logger, bigIntMax, bigIntReplacer, promiseWithResolvers, unknownToError } from "@happy.tech/common" +import { SpanStatusCode, context, trace } from "@opentelemetry/api" import { type Result, ResultAsync, err, ok } from "neverthrow" import { type GetTransactionReceiptErrorType, type TransactionReceipt, TransactionReceiptNotFoundError } from "viem" import type { LatestBlock } from "./BlockMonitor.js" @@ -7,6 +8,7 @@ import type { RevertedTransactionReceipt } from "./RetryPolicyManager" import { type Attempt, AttemptType, type Transaction, TransactionStatus } from "./Transaction.js" import type { TransactionManager } from "./TransactionManager.js" import { TxmMetrics } from "./telemetry/metrics" +import { TraceMethod } from "./telemetry/traces" type AttemptWithReceipt = { attempt: Attempt; receipt: TransactionReceipt } @@ -39,14 +41,24 @@ export class TxMonitor { eventBus.on(Topics.NewBlock, this.onNewBlock.bind(this)) } + @TraceMethod("txm.tx-monitor.on-new-block") private async onNewBlock(block: LatestBlock) { + const span = trace.getSpan(context.active())! + + span.addEvent("txm.tx-monitor.on-new-block.started", { + blockNumber: Number(block.number), + }) + if (this.locked) { + span.addEvent("txm.tx-monitor.on-new-block.locked") const pending = promiseWithResolvers() this.pendingBlockPromises.push(pending) try { await pending.promise + span.addEvent("txm.tx-monitor.on-new-block.lock-resolved") } catch { // A more recent block came while we were waiting, abort. + span.addEvent("txm.tx-monitor.on-new-block.lock-aborted") return } } @@ -55,6 +67,8 @@ export class TxMonitor { try { await this.handleNewBlock(block) } catch (error) { + span.recordException(unknownToError(error)) + span.setStatus({ code: SpanStatusCode.ERROR }) Logger.instance.error(LogTag.TXM, "Error in handleNewBlock: ", error) } this.locked = false @@ -63,8 +77,17 @@ export class TxMonitor { this.pendingBlockPromises.forEach((p) => p.reject()) } + @TraceMethod("txm.tx-monitor.handle-new-block") private async handleNewBlock(block: LatestBlock) { + const span = trace.getSpan(context.active())! + + span.addEvent("txm.tx-monitor.handle-new-block.started", { + blockNumber: Number(block.number), + }) + if (!this.transactionManager.rpcLivenessMonitor.isAlive) { + span.addEvent("txm.tx-monitor.handle-new-block.rpc-not-alive") + span.setStatus({ code: SpanStatusCode.ERROR }) Logger.instance.warn(LogTag.TXM, "RPC is not alive, skipping attempt to monitor transactions") return } @@ -73,9 +96,20 @@ export class TxMonitor { block.number, ) + for (const transaction of transactions) { + span.addEvent("txm.tx-monitor.handle-new-block.monitoring-transaction", { + transactionIntentId: transaction.intentId, + }) + } + const promises = transactions.map(async (transaction) => { const inAirAttempts = transaction.getInAirAttempts() + span.addEvent("txm.tx-monitor.handle-new-block.monitoring-transaction.in-air-attempts", { + transactionIntentId: transaction.intentId, + inAirAttempts: JSON.stringify(inAirAttempts, bigIntReplacer), + }) + // This could happen if, on the first try, the attempt to submit the transaction fails before flush if (inAirAttempts.length === 0) { return this.handleNotAttemptedTransaction(transaction, block) @@ -118,22 +152,30 @@ export class TxMonitor { const attemptOrResults = await Promise.race([receiptPromise, Promise.all(promises)]) if (Array.isArray(attemptOrResults)) { + span.addEvent("txm.tx-monitor.handle-new-block.monitoring-transaction.array-of-results", { + transactionIntentId: transaction.intentId, + attemptOrResults: JSON.stringify(attemptOrResults, bigIntReplacer), + }) + /* If there are any errors, then we should return because there is a risk that the transaction was executed and we don’t know */ if (attemptOrResults.some((v) => v.isErr())) { - Logger.instance.error( - LogTag.TXM, - `Failed to get transaction receipt for transaction ${transaction.intentId}`, - ) + const description = `Failed to get transaction receipt for transaction ${transaction.intentId}` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.error(LogTag.TXM, description) return } const nonce = transaction.lastAttempt?.nonce if (nonce === undefined) { - console.error(`Transaction ${transaction.intentId} inconsistent state: no nonce found`) + const description = `Transaction ${transaction.intentId} inconsistent state: no nonce found` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.error(LogTag.TXM, description) return } @@ -149,6 +191,11 @@ export class TxMonitor { const { attempt, receipt } = attemptOrResults + span.addEvent("txm.tx-monitor.handle-new-block.monitoring-transaction.receipt-received", { + transactionIntentId: transaction.intentId, + receipt: JSON.stringify(receipt, bigIntReplacer), + }) + TxmMetrics.getInstance().transactionInclusionBlockHistogram.record( Number(block.number - transaction.collectionBlock!), ) @@ -190,6 +237,7 @@ export class TxMonitor { } } + @TraceMethod("txm.tx-monitor.calc-replacement-fee") private calcReplacementFee( maxFeePerGas: bigint, maxPriorityFeePerGas: bigint, @@ -209,6 +257,7 @@ export class TxMonitor { } } + @TraceMethod("txm.tx-monitor.should-emit-new-attempt") private shouldEmitNewAttempt(attempt: Attempt): boolean { const { expectedNextBaseFeePerGas, targetPriorityFee } = this.transactionManager.gasPriceOracle @@ -218,14 +267,17 @@ export class TxMonitor { ) } + @TraceMethod("txm.tx-monitor.handle-expired-transaction") private async handleExpiredTransaction(transaction: Transaction): Promise { + const span = trace.getSpan(context.active())! + const attempt = transaction.lastAttempt if (!attempt) { - Logger.instance.error( - LogTag.TXM, - `Transaction ${transaction.intentId} inconsistent state: no attempt found in handleExpiredTransaction`, - ) + const description = `Transaction ${transaction.intentId} inconsistent state: no attempt found in handleExpiredTransaction` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.error(LogTag.TXM, description) return } @@ -234,24 +286,39 @@ export class TxMonitor { attempt.maxPriorityFeePerGas, ) + span.addEvent("txm.tx-monitor.handle-expired-transaction.attempting-submission", { + transactionIntentId: transaction.intentId, + nonce: attempt.nonce, + maxFeePerGas: Number(replacementMaxFeePerGas), + maxPriorityFeePerGas: Number(replacementMaxPriorityFeePerGas), + }) + transaction.changeStatus(TransactionStatus.Cancelling) - await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { + const submissionResult = await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { type: AttemptType.Cancellation, nonce: attempt.nonce, maxFeePerGas: replacementMaxFeePerGas, maxPriorityFeePerGas: replacementMaxPriorityFeePerGas, }) + + if (submissionResult.isErr()) { + span.recordException(unknownToError(submissionResult.error)) + span.setStatus({ code: SpanStatusCode.ERROR }) + } } + @TraceMethod("txm.tx-monitor.handle-stuck-transaction") private async handleStuckTransaction(transaction: Transaction): Promise { + const span = trace.getSpan(context.active())! + const attempt = transaction.lastAttempt if (!attempt) { - Logger.instance.error( - LogTag.TXM, - `Transaction ${transaction.intentId} inconsistent state: no attempt found in handleStuckTransaction`, - ) + const description = `Transaction ${transaction.intentId} inconsistent state: no attempt found in handleStuckTransaction` + span.recordException(new Error(description)) + span.setStatus({ code: SpanStatusCode.ERROR }) + Logger.instance.error(LogTag.TXM, description) return } @@ -260,7 +327,11 @@ export class TxMonitor { LogTag.TXM, `Transaction ${transaction.intentId} is stuck, but the gas price is still sufficient for current network conditions. Sending same attempt again.`, ) - await this.transactionManager.transactionSubmitter.resubmitAttempt(transaction, attempt) + const result = await this.transactionManager.transactionSubmitter.resubmitAttempt(transaction, attempt) + if (result.isErr()) { + span.recordException(unknownToError(result.error)) + span.setStatus({ code: SpanStatusCode.ERROR }) + } return } @@ -274,24 +345,50 @@ export class TxMonitor { attempt.maxPriorityFeePerGas, ) - await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { + span.addEvent("txm.tx-monitor.handle-stuck-transaction.attempting-submission", { + transactionIntentId: transaction.intentId, + nonce: attempt.nonce, + maxFeePerGas: Number(replacementMaxFeePerGas), + maxPriorityFeePerGas: Number(replacementMaxPriorityFeePerGas), + }) + + const submissionResult = await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { type: AttemptType.Original, nonce: attempt.nonce, maxFeePerGas: replacementMaxFeePerGas, maxPriorityFeePerGas: replacementMaxPriorityFeePerGas, }) + + if (submissionResult.isErr()) { + span.recordException(unknownToError(submissionResult.error)) + span.setStatus({ code: SpanStatusCode.ERROR }) + } } + @TraceMethod("txm.tx-monitor.handle-not-attempted-transaction") private async handleNotAttemptedTransaction(transaction: Transaction, block: LatestBlock): Promise { + const span = trace.getSpan(context.active())! + if (transaction.isExpired(block, this.transactionManager.blockTime)) { return transaction.changeStatus(TransactionStatus.Expired) } const nonce = this.transactionManager.nonceManager.requestNonce() + span.addEvent("txm.tx-monitor.handle-not-attempted-transaction.requested-nonce", { + transactionIntentId: transaction.intentId, + nonce, + }) + const { maxFeePerGas: marketMaxFeePerGas, maxPriorityFeePerGas: marketMaxPriorityFeePerGas } = this.transactionManager.gasPriceOracle.suggestGasForNextBlock() + span.addEvent("txm.tx-monitor.handle-not-attempted-transaction.suggested-gas-price", { + transactionIntentId: transaction.intentId, + maxFeePerGas: Number(marketMaxFeePerGas), + maxPriorityFeePerGas: Number(marketMaxPriorityFeePerGas), + }) + const submissionResult = await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { type: AttemptType.Original, nonce, @@ -300,15 +397,32 @@ export class TxMonitor { }) if (submissionResult.isErr() && !submissionResult.error.flushed) { + span.recordException(unknownToError(submissionResult.error)) + span.setStatus({ code: SpanStatusCode.ERROR }) this.transactionManager.nonceManager.returnNonce(nonce) } } + @TraceMethod("txm.tx-monitor.handle-retry-transaction") private async handleRetryTransaction(transaction: Transaction): Promise { + const span = trace.getSpan(context.active())! + const nonce = this.transactionManager.nonceManager.requestNonce() + + span.addEvent("txm.tx-monitor.handle-retry-transaction.requested-nonce", { + transactionIntentId: transaction.intentId, + nonce, + }) + const { maxFeePerGas: marketMaxFeePerGas, maxPriorityFeePerGas: marketMaxPriorityFeePerGas } = this.transactionManager.gasPriceOracle.suggestGasForNextBlock() + span.addEvent("txm.tx-monitor.handle-retry-transaction.suggested-gas-price", { + transactionIntentId: transaction.intentId, + maxFeePerGas: Number(marketMaxFeePerGas), + maxPriorityFeePerGas: Number(marketMaxPriorityFeePerGas), + }) + const submissionResult = await this.transactionManager.transactionSubmitter.submitNewAttempt(transaction, { type: AttemptType.Original, nonce, @@ -317,6 +431,8 @@ export class TxMonitor { }) if (submissionResult.isErr() && !submissionResult.error.flushed) { + span.recordException(unknownToError(submissionResult.error)) + span.setStatus({ code: SpanStatusCode.ERROR }) this.transactionManager.nonceManager.returnNonce(nonce) } } diff --git a/packages/txm/lib/telemetry/instrumentation.ts b/packages/txm/lib/telemetry/instrumentation.ts index 4a80549ffb..7c107ce126 100644 --- a/packages/txm/lib/telemetry/instrumentation.ts +++ b/packages/txm/lib/telemetry/instrumentation.ts @@ -1,7 +1,8 @@ -import opentelemetry from "@opentelemetry/api" import { PrometheusExporter } from "@opentelemetry/exporter-prometheus" import { Resource } from "@opentelemetry/resources" -import { MeterProvider, type MetricReader } from "@opentelemetry/sdk-metrics" +import type { MetricReader } from "@opentelemetry/sdk-metrics" +import { NodeSDK } from "@opentelemetry/sdk-node" +import { ConsoleSpanExporter, type SpanExporter } from "@opentelemetry/sdk-trace-node" import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from "@opentelemetry/semantic-conventions" const resource = Resource.default().merge( @@ -12,18 +13,33 @@ const resource = Resource.default().merge( ) export function initializeTelemetry({ - active, - port, - metricReaders, -}: { active: boolean; port: number; metricReaders?: MetricReader[] }): void { - if (!active) { - return + metricsActive, + prometheusPort, + userMetricReader, + tracesActive, + userTraceExporter, +}: { + metricsActive: boolean + prometheusPort: number + userMetricReader?: MetricReader + userTraceExporter?: SpanExporter + tracesActive?: boolean +}): void { + let metricReader: MetricReader | undefined + if (metricsActive) { + metricReader = userMetricReader || new PrometheusExporter({ port: prometheusPort }) } - const meterProvider = new MeterProvider({ - resource: resource, - readers: metricReaders || [new PrometheusExporter({ port })], + let traceExporter: SpanExporter | undefined + if (tracesActive) { + traceExporter = userTraceExporter || new ConsoleSpanExporter() + } + + const sdk = new NodeSDK({ + resource, + traceExporter, + metricReader, }) - opentelemetry.metrics.setGlobalMeterProvider(meterProvider) + sdk.start() } diff --git a/packages/txm/lib/telemetry/traces.ts b/packages/txm/lib/telemetry/traces.ts new file mode 100644 index 0000000000..389974eda0 --- /dev/null +++ b/packages/txm/lib/telemetry/traces.ts @@ -0,0 +1,55 @@ +import { unknownToError } from "@happy.tech/common" +import { context, trace } from "@opentelemetry/api" + +/** + * A method decorator that adds OpenTelemetry tracing to any method. It creates a new span with the provided name + * (or the method name if not provided), executes the decorated method, and ensures the span is properly closed. + * + * The decorator handles both synchronous and asynchronous methods appropriately. For synchronous methods, + * the span is ended immediately after execution. For asynchronous methods, the span is ended after the promise + * resolves or rejects. This approach preserves the original method signature (sync methods remain sync, + * async remain async). + * + * If an exception occurs during execution, it is recorded in the span before being re-thrown. + * + * @param spanName - Optional custom name for the span. If not provided, the method name is used. + */ +export function TraceMethod(spanName?: string) { + return (_target: unknown, propertyKey: string, descriptor: PropertyDescriptor) => { + const originalMethod = descriptor.value + descriptor.value = function (...args: unknown[]) { + const tracer = trace.getTracer("txm") + const name = spanName || propertyKey + const span = tracer.startSpan(name) + + // IMPORTANT: The callback must not be async to preserve the original method's synchronicity. + // An async callback would implicitly return a Promise, which would convert synchronous methods + // into asynchronous ones, breaking caller expectations and type signatures. + return context.with(trace.setSpan(context.active(), span), () => { + try { + const result = originalMethod.apply(this, args) + + if (result instanceof Promise) { + return result + .then((value) => { + span.end() + return value + }) + .catch((err) => { + span.recordException(unknownToError(err)) + span.end() + throw err + }) + } + span.end() + return result + } catch (err) { + span.recordException(unknownToError(err)) + span.end() + throw err + } + }) + } + return descriptor + } +} diff --git a/packages/txm/lib/utils/safeViemClients.ts b/packages/txm/lib/utils/safeViemClients.ts index 9c1ba6735a..6bc57ea723 100644 --- a/packages/txm/lib/utils/safeViemClients.ts +++ b/packages/txm/lib/utils/safeViemClients.ts @@ -1,5 +1,5 @@ -import { unknownToError } from "@happy.tech/common" -import type { Counter, Histogram } from "@opentelemetry/api" +import { bigIntReplacer, unknownToError } from "@happy.tech/common" +import type { Counter, Histogram, Tracer } from "@opentelemetry/api" import { ResultAsync } from "neverthrow" import type { Account, @@ -89,6 +89,7 @@ export interface SafeViemPublicClient extends ViemPublicClient { rpcCounter: Counter | undefined rpcErrorCounter: Counter | undefined rpcResponseTimeHistogram: Histogram | undefined + tracer: Tracer | undefined safeEstimateGas: ( ...args: Parameters @@ -117,14 +118,17 @@ export interface MetricsHandlers { export function convertToSafeViemPublicClient( client: ViemPublicClient, metrics?: MetricsHandlers, + tracer?: Tracer, ): SafeViemPublicClient { const safeClient = client as SafeViemPublicClient safeClient.rpcCounter = metrics?.rpcCounter safeClient.rpcErrorCounter = metrics?.rpcErrorCounter safeClient.rpcResponseTimeHistogram = metrics?.rpcResponseTimeHistogram + safeClient.tracer = tracer safeClient.safeEstimateGas = (...args: Parameters) => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.estimate-gas") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "estimateGas" }) const startTime = Date.now() @@ -133,17 +137,24 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "estimateGas" }) + span?.addEvent("safe-viem-public-client.estimate-gas.success", { + result: JSON.stringify(result, bigIntReplacer), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "estimateGas" }) } + span?.recordException(error) + span?.end() return error as EstimateGasErrorType }) } safeClient.safeGetTransactionReceipt = (...args: Parameters) => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.get-transaction-receipt") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "getTransactionReceipt" }) const startTime = Date.now() @@ -152,17 +163,24 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "getTransactionReceipt" }) + span?.addEvent("safe-viem-public-client.get-transaction-receipt.success", { + result: JSON.stringify(result, bigIntReplacer), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "getTransactionReceipt" }) } + span?.recordException(error) + span?.end() return error as GetTransactionReceiptErrorType }) } safeClient.safeDebugTransaction = (...args: DebugTransactionSchema["Parameters"]) => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.debug-transaction") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "debug_traceTransaction" }) const startTime = Date.now() @@ -177,6 +195,10 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "debug_traceTransaction" }) + span?.addEvent("safe-viem-public-client.debug-transaction.success", { + result: JSON.stringify(result, bigIntReplacer), + }) + span?.end() return result as Call }) .mapErr((error) => { @@ -188,6 +210,7 @@ export function convertToSafeViemPublicClient( } safeClient.safeGetChainId = () => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.get-chain-id") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "getChainId" }) const startTime = Date.now() @@ -196,17 +219,24 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "getChainId" }) + span?.addEvent("safe-viem-public-client.get-chain-id.success", { + result: result.toString(), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "getChainId" }) } + span?.recordException(error) + span?.end() return error as GetChainIdErrorType }) } safeClient.safeGetTransactionCount = (...args: Parameters) => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.get-transaction-count") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "getTransactionCount" }) const startTime = Date.now() @@ -215,17 +245,24 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "getTransactionCount" }) + span?.addEvent("safe-viem-public-client.get-transaction-count.success", { + result: result.toString(), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "getTransactionCount" }) } + span?.recordException(error) + span?.end() return error as GetTransactionCountErrorType }) } safeClient.safeGetFeeHistory = (...args: Parameters) => { + const span = safeClient.tracer?.startSpan("safe-viem-public-client.get-fee-history") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "getFeeHistory" }) const startTime = Date.now() @@ -234,12 +271,18 @@ export function convertToSafeViemPublicClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "getFeeHistory" }) + span?.addEvent("safe-viem-public-client.get-fee-history.success", { + result: JSON.stringify(result, bigIntReplacer), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "getFeeHistory" }) } + span?.recordException(error) + span?.end() return error as GetFeeHistoryErrorType }) } @@ -251,6 +294,7 @@ export interface SafeViemWalletClient extends ViemWalletClient { rpcCounter?: Counter rpcErrorCounter?: Counter rpcResponseTimeHistogram?: Histogram + tracer?: Tracer safeSendRawTransaction: ( ...args: Parameters @@ -263,14 +307,17 @@ export interface SafeViemWalletClient extends ViemWalletClient { export function convertToSafeViemWalletClient( client: ViemWalletClient, metrics?: MetricsHandlers, + tracer?: Tracer, ): SafeViemWalletClient { const safeClient = client as SafeViemWalletClient safeClient.rpcCounter = metrics?.rpcCounter safeClient.rpcErrorCounter = metrics?.rpcErrorCounter safeClient.rpcResponseTimeHistogram = metrics?.rpcResponseTimeHistogram + safeClient.tracer = tracer safeClient.safeSendRawTransaction = (...args: Parameters) => { + const span = safeClient.tracer?.startSpan("safe-viem-wallet-client.send-raw-transaction") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "sendRawTransaction" }) const startTime = Date.now() @@ -279,17 +326,24 @@ export function convertToSafeViemWalletClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "sendRawTransaction" }) + span?.addEvent("safe-viem-wallet-client.send-raw-transaction.success", { + result: result.toString(), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "sendRawTransaction" }) } + span?.recordException(error) + span?.end() return error as SendRawTransactionErrorType }) } safeClient.safeSignTransaction = (args: TransactionRequestEIP1559 & { gas: bigint }) => { + const span = safeClient.tracer?.startSpan("safe-viem-wallet-client.sign-transaction") if (safeClient.rpcCounter) safeClient.rpcCounter.add(1, { method: "signTransaction" }) const startTime = Date.now() @@ -315,12 +369,18 @@ export function convertToSafeViemWalletClient( const duration = Date.now() - startTime if (safeClient.rpcResponseTimeHistogram) safeClient.rpcResponseTimeHistogram.record(duration, { method: "signTransaction" }) + span?.addEvent("safe-viem-wallet-client.sign-transaction.success", { + result: JSON.stringify(result, bigIntReplacer), + }) + span?.end() return result }) .mapErr((error) => { if (safeClient.rpcErrorCounter) { safeClient.rpcErrorCounter.add(1, { method: "signTransaction" }) } + span?.recordException(unknownToError(error)) + span?.end() return error as SignTransactionErrorType }) } diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index c16d1d3c5a..69d96b03d9 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -52,6 +52,9 @@ const txm = new TransactionManager({ metrics: { active: false, }, + traces: { + active: false, + }, }) const fromAddress = privateKeyToAddress(PRIVATE_KEY)