Skip to content
5 changes: 2 additions & 3 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (

// Transaction errors

ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)

// Storage errors

Expand Down
111 changes: 75 additions & 36 deletions services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package requester
import (
"context"
"encoding/hex"
"fmt"
"slices"
"sort"
"sync"
Expand All @@ -22,35 +23,42 @@ import (
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
)

const eoaActivityCacheSize = 10_000
// Sizing constants for the per-EOA activity tracking done by BatchTxPool.
const (
// eoaActivityCacheSize is the size argument handed to expirable.NewLRU
// when the EOA activity cache is created.
eoaActivityCacheSize = 10_000
// maxTrackedTxNoncesPerEOA caps how many recently accepted tx nonces are
// remembered per EOA; once the list grows past it, the oldest tracked
// nonce is dropped.
maxTrackedTxNoncesPerEOA = 15
)

// pooledEvmTx is a single EVM transaction held in BatchTxPool.pooledTxs,
// grouped there under the sender's EOA address until the pool processes it.
type pooledEvmTx struct {
// txPayload is the encoded transaction as a Cadence string; Add fills it
// from its hexEncodedTx value.
txPayload cadence.String
// txHash is the EVM transaction hash (tx.Hash()) where the constructing
// code sets it; otherwise it stays at its zero value.
txHash gethCommon.Hash
// nonce is the transaction's nonce, as returned by tx.Nonce().
nonce uint64
}

// BatchTxPool is a `TxPool` implementation that collects and groups
// transactions based on their EOA signer, and submits them for execution
// using a batch.
type eoaActivityMetadata struct {
// eoaActivityMetadata is the value stored per EOA address in
// BatchTxPool.eoaActivityCache.
//
// lastSubmission is set to time.Now() after a tx from the EOA has been
// successfully submitted or pooled; Add compares the time elapsed since
// then against the configured TxBatchInterval.
lastSubmission time.Time
// txNonces lists the nonces of the EOA's most recently accepted txs,
// trimmed to at most maxTrackedTxNoncesPerEOA entries; Add rejects a tx
// whose nonce is already listed here.
txNonces []uint64
}

// BatchTxPool is a `TxPool` implementation that groups incoming transactions
// based on their EOA signer, and submits them for execution using a batch.
//
// The underlying Cadence EVM API used is `EVM.batchRun`, instead of the
// `EVM.run` used in `SingleTxPool`.
//
// The main advantage of this implementation over the `SingleTxPool`, is the
// guarantee that transactions originated from the same EOA address, which
// arrive in a short time interval (about the same as Flow's block production rate),
// will be executed in the same order their arrived.
// This helps to reduce the nonce mismatch errors which mainly occur from the
// re-ordering of Cadence transactions that happens from Collection nodes.
// guarantee that transactions originating from the same EOA address, which
// arrive in a short time interval (configurable by the node operator),
// will be executed in the same order they arrived.
// This helps to reduce the execution errors which may occur from the
// re-ordering of Cadence transactions that happens on Collection nodes.
type BatchTxPool struct {
*SingleTxPool
pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]

pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata]
}

var _ TxPool = &BatchTxPool{}
var _ TxPool = (*BatchTxPool)(nil)

func NewBatchTxPool(
ctx context.Context,
Expand All @@ -77,16 +85,16 @@ func NewBatchTxPool(
return nil, err
}

eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata](
eoaActivityCacheSize,
nil,
config.EOAActivityCacheTTL,
)
batchPool := &BatchTxPool{
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivity: eoaActivity,
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivityCache: eoaActivityCache,
}

go batchPool.processPooledTransactions(ctx)
Expand All @@ -104,11 +112,6 @@ func (t *BatchTxPool) Add(
) error {
t.txPublisher.Publish(tx) // publish pending transaction event

// tx adding should be blocking, so we don't have races when
// pooled transactions are being processed in the background.
t.txMux.Lock()
defer t.txMux.Unlock()

from, err := models.DeriveTxSender(tx)
if err != nil {
return err
Expand All @@ -123,6 +126,19 @@ func (t *BatchTxPool) Add(
return err
}

eoaActivity, found := t.eoaActivityCache.Get(from)
nonce := tx.Nonce()

// Reject transactions whose nonce has already been submitted by this EOA,
// as they are *likely* to fail.
if found && slices.Contains(eoaActivity.txNonces, nonce) {
return fmt.Errorf(
"%w: a tx with nonce %d has already been submitted",
errs.ErrInvalid,
nonce,
)
}

// Scenarios
// 1. EOA activity not found:
// => We send the transaction individually, without adding it
Expand All @@ -140,27 +156,50 @@ func (t *BatchTxPool) Add(
// For all 3 cases, we record the activity time for the next
// transactions that might come from the same EOA.
// [X] is equal to the configured `TxBatchInterval` duration.
lastActivityTime, found := t.eoaActivity.Get(from)

if !found {
// Case 1. EOA activity not found:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval {
// If the EOA still has pooled transactions that have not been processed
// yet (e.g. due to congestion), make sure to include the current
// tx in that batch.
t.txMux.Lock()
hasBatch := len(t.pooledTxs[from]) > 0
if hasBatch {
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}
t.txMux.Unlock()

// If it wasn't batched, submit individually
if !hasBatch {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
}
} else {
// Case 3. EOA activity found AND it was less than [X] seconds ago:
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
// Prevent submission of duplicate transactions, based on their tx hash
if slices.Contains(t.pooledTxs[from], userTx) {
return errs.ErrDuplicateTransaction
}
t.txMux.Lock()
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
t.txMux.Unlock()
}

if err != nil {
return err
}

t.eoaActivity.Add(from, time.Now())
// Update metadata for the last EOA activity only on successful add/submit.
eoaActivity.lastSubmission = time.Now()
eoaActivity.txNonces = append(eoaActivity.txNonces, nonce)
// To keep the slice of nonces from growing indefinitely,
// retain only the most recent `maxTrackedTxNoncesPerEOA` tx nonces.
if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA {
eoaActivity.txNonces = eoaActivity.txNonces[1:]
}

return err
t.eoaActivityCache.Add(from, eoaActivity)

return nil
}

func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
Expand Down
19 changes: 6 additions & 13 deletions services/requester/single_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type SingleTxPool struct {
pool *sync.Map
txPublisher *models.Publisher[*gethTypes.Transaction]
config config.Config
mux sync.Mutex
keystore *keystore.KeyStore
collector metrics.Collector
// referenceBlockHeader is stored atomically to avoid races
Expand All @@ -40,7 +39,7 @@ type SingleTxPool struct {
// todo add methods to inspect transaction pool state
}

var _ TxPool = &SingleTxPool{}
var _ TxPool = (*SingleTxPool)(nil)

func NewSingleTxPool(
ctx context.Context,
Expand Down Expand Up @@ -188,7 +187,11 @@ func (t *SingleTxPool) buildTransaction(
}
}

accKey, err := t.fetchSigningAccountKey()
// Getting an account key from the `KeyStore` for signing transactions
// *does not* need to be lock-protected, as the keys are read from a
// channel. No two goroutines will end up getting the same account
// key at the same time.
accKey, err := t.keystore.Take()
if err != nil {
return nil, err
}
Expand All @@ -211,16 +214,6 @@ func (t *SingleTxPool) buildTransaction(
return flowTx, nil
}

// fetchSigningAccountKey takes an account key from t.keystore while holding
// t.mux, and returns whatever keystore.Take returns (the key and an error).
func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) {
// getting an account key from the `KeyStore` for signing transactions,
// should be lock-protected, so that we don't sign any two Flow
// transactions with the same account key
t.mux.Lock()
defer t.mux.Unlock()

return t.keystore.Take()
}

func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader {
if v := t.referenceBlockHeader.Load(); v != nil {
return v.(*flow.BlockHeader)
Expand Down
61 changes: 32 additions & 29 deletions tests/tx_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
)
}

func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) {
_, cfg, stop := setupGatewayNode(t)
defer stop()

Expand All @@ -525,36 +525,36 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey)
require.NoError(t, err)

testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194")
nonce := uint64(0)
hashes := make([]common.Hash, 0)

signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5}

// Increment nonce for the duplicate test transactions that follow
nonce += 1
dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)
var errors []error
hashes := []common.Hash{}
// transfer some funds to the test address
for _, nonce := range nonces {
signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

// Submit 5 identical transactions to test duplicate detection:
// the first should succeed, the rest should be rejected as duplicates.
for i := range 5 {
if i == 0 {
txHash, err := rpcTester.sendRawTx(dupSigned)
require.NoError(t, err)
hashes = append(hashes, txHash)
txHash, err := rpcTester.sendRawTx(signed)
if err != nil {
errors = append(errors, err)
} else {
_, err := rpcTester.sendRawTx(dupSigned)
require.Error(t, err)
require.ErrorContains(t, err, "invalid: transaction already in pool")
hashes = append(hashes, txHash)
}
}

require.Len(t, errors, 2)
assert.ErrorContains(
t,
errors[0],
"a tx with nonce 2 has already been submitted",
)
assert.ErrorContains(
t,
errors[1],
"a tx with nonce 3 has already been submitted",
)

assert.Eventually(t, func() bool {
for _, h := range hashes {
rcp, err := rpcTester.getReceipt(h.String())
Expand Down Expand Up @@ -604,9 +604,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
EnforceGasPrice: true,
LogLevel: zerolog.DebugLevel,
LogWriter: testLogWriter(),
TxStateValidation: config.TxSealValidation,
TxStateValidation: config.LocalIndexValidation,
TxBatchMode: true,
TxBatchInterval: time.Second * 2,
TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet
}

bootstrapDone := make(chan struct{})
Expand All @@ -617,6 +617,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
require.NoError(t, err)
}()

// Allow the Gateway to catch up on indexing
time.Sleep(time.Second * 2)

<-bootstrapDone

return emu, cfg, func() {
Expand Down