diff --git a/api/utils.go b/api/utils.go index dc9d2ddd..9723e637 100644 --- a/api/utils.go +++ b/api/utils.go @@ -29,9 +29,6 @@ func resolveBlockTag( if number, ok := blockNumberOrHash.Number(); ok { height, err := resolveBlockNumber(number, blocksDB) if err != nil { - logger.Error().Err(err). - Stringer("block_number", number). - Msg("failed to resolve block by number") return 0, err } return height, nil @@ -40,9 +37,6 @@ func resolveBlockTag( if hash, ok := blockNumberOrHash.Hash(); ok { height, err := blocksDB.GetHeightByID(hash) if err != nil { - logger.Error().Err(err). - Stringer("block_hash", hash). - Msg("failed to resolve block by hash") return 0, err } return height, nil diff --git a/models/errors/errors.go b/models/errors/errors.go index 614c7403..fb0f38d6 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -28,9 +28,8 @@ var ( // Transaction errors - ErrFailedTransaction = errors.New("failed transaction") - ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) - ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool") + ErrFailedTransaction = errors.New("failed transaction") + ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) // Storage errors diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index d82ae258..ac81993b 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -18,39 +18,45 @@ import ( "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" - errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) -const eoaActivityCacheSize = 10_000 +const ( + eoaActivityCacheSize = 10_000 + maxTrackedTxNoncesPerEOA = 30 +) type pooledEvmTx struct { txPayload cadence.String - txHash gethCommon.Hash nonce uint64 } -// BatchTxPool is a `TxPool` implementation that collects and groups -// transactions based on their EOA signer, and submits them for execution -// using a batch. +type eoaActivityMetadata struct { + lastSubmission time.Time + txNonces []uint64 +} + +// BatchTxPool is a `TxPool` implementation that groups incoming transactions +// based on their EOA signer, and submits them for execution using a batch. // // The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the // `EVM.run` used in `SingleTxPool`. // // The main advantage of this implementation over the `SingleTxPool`, is the -// guarantee that transactions originated from the same EOA address, which -// arrive in a short time interval (about the same as Flow's block production rate), -// will be executed in the same order their arrived. -// This helps to reduce the nonce mismatch errors which mainly occur from the -// re-ordering of Cadence transactions that happens from Collection nodes. +// guarantee that transactions originating from the same EOA address, which +// arrive in a short time interval (configurable by the node operator), +// will be executed in the same order they arrived. +// This helps to reduce the execution errors which may occur from the +// re-ordering of Cadence transactions that happens on Collection nodes. type BatchTxPool struct { *SingleTxPool - pooledTxs map[gethCommon.Address][]pooledEvmTx - txMux sync.Mutex - eoaActivity *expirable.LRU[gethCommon.Address, time.Time] + + pooledTxs map[gethCommon.Address][]pooledEvmTx + txMux sync.Mutex + eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata] } -var _ TxPool = &BatchTxPool{} +var _ TxPool = (*BatchTxPool)(nil) func NewBatchTxPool( ctx context.Context, @@ -77,16 +83,16 @@ func NewBatchTxPool( return nil, err } - eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( + eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, config.EOAActivityCacheTTL, ) batchPool := &BatchTxPool{ - SingleTxPool: singleTxPool, - pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), - txMux: sync.Mutex{}, - eoaActivity: eoaActivity, + SingleTxPool: singleTxPool, + pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), + txMux: sync.Mutex{}, + eoaActivityCache: eoaActivityCache, } go batchPool.processPooledTransactions(ctx) @@ -104,11 +110,6 @@ func (t *BatchTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event - // tx adding should be blocking, so we don't have races when - // pooled transactions are being processed in the background. - t.txMux.Lock() - defer t.txMux.Unlock() - from, err := models.DeriveTxSender(tx) if err != nil { return err @@ -123,6 +124,20 @@ func (t *BatchTxPool) Add( return err } + eoaActivity, found := t.eoaActivityCache.Get(from) + nonce := tx.Nonce() + + // Skip transactions that have been already submitted, + // as they are *likely* to fail. + if found && slices.Contains(eoaActivity.txNonces, nonce) { + t.logger.Info(). + Str("evm_tx", tx.Hash().Hex()). + Str("from", from.Hex()). + Uint64("nonce", nonce). + Msg("tx with same nonce has been already submitted") + return nil + } + // Scenarios // 1. EOA activity not found: // => We send the transaction individually, without adding it @@ -140,27 +155,45 @@ func (t *BatchTxPool) Add( // For all 3 cases, we record the activity time for the next // transactions that might come from the same EOA. // [X] is equal to the configured `TxBatchInterval` duration. - lastActivityTime, found := t.eoaActivity.Get(from) - if !found { // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) - } else if time.Since(lastActivityTime) > t.config.TxBatchInterval { - // Case 2. EOA activity found AND it was more than [X] seconds ago: - err = t.submitSingleTransaction(ctx, hexEncodedTx) + } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + t.txMux.Lock() + hasBatch := len(t.pooledTxs[from]) > 0 + if hasBatch { + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} + t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + } + t.txMux.Unlock() + + // If it wasn't batched, submit individually + if !hasBatch { + // Case 2. EOA activity found AND it was more than [X] seconds ago: + err = t.submitSingleTransaction(ctx, hexEncodedTx) + } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()} - // Prevent submission of duplicate transactions, based on their tx hash - if slices.Contains(t.pooledTxs[from], userTx) { - return errs.ErrDuplicateTransaction - } + t.txMux.Lock() + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + t.txMux.Unlock() } - t.eoaActivity.Add(from, time.Now()) + if err != nil { + t.logger.Error().Err(err).Msgf( + "failed to submit single Flow transaction for EOA: %s", + from.Hex(), + ) + return err + } + + t.updateEOAActivityMetadata(from, nonce) - return err + return nil } func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { @@ -188,10 +221,14 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { ) if err != nil { t.logger.Error().Err(err).Msgf( - "failed to submit Flow transaction from BatchTxPool for EOA: %s", + "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) - continue + // In case of any error, add the transactions back to the pool, + // as a retry mechanism. + t.txMux.Lock() + t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...) + t.txMux.Unlock() } } } @@ -235,6 +272,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record all transactions as dropped. + t.collector.TransactionsDropped(len(hexEncodedTxs)) return err } @@ -266,8 +306,32 @@ func (t *BatchTxPool) submitSingleTransaction( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) return err } return nil } + +func (t *BatchTxPool) updateEOAActivityMetadata( + from gethCommon.Address, + nonce uint64, +) { + t.txMux.Lock() + defer t.txMux.Unlock() + + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ := t.eoaActivityCache.Get(from) + eoaActivity.lastSubmission = time.Now() + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] + } + + t.eoaActivityCache.Add(from, eoaActivity) +} diff --git a/services/requester/requester.go b/services/requester/requester.go index 55c3440c..3006fbbe 100644 --- a/services/requester/requester.go +++ b/services/requester/requester.go @@ -238,7 +238,7 @@ func (e *EVM) SendRawTransaction(ctx context.Context, data []byte) (common.Hash, } e.logger.Info(). - Str("evm-id", tx.Hash().Hex()). + Str("evm_tx", tx.Hash().Hex()). Str("to", to). Str("from", from.Hex()). Str("value", tx.Value().String()). diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index e8de3e89..01120e37 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -31,7 +31,6 @@ type SingleTxPool struct { pool *sync.Map txPublisher *models.Publisher[*gethTypes.Transaction] config config.Config - mux sync.Mutex keystore *keystore.KeyStore collector metrics.Collector // referenceBlockHeader is stored atomically to avoid races @@ -40,7 +39,7 @@ type SingleTxPool struct { // todo add methods to inspect transaction pool state } -var _ TxPool = &SingleTxPool{} +var _ TxPool = (*SingleTxPool)(nil) func NewSingleTxPool( ctx context.Context, @@ -96,6 +95,11 @@ func (t *SingleTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event + from, err := models.DeriveTxSender(tx) + if err != nil { + return err + } + txData, err := tx.MarshalBinary() if err != nil { return err @@ -121,10 +125,21 @@ func (t *SingleTxPool) Add( // If there was any error during the transaction build // process, we record it as a dropped transaction. t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to build Flow transaction for EOA: %s", + from.Hex(), + ) return err } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to submit Flow transaction for EOA: %s", + from.Hex(), + ) return err } @@ -150,8 +165,8 @@ func (t *SingleTxPool) Add( } t.logger.Error().Err(res.Error). - Str("flow-id", flowTx.ID().String()). - Str("evm-id", tx.Hash().Hex()). + Str("flow_tx", flowTx.ID().String()). + Str("evm_tx", tx.Hash().Hex()). Msg("flow transaction error") // hide specific cause since it's an implementation issue @@ -188,7 +203,11 @@ func (t *SingleTxPool) buildTransaction( } } - accKey, err := t.fetchSigningAccountKey() + // getting an account key from the `KeyStore` for signing transactions, + // *does not* need to be lock-protected, as the keys are read from a + // channel. No two go-routines will end up getting the same account + // key at the same time. + accKey, err := t.keystore.Take() if err != nil { return nil, err } @@ -211,16 +230,6 @@ func (t *SingleTxPool) buildTransaction( return flowTx, nil } -func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) { - // getting an account key from the `KeyStore` for signing transactions, - // should be lock-protected, so that we don't sign any two Flow - // transactions with the same account key - t.mux.Lock() - defer t.mux.Unlock() - - return t.keystore.Take() -} - func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader { if v := t.referenceBlockHeader.Load(); v != nil { return v.(*flow.BlockHeader) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index fd47c310..1db7e5c3 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { ) } -func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { +func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { _, cfg, stop := setupGatewayNode(t) defer stop() @@ -525,45 +525,30 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey) require.NoError(t, err) - testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194") - nonce := uint64(0) - hashes := make([]common.Hash, 0) - - signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) - - txHash, err := rpcTester.sendRawTx(signed) - require.NoError(t, err) - hashes = append(hashes, txHash) + testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") + nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - // Increment nonce for the duplicate test transactions that follow - nonce += 1 - dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) + hashes := []common.Hash{} + // transfer some funds to the test address + transferAmount := int64(1_000_000_000) + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) - // Submit 5 identical transactions to test duplicate detection: - // the first should succeed, the rest should be rejected as duplicates. - for i := range 5 { - if i == 0 { - txHash, err := rpcTester.sendRawTx(dupSigned) - require.NoError(t, err) - hashes = append(hashes, txHash) - } else { - _, err := rpcTester.sendRawTx(dupSigned) - require.Error(t, err) - require.ErrorContains(t, err, "invalid: transaction already in pool") - } + txHash, err := rpcTester.sendRawTx(signed) + require.NoError(t, err) + hashes = append(hashes, txHash) } + expectedBalance := big.NewInt(6 * transferAmount) + assert.Eventually(t, func() bool { - for _, h := range hashes { - rcp, err := rpcTester.getReceipt(h.String()) - if err != nil || rcp == nil || rcp.Status != 1 { - return false - } + balance, err := rpcTester.getBalance(testAddr) + if err != nil { + return false } - return true + return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") } @@ -604,9 +589,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { EnforceGasPrice: true, LogLevel: zerolog.DebugLevel, LogWriter: testLogWriter(), - TxStateValidation: config.TxSealValidation, + TxStateValidation: config.LocalIndexValidation, TxBatchMode: true, - TxBatchInterval: time.Second * 2, + TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet } bootstrapDone := make(chan struct{}) @@ -617,6 +602,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { require.NoError(t, err) }() + // Allow the Gateway to catch up on indexing + time.Sleep(time.Second * 2) + <-bootstrapDone return emu, cfg, func() {