Skip to content
6 changes: 0 additions & 6 deletions api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ func resolveBlockTag(
if number, ok := blockNumberOrHash.Number(); ok {
height, err := resolveBlockNumber(number, blocksDB)
if err != nil {
logger.Error().Err(err).
Stringer("block_number", number).
Msg("failed to resolve block by number")
return 0, err
}
return height, nil
Expand All @@ -40,9 +37,6 @@ func resolveBlockTag(
if hash, ok := blockNumberOrHash.Hash(); ok {
height, err := blocksDB.GetHeightByID(hash)
if err != nil {
logger.Error().Err(err).
Stringer("block_hash", hash).
Msg("failed to resolve block by hash")
return 0, err
}
return height, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func init() {
Cmd.Flags().DurationVar(&cfg.TxRequestLimitDuration, "tx-request-limit-duration", time.Second*3, "Time interval upon which to enforce transaction submission rate limiting.")
Cmd.Flags().BoolVar(&cfg.TxBatchMode, "tx-batch-mode", false, "Enable batch transaction submission, to avoid nonce mismatch issues for high-volume EOAs.")
Cmd.Flags().DurationVar(&cfg.TxBatchInterval, "tx-batch-interval", time.Millisecond*1200, "Time interval upon which to submit the transaction batches to the Flow network.")
Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.")
Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*15, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.")
Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.")

err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.")
Expand Down
5 changes: 2 additions & 3 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (

// Transaction errors

ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)

// Storage errors

Expand Down
113 changes: 76 additions & 37 deletions services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,45 @@ import (
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
)

const eoaActivityCacheSize = 10_000
const (
// eoaActivityCacheSize is the size passed to the expirable LRU cache
// that tracks the recent activity of each EOA.
eoaActivityCacheSize = 10_000
// maxTrackedTxNoncesPerEOA caps how many of the most recent transaction
// nonces are remembered for each EOA. When the cap is exceeded, only
// the last `maxTrackedTxNoncesPerEOA` nonces are kept.
maxTrackedTxNoncesPerEOA = 30
)

// pooledEvmTx is a single EVM transaction held in the `pooledTxs` map
// of `BatchTxPool`, grouped under the address of its EOA signer.
type pooledEvmTx struct {
// txPayload is the encoded transaction as a Cadence string
// (populated from `hexEncodedTx` in `Add`).
txPayload cadence.String
// txHash is the hash of the EVM transaction.
txHash gethCommon.Hash
// nonce is the nonce of the EVM transaction.
nonce uint64
}

// BatchTxPool is a `TxPool` implementation that collects and groups
// transactions based on their EOA signer, and submits them for execution
// using a batch.
// eoaActivityMetadata is the per-EOA value stored in the EOA activity
// cache, keyed by the EOA address.
type eoaActivityMetadata struct {
// lastSubmission is set to the current time after a transaction from
// the EOA has been successfully submitted or added to a batch.
lastSubmission time.Time
// txNonces holds the nonces of the most recently accepted transactions
// (at most `maxTrackedTxNoncesPerEOA`). A transaction whose nonce is
// already in this slice is skipped by `Add`.
txNonces []uint64
}

// BatchTxPool is a `TxPool` implementation that groups incoming transactions
// based on their EOA signer, and submits them for execution using a batch.
//
// The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the
// `EVM.run` used in `SingleTxPool`.
//
// The main advantage of this implementation over the `SingleTxPool`, is the
// guarantee that transactions originated from the same EOA address, which
// arrive in a short time interval (about the same as Flow's block production rate),
// will be executed in the same order their arrived.
// This helps to reduce the nonce mismatch errors which mainly occur from the
// re-ordering of Cadence transactions that happens from Collection nodes.
// guarantee that transactions originating from the same EOA address, which
// arrive in a short time interval (configurable by the node operator),
// will be executed in the same order they arrived.
// This helps to reduce the execution errors which may occur from the
// re-ordering of Cadence transactions that happens on Collection nodes.
type BatchTxPool struct {
*SingleTxPool
pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]

pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata]
}

var _ TxPool = &BatchTxPool{}
var _ TxPool = (*BatchTxPool)(nil)

func NewBatchTxPool(
ctx context.Context,
Expand All @@ -77,16 +83,16 @@ func NewBatchTxPool(
return nil, err
}

eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata](
eoaActivityCacheSize,
nil,
config.EOAActivityCacheTTL,
)
batchPool := &BatchTxPool{
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivity: eoaActivity,
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivityCache: eoaActivityCache,
}

go batchPool.processPooledTransactions(ctx)
Expand All @@ -104,11 +110,6 @@ func (t *BatchTxPool) Add(
) error {
t.txPublisher.Publish(tx) // publish pending transaction event

// tx adding should be blocking, so we don't have races when
// pooled transactions are being processed in the background.
t.txMux.Lock()
defer t.txMux.Unlock()

from, err := models.DeriveTxSender(tx)
if err != nil {
return err
Expand All @@ -123,6 +124,20 @@ func (t *BatchTxPool) Add(
return err
}

eoaActivity, found := t.eoaActivityCache.Get(from)
nonce := tx.Nonce()

// Skip transactions that have already been submitted,
// as they are *likely* to fail.
if found && slices.Contains(eoaActivity.txNonces, nonce) {
t.logger.Info().
Str("evm_tx", tx.Hash().Hex()).
Str("from", from.Hex()).
Uint64("nonce", nonce).
Msg("tx with same nonce has been already submitted")
return nil
}

// Scenarios
// 1. EOA activity not found:
// => We send the transaction individually, without adding it
Expand All @@ -140,27 +155,51 @@ func (t *BatchTxPool) Add(
// For all 3 cases, we record the activity time for the next
// transactions that might come from the same EOA.
// [X] is equal to the configured `TxBatchInterval` duration.
lastActivityTime, found := t.eoaActivity.Get(from)

if !found {
// Case 1. EOA activity not found:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval {
// If the EOA still has pooled transactions that have not been
// processed yet (e.g. due to congestion), make sure the current
// tx is added to that same batch.
t.txMux.Lock()
hasBatch := len(t.pooledTxs[from]) > 0
if hasBatch {
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}
t.txMux.Unlock()

// If it wasn't batched, submit individually
if !hasBatch {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
}
} else {
// Case 3. EOA activity found AND it was less than [X] seconds ago:
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
// Prevent submission of duplicate transactions, based on their tx hash
if slices.Contains(t.pooledTxs[from], userTx) {
return errs.ErrDuplicateTransaction
}
t.txMux.Lock()
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
t.txMux.Unlock()
}

if err != nil {
return err
}

t.eoaActivity.Add(from, time.Now())
// Update metadata for the last EOA activity only on successful add/submit.
eoaActivity.lastSubmission = time.Now()
eoaActivity.txNonces = append(eoaActivity.txNonces, nonce)
Comment on lines +191 to +192
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to protect these writes? Is it possible that another request is concurrently updating/reading them?

// To prevent the slice of nonces from growing indefinitely,
// keep only the last `maxTrackedTxNoncesPerEOA` nonces.
if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA {
firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA
eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:]
}

return err
t.eoaActivityCache.Add(from, eoaActivity)

return nil
}

func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion services/requester/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (e *EVM) SendRawTransaction(ctx context.Context, data []byte) (common.Hash,
}

e.logger.Info().
Str("evm-id", tx.Hash().Hex()).
Str("evm_tx", tx.Hash().Hex()).
Str("to", to).
Str("from", from.Hex()).
Str("value", tx.Value().String()).
Expand Down
23 changes: 8 additions & 15 deletions services/requester/single_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type SingleTxPool struct {
pool *sync.Map
txPublisher *models.Publisher[*gethTypes.Transaction]
config config.Config
mux sync.Mutex
keystore *keystore.KeyStore
collector metrics.Collector
// referenceBlockHeader is stored atomically to avoid races
Expand All @@ -40,7 +39,7 @@ type SingleTxPool struct {
// todo add methods to inspect transaction pool state
}

var _ TxPool = &SingleTxPool{}
var _ TxPool = (*SingleTxPool)(nil)

func NewSingleTxPool(
ctx context.Context,
Expand Down Expand Up @@ -150,8 +149,8 @@ func (t *SingleTxPool) Add(
}

t.logger.Error().Err(res.Error).
Str("flow-id", flowTx.ID().String()).
Str("evm-id", tx.Hash().Hex()).
Str("flow_tx", flowTx.ID().String()).
Str("evm_tx", tx.Hash().Hex()).
Msg("flow transaction error")

// hide specific cause since it's an implementation issue
Expand Down Expand Up @@ -188,7 +187,11 @@ func (t *SingleTxPool) buildTransaction(
}
}

accKey, err := t.fetchSigningAccountKey()
// Getting an account key from the `KeyStore` for signing transactions
// *does not* need to be lock-protected, as the keys are read from a
// channel. No two goroutines will end up getting the same account
// key at the same time.
accKey, err := t.keystore.Take()
if err != nil {
return nil, err
}
Expand All @@ -211,16 +214,6 @@ func (t *SingleTxPool) buildTransaction(
return flowTx, nil
}

// fetchSigningAccountKey takes an account key from the `KeyStore` while
// holding `t.mux`, so that concurrent callers are serialized. It returns
// whatever `t.keystore.Take()` returns, including its error.
func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) {
// getting an account key from the `KeyStore` for signing transactions,
// should be lock-protected, so that we don't sign any two Flow
// transactions with the same account key
t.mux.Lock()
defer t.mux.Unlock()

return t.keystore.Take()
}

func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader {
if v := t.referenceBlockHeader.Load(); v != nil {
return v.(*flow.BlockHeader)
Expand Down
62 changes: 25 additions & 37 deletions tests/tx_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
)
}

func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) {
_, cfg, stop := setupGatewayNode(t)
defer stop()

Expand All @@ -525,45 +525,30 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey)
require.NoError(t, err)

testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194")
nonce := uint64(0)
hashes := make([]common.Hash, 0)

signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5}

// Increment nonce for the duplicate test transactions that follow
nonce += 1
dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)
hashes := []common.Hash{}
// transfer some funds to the test address
transferAmount := int64(1_000_000_000)
for _, nonce := range nonces {
signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

// Submit 5 identical transactions to test duplicate detection:
// the first should succeed, the rest should be rejected as duplicates.
for i := range 5 {
if i == 0 {
txHash, err := rpcTester.sendRawTx(dupSigned)
require.NoError(t, err)
hashes = append(hashes, txHash)
} else {
_, err := rpcTester.sendRawTx(dupSigned)
require.Error(t, err)
require.ErrorContains(t, err, "invalid: transaction already in pool")
}
txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)
}

expectedBalance := big.NewInt(6 * transferAmount)

assert.Eventually(t, func() bool {
for _, h := range hashes {
rcp, err := rpcTester.getReceipt(h.String())
if err != nil || rcp == nil || rcp.Status != 1 {
return false
}
balance, err := rpcTester.getBalance(testAddr)
if err != nil {
return false
}

return true
return balance.Cmp(expectedBalance) == 0
}, time.Second*15, time.Second*1, "all transactions were not executed")
}

Expand Down Expand Up @@ -604,9 +589,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
EnforceGasPrice: true,
LogLevel: zerolog.DebugLevel,
LogWriter: testLogWriter(),
TxStateValidation: config.TxSealValidation,
TxStateValidation: config.LocalIndexValidation,
TxBatchMode: true,
TxBatchInterval: time.Second * 2,
TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet
}

bootstrapDone := make(chan struct{})
Expand All @@ -617,6 +602,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
require.NoError(t, err)
}()

// Allow the Gateway to catch up on indexing
time.Sleep(time.Second * 2)

<-bootstrapDone

return emu, cfg, func() {
Expand Down