From 1143ff46b48f0cbc935de35a6a29263d0822e13c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 10 Nov 2025 14:01:32 +0200 Subject: [PATCH] Track the latest finalized block header to avoid fetching it on every tx submission --- bootstrap/bootstrap.go | 9 +++- services/requester/batch_tx_pool.go | 31 +++++--------- services/requester/single_tx_pool.go | 62 +++++++++++++++++++++++----- tests/helpers.go | 2 +- 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 90b7d1ed..f5748c2e 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -253,8 +253,9 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { // create transaction pool var txPool requester.TxPool + var err error if b.config.TxBatchMode { - txPool = requester.NewBatchTxPool( + txPool, err = requester.NewBatchTxPool( ctx, b.client, b.publishers.Transaction, @@ -264,7 +265,8 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { b.keystore, ) } else { - txPool = requester.NewSingleTxPool( + txPool, err = requester.NewSingleTxPool( + ctx, b.client, b.publishers.Transaction, b.logger, @@ -273,6 +275,9 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { b.keystore, ) } + if err != nil { + return fmt.Errorf("failed to create transaction pool: %w", err) + } evm, err := requester.NewEVM( b.storages.Registers, diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index f96cd371..28f824c7 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -57,11 +57,12 @@ func NewBatchTxPool( config config.Config, collector metrics.Collector, keystore *keystore.KeyStore, -) *BatchTxPool { +) (*BatchTxPool, error) { // initialize the available keys metric since it is only updated when sending a tx collector.AvailableSigningKeys(keystore.AvailableKeys()) - singleTxPool := NewSingleTxPool( + singleTxPool, err := NewSingleTxPool( + ctx, client, transactionsPublisher, logger, @@ -69,6 +70,9 @@ func NewBatchTxPool( collector, keystore, ) + if err != nil { + return nil, err + } eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( eoaActivityCacheSize, @@ -84,7 +88,7 @@ func NewBatchTxPool( go batchPool.processPooledTransactions(ctx) - return batchPool + return batchPool, nil } // Add adds the EVM transaction to the tx pool, grouped with the rest of the @@ -161,14 +165,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - latestBlock, err := t.client.GetLatestBlock(ctx, true) - if err != nil { - t.logger.Error().Err(err).Msg( - "failed to get latest Flow block on batch tx submission", - ) - continue - } - // Take a copy here to allow `Add()` to continue accept // incoming EVM transactions, without blocking until the // batch transactions are submitted. @@ -180,7 +176,7 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { for address, pooledTxs := range txsGroupedByAddress { err := t.batchSubmitTransactionsForSameAddress( ctx, - latestBlock, + t.getReferenceBlock(), pooledTxs, ) if err != nil { @@ -197,7 +193,7 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( ctx context.Context, - latestBlock *flow.Block, + referenceBlockHeader *flow.BlockHeader, pooledTxs []pooledEvmTx, ) error { // Sort the transactions based on their nonce, to make sure @@ -219,7 +215,7 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( script := replaceAddresses(runTxScript, t.config.FlowNetworkID) flowTx, err := t.buildTransaction( ctx, - latestBlock, + referenceBlockHeader, script, cadence.NewArray(hexEncodedTxs), coinbaseAddress, @@ -242,11 +238,6 @@ func (t *BatchTxPool) submitSingleTransaction( ctx context.Context, hexEncodedTx cadence.String, ) error { - latestBlock, err := t.client.GetLatestBlock(ctx, true) - if err != nil { - return err - } - coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) if err != nil { return err @@ -255,7 +246,7 @@ func (t *BatchTxPool) submitSingleTransaction( script := replaceAddresses(runTxScript, t.config.FlowNetworkID) flowTx, err := t.buildTransaction( ctx, - latestBlock, + t.getReferenceBlock(), script, cadence.NewArray([]cadence.Value{hexEncodedTx}), coinbaseAddress, diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index 3a79d02e..e8de3e89 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "sync" + "sync/atomic" "time" gethTypes "github.com/ethereum/go-ethereum/core/types" @@ -20,6 +21,8 @@ import ( "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) +const referenceBlockUpdateFrequency = time.Second * 15 + // SingleTxPool is a simple implementation of the `TxPool` interface that submits // transactions as soon as they arrive, without any delays or batching strategies. type SingleTxPool struct { @@ -31,23 +34,32 @@ type SingleTxPool struct { mux sync.Mutex keystore *keystore.KeyStore collector metrics.Collector + // referenceBlockHeader is stored atomically to avoid races + // between request path and ticker updates. + referenceBlockHeader atomic.Value // stores *flow.BlockHeader // todo add methods to inspect transaction pool state } var _ TxPool = &SingleTxPool{} func NewSingleTxPool( + ctx context.Context, client *CrossSporkClient, transactionsPublisher *models.Publisher[*gethTypes.Transaction], logger zerolog.Logger, config config.Config, collector metrics.Collector, keystore *keystore.KeyStore, -) *SingleTxPool { +) (*SingleTxPool, error) { + referenceBlockHeader, err := client.GetLatestBlockHeader(ctx, false) + if err != nil { + return nil, err + } + // initialize the available keys metric since it is only updated when sending a tx collector.AvailableSigningKeys(keystore.AvailableKeys()) - return &SingleTxPool{ + singleTxPool := &SingleTxPool{ logger: logger.With().Str("component", "tx-pool").Logger(), client: client, txPublisher: transactionsPublisher, @@ -56,6 +68,11 @@ func NewSingleTxPool( collector: collector, keystore: keystore, } + singleTxPool.referenceBlockHeader.Store(referenceBlockHeader) + + go singleTxPool.updateReferenceBlock(ctx) + + return singleTxPool, nil } // Add creates a Cadence transaction that wraps the given EVM transaction in @@ -92,15 +109,10 @@ func (t *SingleTxPool) Add( return err } - latestBlock, err := t.client.GetLatestBlock(ctx, true) - if err != nil { - return err - } - script := replaceAddresses(runTxScript, t.config.FlowNetworkID) flowTx, err := t.buildTransaction( ctx, - latestBlock, + t.getReferenceBlock(), script, cadence.NewArray([]cadence.Value{hexEncodedTx}), coinbaseAddress, @@ -157,7 +169,7 @@ func (t *SingleTxPool) Add( // with the given arguments and signs it with the configured COA account. func (t *SingleTxPool) buildTransaction( ctx context.Context, - latestBlock *flow.Block, + referenceBlockHeader *flow.BlockHeader, script []byte, args ...cadence.Value, ) (*flow.Transaction, error) { @@ -167,7 +179,7 @@ func (t *SingleTxPool) buildTransaction( flowTx := flow.NewTransaction(). SetScript(script). - SetReferenceBlockID(latestBlock.ID). + SetReferenceBlockID(referenceBlockHeader.ID). SetComputeLimit(flowGo.DefaultMaxTransactionGasLimit) for _, arg := range args { @@ -194,7 +206,7 @@ func (t *SingleTxPool) buildTransaction( } // now that the transaction is prepared, store the transaction's metadata - accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height) + accKey.SetLockMetadata(flowTx.ID(), referenceBlockHeader.Height) return flowTx, nil } @@ -208,3 +220,31 @@ func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) { return t.keystore.Take() } + +func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader { + if v := t.referenceBlockHeader.Load(); v != nil { + return v.(*flow.BlockHeader) + } + return nil +} + +func (t *SingleTxPool) updateReferenceBlock(ctx context.Context) { + ticker := time.NewTicker(referenceBlockUpdateFrequency) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + blockHeader, err := t.client.GetLatestBlockHeader(ctx, false) + if err != nil { + t.logger.Error().Err(err).Msg( + "failed to update the reference block", + ) + continue + } + t.referenceBlockHeader.Store(blockHeader) + } + } +} diff --git a/tests/helpers.go b/tests/helpers.go index 4fc69f28..017b8578 100644 --- a/tests/helpers.go +++ b/tests/helpers.go @@ -90,7 +90,7 @@ func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) { GenesisTokenSupply: genesisToken, WithContracts: true, Host: "localhost", - TransactionExpiry: 10, + TransactionExpiry: flow.DefaultTransactionExpiry, TransactionMaxGasLimit: flow.DefaultMaxTransactionGasLimit, SetupEVMEnabled: true, SetupVMBridgeEnabled: true,