Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/randomness/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"@happy.tech/common": "workspace:0.1.0",
"@happy.tech/contracts": "workspace:0.1.0",
"@happy.tech/txm": "workspace:0.1.0",
"@opentelemetry/sdk-trace-node": "^1.30.1",
"@opentelemetry/api": "^1.9.0",
"better-sqlite3": "^11.7.0",
"kysely": "^0.27.5",
"neverthrow": "^8.1.0",
Expand Down
1 change: 1 addition & 0 deletions apps/randomness/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const envSchema = z.object({
EVM_DRAND_GENESIS_TIMESTAMP_SECONDS: z.string().transform((s) => BigInt(s)),
EVM_DRAND_PERIOD_SECONDS: z.string().transform((s) => BigInt(s)),
EVM_DRAND_MARGIN: z.string().transform((s) => BigInt(s)),
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().trim().optional(),
})

const parsedEnv = envSchema.safeParse(process.env)
Expand Down
9 changes: 9 additions & 0 deletions apps/randomness/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { abis } from "@happy.tech/contracts/random/anvil"
import { TransactionManager, TransactionStatus, TxmHookType } from "@happy.tech/txm"
import type { LatestBlock, Transaction } from "@happy.tech/txm"
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto"
import { CustomGasEstimator } from "./CustomGasEstimator.js"
import { DrandRepository } from "./DrandRepository"
import { DrandService } from "./DrandService"
Expand Down Expand Up @@ -33,6 +34,14 @@ class RandomnessService {
gas: {
minPriorityFeePerGas: 10n,
},
traces: {
active: true,
spanExporter: env.OTEL_EXPORTER_OTLP_ENDPOINT
? new OTLPTraceExporter({
url: env.OTEL_EXPORTER_OTLP_ENDPOINT,
})
: undefined,
},
})
this.transactionFactory = new TransactionFactory(this.txm, env.RANDOM_CONTRACT_ADDRESS, env.PRECOMMIT_DELAY)
this.drandService = new DrandService(this.drandRepository, this.transactionFactory)
Expand Down
2 changes: 2 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 20 additions & 6 deletions packages/txm/lib/BlockMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { LogTag, Logger } from "@happy.tech/common"
import { ROOT_CONTEXT, SpanStatusCode, context, trace } from "@opentelemetry/api"
import type { Block } from "viem"
import { Topics, eventBus } from "./EventBus.js"
import type { TransactionManager } from "./TransactionManager.js"
import { TxmMetrics } from "./telemetry/metrics"

import { TraceMethod } from "./telemetry/traces"
/**
* A type alias for {@link Block} with the `blockTag` set to `"latest"`, ensuring type definitions correspond to the latest block.
*/
Expand Down Expand Up @@ -31,17 +32,21 @@ export class BlockMonitor {
})
}

@TraceMethod("txm.block-monitor.on-new-block")
private onNewBlock(block: LatestBlock | undefined) {
if (!block) {
Logger.instance.error(LogTag.TXM, "Received undefined block")
return
}

const span = trace.getSpan(context.active())!
span.setAttribute("block.number", Number(block.number))

if (this.latestProcessedBlockNumber && block.number <= this.latestProcessedBlockNumber) {
Logger.instance.warn(
LogTag.TXM,
"Received block number less than or equal to latest processed block number. Skipping.",
)
const description = `Received block number less than or equal to latest processed block number. Skipping. Latest processed block number: ${this.latestProcessedBlockNumber}, received block number: ${block.number}`
span.recordException(new Error(description))
span.setStatus({ code: SpanStatusCode.ERROR })
Logger.instance.warn(LogTag.TXM, description)
return
}

Expand All @@ -58,13 +63,15 @@ export class BlockMonitor {
this.scheduleTimeout()
}

@TraceMethod("txm.block-monitor.schedule-timeout")
private scheduleTimeout() {
this.blockTimeout = setTimeout(() => {
Logger.instance.warn(LogTag.TXM, "Timeout reached. Resetting block subscription.")
this.resetBlockSubscription()
}, this.txmgr.blockInactivityTimeout)
}

@TraceMethod("txm.block-monitor.reset-block-subscription")
private resetBlockSubscription() {
TxmMetrics.getInstance().resetBlockMonitorCounter.add(1)
this.txmgr.rpcLivenessMonitor.trackError()
Expand All @@ -79,6 +86,13 @@ export class BlockMonitor {
this.unwatch = undefined
}

setTimeout(() => this.start(), 500)
// Delay block monitor restart by 500ms
// This prevents an infinite loop of rapid restarts
// when the blockchain node is unresponsive or down
setTimeout(() => {
context.with(ROOT_CONTEXT, () => {
this.start()
})
}, 500)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have this 500ms timeout on start again? We should document.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment

}
}
20 changes: 18 additions & 2 deletions packages/txm/lib/GasEstimator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { SpanStatusCode, context, trace } from "@opentelemetry/api"
import { type Result, err, ok } from "neverthrow"
import { encodeFunctionData } from "viem"
import type { Transaction } from "./Transaction.js"
import type { TransactionManager } from "./TransactionManager.js"
import { TraceMethod } from "./telemetry/traces"

export enum EstimateGasErrorCause {
EstimateGasABINotFound = "EstimateGasABINotFound",
Expand Down Expand Up @@ -61,16 +63,22 @@ export class DefaultGasLimitEstimator implements GasEstimator {
* to use the default case without needing to reimplement it. This is particularly
* useful for services like the Randomness service.
*/
@TraceMethod("txm.gas-estimator.simulate-transaction-for-gas")
protected async simulateTransactionForGas(
transactionManager: TransactionManager,
transaction: Transaction,
): Promise<Result<bigint, EstimateGasError>> {
const span = trace.getSpan(context.active())!

const abi = transactionManager.abiManager.get(transaction.contractName)

if (!abi) {
const description = `ABI not found for contract ${transaction.contractName}`
span.recordException(new Error(description))
span.setStatus({ code: SpanStatusCode.ERROR })
return err({
cause: EstimateGasErrorCause.EstimateGasABINotFound,
description: `ABI not found for contract ${transaction.contractName}`,
description,
})
}

Expand All @@ -86,12 +94,20 @@ export class DefaultGasLimitEstimator implements GasEstimator {
})

if (gasResult.isErr()) {
const description = `Failed to estimate gas for transaction ${transaction.intentId}. Details: ${gasResult.error}`
span.recordException(new Error(description))
span.setStatus({ code: SpanStatusCode.ERROR })
return err({
cause: EstimateGasErrorCause.EstimateGasClientError,
description: `Failed to estimate gas for transaction ${transaction.intentId}. Details: ${gasResult.error}`,
description,
})
}

span.addEvent("txm.gas-estimator.simulate-transaction-for-gas.succeeded", {
transactionIntentId: transaction.intentId,
gas: Number(gasResult.value),
})

return ok(gasResult.value)
}
}
21 changes: 21 additions & 0 deletions packages/txm/lib/GasPriceOracle.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { bigIntMax } from "@happy.tech/common"
import { SpanStatusCode, context, trace } from "@opentelemetry/api"
import { type Result, err, ok } from "neverthrow"
import type { LatestBlock } from "./BlockMonitor.js"
import { Topics, eventBus } from "./EventBus.js"
import type { TransactionManager } from "./TransactionManager.js"
import { TraceMethod } from "./telemetry/traces"

/**
* This module estimates the gas price for transaction execution. It updates with each new block, basing its calculations on EIP-1559.
Expand Down Expand Up @@ -39,26 +41,45 @@ export class GasPriceOracle {
await this.onNewBlock(block)
}

@TraceMethod("txm.gas-price-oracle.on-new-block")
private async onNewBlock(block: LatestBlock) {
const span = trace.getSpan(context.active())!

const baseFeePerGas = block.baseFeePerGas
const gasUsed = block.gasUsed
const gasLimit = block.gasLimit

span.addEvent("txm.gas-price-oracle.on-new-block.started", {
blockNumber: Number(block.number),
baseFeePerGas: Number(baseFeePerGas),
gasUsed: Number(gasUsed),
gasLimit: Number(gasLimit),
})

this.expectedNextBaseFeePerGas = this.calculateExpectedNextBaseFeePerGas(
baseFeePerGas as bigint,
gasUsed,
gasLimit,
)
const targetPriorityFeeResult = await this.calculateTargetPriorityFee()
if (targetPriorityFeeResult.isErr()) {
span.recordException(targetPriorityFeeResult.error)
span.setStatus({ code: SpanStatusCode.ERROR })
if (this.targetPriorityFee === undefined) {
this.targetPriorityFee = this.txmgr.maxPriorityFeePerGas ?? 0n
}
return
}
this.targetPriorityFee = targetPriorityFeeResult.value

span.addEvent("txm.gas-price-oracle.on-new-block.succeeded", {
blockNumber: Number(block.number),
expectedNextBaseFeePerGas: Number(this.expectedNextBaseFeePerGas),
targetPriorityFee: Number(this.targetPriorityFee),
})
}

@TraceMethod("txm.gas-price-oracle.calculate-target-priority-fee")
private async calculateTargetPriorityFee(): Promise<Result<bigint, Error>> {
const feeHistory = await this.txmgr.viemClient.safeGetFeeHistory({
blockCount: this.txmgr.priorityFeeAnalysisBlocks,
Expand Down
7 changes: 7 additions & 0 deletions packages/txm/lib/HookManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { LatestBlock } from "./BlockMonitor"
import { Topics, eventBus } from "./EventBus.js"
import type { Transaction } from "./Transaction.js"
import type { AttemptSubmissionErrorCause } from "./TransactionSubmitter"
import { TraceMethod } from "./telemetry/traces"

export enum TxmHookType {
All = "All",
Expand Down Expand Up @@ -114,6 +115,7 @@ export class HookManager {
}
}

@TraceMethod("txm.hook-manager.on-transaction-status-changed")
private async onTransactionStatusChanged(payload: { transaction: Transaction }): Promise<void> {
this.hooks[TxmHookType.TransactionStatusChanged].forEach((handler) => handler(payload.transaction))

Expand All @@ -125,6 +127,7 @@ export class HookManager {
)
}

@TraceMethod("txm.hook-manager.on-transaction-save-failed")
private async onTransactionSaveFailed(payload: {
transaction: Transaction
}): Promise<void> {
Expand All @@ -138,6 +141,7 @@ export class HookManager {
)
}

@TraceMethod("txm.hook-manager.on-new-block")
private async onNewBlock(block: LatestBlock): Promise<void> {
this.hooks[TxmHookType.NewBlock].forEach((handler) => handler(block))

Expand All @@ -149,6 +153,7 @@ export class HookManager {
)
}

@TraceMethod("txm.hook-manager.on-transaction-submission-failed")
private async onTransactionSubmissionFailed(payload: {
transaction: Transaction
description: string
Expand All @@ -168,12 +173,14 @@ export class HookManager {
)
}

@TraceMethod("txm.hook-manager.on-rpc-is-down")
private async onRpcIsDown(): Promise<void> {
this.hooks[TxmHookType.RpcIsDown].forEach((handler) => handler())

this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsDown }))
}

@TraceMethod("txm.hook-manager.on-rpc-is-up")
private async onRpcIsUp(): Promise<void> {
this.hooks[TxmHookType.RpcIsUp].forEach((handler) => handler())

Expand Down
29 changes: 29 additions & 0 deletions packages/txm/lib/NonceManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { LogTag, Logger } from "@happy.tech/common"
import { SpanStatusCode, context, trace } from "@opentelemetry/api"
import type { TransactionManager } from "./TransactionManager"
import { TxmMetrics } from "./telemetry/metrics"
import { TraceMethod } from "./telemetry/traces"

/*
/*
Expand Down Expand Up @@ -72,21 +74,35 @@ export class NonceManager {
}
}

@TraceMethod("txm.nonce-manager.request-nonce")
public requestNonce(): number {
const span = trace.getSpan(context.active())!

if (this.returnedNonceQueue.length > 0) {
const nonce = this.returnedNonceQueue.shift()!
span.addEvent("txm.nonce-manager.request-nonce.from-queue", {
nonce,
})
TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length)
return nonce
}

const requestedNonce = this.nonce
this.nonce = this.nonce + 1

span.addEvent("txm.nonce-manager.request-nonce.new-nonce", {
nonce: requestedNonce,
})

TxmMetrics.getInstance().nonceManagerGauge.record(this.nonce)
return requestedNonce
}

// Only called when a transaction that has reserved a nonce ultimately doesn't reach the mempool
@TraceMethod("txm.nonce-manager.return-nonce")
public returnNonce(nonce: number) {
const span = trace.getSpan(context.active())!

const index = this.returnedNonceQueue.findIndex((n) => nonce < n)

if (index === -1) {
Expand All @@ -95,11 +111,18 @@ export class NonceManager {
this.returnedNonceQueue.splice(index, 0, nonce)
}

span.addEvent("txm.nonce-manager.return-nonce.added-to-queue", {
nonce,
})

TxmMetrics.getInstance().returnedNonceCounter.add(1)
TxmMetrics.getInstance().returnedNonceQueueGauge.record(this.returnedNonceQueue.length)
}

@TraceMethod("txm.nonce-manager.resync")
public async resync() {
const span = trace.getSpan(context.active())!

const address = this.txmgr.viemWallet.account.address

const blockchainNonceResult = await this.txmgr.viemClient.safeGetTransactionCount({
Expand All @@ -111,11 +134,17 @@ export class NonceManager {
error: blockchainNonceResult.error,
})
this.txmgr.rpcLivenessMonitor.trackError()
span.recordException(blockchainNonceResult.error)
span.setStatus({ code: SpanStatusCode.ERROR })
return
}

this.txmgr.rpcLivenessMonitor.trackSuccess()

this.maxExecutedNonce = blockchainNonceResult.value - 1

span.addEvent("txm.nonce-manager.resync.updated-max-executed-nonce", {
maxExecutedNonce: this.maxExecutedNonce,
})
}
}
Loading
Loading