Skip to content

Commit 62deb8b

Browse files
committed
Reject transactions that have already been submitted to the tx pool
1 parent bbeb971 commit 62deb8b

File tree

3 files changed

+105
-64
lines changed

3 files changed

+105
-64
lines changed

models/errors/errors.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ var (
2828

2929
// Transaction errors
3030

31-
ErrFailedTransaction = errors.New("failed transaction")
32-
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
33-
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
31+
ErrFailedTransaction = errors.New("failed transaction")
32+
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
3433

3534
// Storage errors
3635

services/requester/batch_tx_pool.go

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package requester
33
import (
44
"context"
55
"encoding/hex"
6+
"fmt"
67
"slices"
78
"sort"
89
"sync"
@@ -22,35 +23,43 @@ import (
2223
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
2324
)
2425

25-
const eoaActivityCacheSize = 10_000
26+
const (
27+
eoaActivityCacheSize = 10_000
28+
eoaActivityCacheTTL = time.Second * 10
29+
maxTrackedTxHashesPerEOA = 15
30+
)
2631

2732
type pooledEvmTx struct {
2833
txPayload cadence.String
29-
txHash gethCommon.Hash
3034
nonce uint64
3135
}
3236

33-
// BatchTxPool is a `TxPool` implementation that collects and groups
34-
// transactions based on their EOA signer, and submits them for execution
35-
// using a batch.
37+
type eoaActivityMetadata struct {
38+
lastSubmission time.Time
39+
txHashes []gethCommon.Hash
40+
}
41+
42+
// BatchTxPool is a `TxPool` implementation that groups incoming transactions
43+
// based on their EOA signer, and submits them for execution using a batch.
3644
//
3745
// The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the
3846
// `EVM.run` used in `SingleTxPool`.
3947
//
4048
// The main advantage of this implementation over the `SingleTxPool`, is the
41-
// guarantee that transactions originated from the same EOA address, which
42-
// arrive in a short time interval (about the same as Flow's block production rate),
43-
// will be executed in the same order their arrived.
44-
// This helps to reduce the nonce mismatch errors which mainly occur from the
45-
// re-ordering of Cadence transactions that happens from Collection nodes.
49+
// guarantee that transactions originating from the same EOA address, which
50+
// arrive in a short time interval (configurable by the node operator),
51+
// will be executed in the same order they arrived.
52+
// This helps to reduce the execution errors which may occur from the
53+
// re-ordering of Cadence transactions that happens on Collection nodes.
4654
type BatchTxPool struct {
4755
*SingleTxPool
48-
pooledTxs map[gethCommon.Address][]pooledEvmTx
49-
txMux sync.Mutex
50-
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]
56+
57+
pooledTxs map[gethCommon.Address][]pooledEvmTx
58+
txMux sync.Mutex
59+
eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata]
5160
}
5261

53-
var _ TxPool = &BatchTxPool{}
62+
var _ TxPool = (*BatchTxPool)(nil)
5463

5564
func NewBatchTxPool(
5665
ctx context.Context,
@@ -77,16 +86,16 @@ func NewBatchTxPool(
7786
return nil, err
7887
}
7988

80-
eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
89+
eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata](
8190
eoaActivityCacheSize,
8291
nil,
83-
config.EOAActivityCacheTTL,
92+
eoaActivityCacheTTL,
8493
)
8594
batchPool := &BatchTxPool{
86-
SingleTxPool: singleTxPool,
87-
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
88-
txMux: sync.Mutex{},
89-
eoaActivity: eoaActivity,
95+
SingleTxPool: singleTxPool,
96+
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
97+
txMux: sync.Mutex{},
98+
eoaActivityCache: eoaActivityCache,
9099
}
91100

92101
go batchPool.processPooledTransactions(ctx)
@@ -123,6 +132,21 @@ func (t *BatchTxPool) Add(
123132
return err
124133
}
125134

135+
eoaActivity, found := t.eoaActivityCache.Get(from)
136+
txHash := tx.Hash()
137+
138+
// Reject transactions that have already been submitted,
139+
// as they are *likely* to fail. Two transactions with
140+
// identical hashes, are expected to have the exact same
141+
// payload.
142+
if found && slices.Contains(eoaActivity.txHashes, txHash) {
143+
return fmt.Errorf(
144+
"%w: a tx with hash %s has already been submitted",
145+
errs.ErrInvalid,
146+
txHash,
147+
)
148+
}
149+
126150
// Scenarios
127151
// 1. EOA activity not found:
128152
// => We send the transaction individually, without adding it
@@ -140,27 +164,42 @@ func (t *BatchTxPool) Add(
140164
// For all 3 cases, we record the activity time for the next
141165
// transactions that might come from the same EOA.
142166
// [X] is equal to the configured `TxBatchInterval` duration.
143-
lastActivityTime, found := t.eoaActivity.Get(from)
144-
145167
if !found {
146168
// Case 1. EOA activity not found:
147169
err = t.submitSingleTransaction(ctx, hexEncodedTx)
148-
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
149-
// Case 2. EOA activity found AND it was more than [X] seconds ago:
150-
err = t.submitSingleTransaction(ctx, hexEncodedTx)
170+
} else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval {
171+
if len(t.pooledTxs[from]) > 0 {
172+
// If the EOA has pooled transactions, which are not yet processed,
173+
// due to congestion or anything, make sure to include the current
174+
// tx on that batch.
175+
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
176+
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
177+
} else {
178+
// Case 2. EOA activity found AND it was more than [X] seconds ago:
179+
err = t.submitSingleTransaction(ctx, hexEncodedTx)
180+
}
151181
} else {
152182
// Case 3. EOA activity found AND it was less than [X] seconds ago:
153-
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
154-
// Prevent submission of duplicate transactions, based on their tx hash
155-
if slices.Contains(t.pooledTxs[from], userTx) {
156-
return errs.ErrDuplicateTransaction
157-
}
183+
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
158184
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
159185
}
160186

161-
t.eoaActivity.Add(from, time.Now())
187+
if err != nil {
188+
return err
189+
}
190+
191+
// Update metadata for the last EOA activity only on successful add/submit.
192+
eoaActivity.lastSubmission = time.Now()
193+
eoaActivity.txHashes = append(eoaActivity.txHashes, txHash)
194+
// To avoid the slice of hashes from growing indefinitely,
195+
// maintain only a handful of the last tx hashes.
196+
if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA {
197+
eoaActivity.txHashes = eoaActivity.txHashes[1:]
198+
}
199+
200+
t.eoaActivityCache.Add(from, eoaActivity)
162201

163-
return err
202+
return nil
164203
}
165204

166205
func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {

tests/tx_batching_test.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) {
332332
// activity of the EOA was X seconds ago, where:
333333
// X = `cfg.TxBatchInterval`.
334334
// For the E2E tests the `cfg.TxBatchInterval` is equal
335-
// to 2 seconds.
335+
// to 2.5 seconds.
336336
for i := range uint64(2) {
337337
signed, _, err := evmSign(
338338
big.NewInt(500_000),
@@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
446446
// activity of the EOA was X seconds ago, where:
447447
// X = `cfg.TxBatchInterval`.
448448
// For the E2E tests the `cfg.TxBatchInterval` is equal
449-
// to 2 seconds.
449+
// to 2.5 seconds.
450450
for i := range uint64(2) {
451451
signed, _, err := evmSign(
452452
big.NewInt(500_000),
@@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
514514
)
515515
}
516516

517-
func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
517+
func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) {
518518
_, cfg, stop := setupGatewayNode(t)
519519
defer stop()
520520

@@ -525,36 +525,36 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
525525
eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey)
526526
require.NoError(t, err)
527527

528-
testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194")
529-
nonce := uint64(0)
530-
hashes := make([]common.Hash, 0)
531-
532-
signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil)
533-
require.NoError(t, err)
534-
535-
txHash, err := rpcTester.sendRawTx(signed)
536-
require.NoError(t, err)
537-
hashes = append(hashes, txHash)
528+
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
529+
nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5}
538530

539-
// Increment nonce for the duplicate test transactions that follow
540-
nonce += 1
541-
dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil)
542-
require.NoError(t, err)
531+
var errors []error
532+
hashes := []common.Hash{}
533+
// transfer some funds to the test address
534+
for _, nonce := range nonces {
535+
signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil)
536+
require.NoError(t, err)
543537

544-
// Submit 5 identical transactions to test duplicate detection:
545-
// the first should succeed, the rest should be rejected as duplicates.
546-
for i := range 5 {
547-
if i == 0 {
548-
txHash, err := rpcTester.sendRawTx(dupSigned)
549-
require.NoError(t, err)
550-
hashes = append(hashes, txHash)
538+
txHash, err := rpcTester.sendRawTx(signed)
539+
if err != nil {
540+
errors = append(errors, err)
551541
} else {
552-
_, err := rpcTester.sendRawTx(dupSigned)
553-
require.Error(t, err)
554-
require.ErrorContains(t, err, "invalid: transaction already in pool")
542+
hashes = append(hashes, txHash)
555543
}
556544
}
557545

546+
require.Len(t, errors, 2)
547+
assert.ErrorContains(
548+
t,
549+
errors[0],
550+
"a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted",
551+
)
552+
assert.ErrorContains(
553+
t,
554+
errors[1],
555+
"a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted",
556+
)
557+
558558
assert.Eventually(t, func() bool {
559559
for _, h := range hashes {
560560
rcp, err := rpcTester.getReceipt(h.String())
@@ -604,9 +604,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
604604
EnforceGasPrice: true,
605605
LogLevel: zerolog.DebugLevel,
606606
LogWriter: testLogWriter(),
607-
TxStateValidation: config.TxSealValidation,
607+
TxStateValidation: config.LocalIndexValidation,
608608
TxBatchMode: true,
609-
TxBatchInterval: time.Second * 2,
609+
TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet
610610
}
611611

612612
bootstrapDone := make(chan struct{})
@@ -617,6 +617,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
617617
require.NoError(t, err)
618618
}()
619619

620+
// Allow the Gateway to catch up on indexing
621+
time.Sleep(time.Second * 2)
622+
620623
<-bootstrapDone
621624

622625
return emu, cfg, func() {

0 commit comments

Comments
 (0)