22 changes: 22 additions & 0 deletions bootstrap/bootstrap.go
@@ -694,6 +694,8 @@ func (b *Bootstrap) Run(
 	// mark ready
 	ready()

+	go b.trackOperatorBalance(ctx)
+
 	return nil
 }

@@ -707,6 +709,26 @@ func (b *Bootstrap) Stop() {
 	b.StopDB()
 }

+func (b *Bootstrap) trackOperatorBalance(ctx context.Context) {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			accBalance, err := b.client.GetAccountBalanceAtLatestBlock(ctx, b.config.COAAddress)
+			if err != nil {
+				b.logger.Warn().Err(err).Msg(
+					"failed to collect operator's balance metric",
+				)
+			}
+			b.collector.OperatorBalance(accBalance)
+		}
Comment on lines +721 to +728
Contributor

⚠️ Potential issue | 🟠 Major

Avoid zeroing operator_balance on fetch errors.

When GetAccountBalanceAtLatestBlock fails, accBalance remains the zero value (0), yet we still push it into the metric. That will falsely drop the operator_balance gauge to 0 and trigger alarms even though the real balance is unchanged. Bail out on the error instead.

-			accBalance, err := b.client.GetAccountBalanceAtLatestBlock(ctx, b.config.COAAddress)
-			if err != nil {
-				b.logger.Warn().Err(err).Msg(
-					"failed to collect operator's balance metric",
-				)
-			}
-			b.collector.OperatorBalance(accBalance)
+			accBalance, err := b.client.GetAccountBalanceAtLatestBlock(ctx, b.config.COAAddress)
+			if err != nil {
+				b.logger.Warn().Err(err).Msg(
+					"failed to collect operator's balance metric",
+				)
+				continue
+			}
+			b.collector.OperatorBalance(accBalance)

+	}
+}
+
 // Run will run complete bootstrap of the EVM gateway with all the engines.
 // Run is a blocking call, but it does signal readiness of the service
 // through a channel provided as an argument.
7 changes: 3 additions & 4 deletions metrics/collector.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"time"

-	"github.com/onflow/flow-go-sdk"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/rs/zerolog"
 )
@@ -113,7 +112,7 @@ type Collector interface {
 	EVMTransactionIndexed(count int)
 	EVMAccountInteraction(address string)
 	MeasureRequestDuration(start time.Time, method string)
-	OperatorBalance(account *flow.Account)
+	OperatorBalance(balance uint64)
 	AvailableSigningKeys(count int)
 	GasEstimationIterations(count int)
 	BlockIngestionTime(blockCreation time.Time)
@@ -208,8 +207,8 @@ func (c *DefaultCollector) EVMAccountInteraction(address string) {
 	c.evmAccountCallCounters.With(prometheus.Labels{"address": address}).Inc()
 }

-func (c *DefaultCollector) OperatorBalance(account *flow.Account) {
-	c.operatorBalance.Set(float64(account.Balance))
+func (c *DefaultCollector) OperatorBalance(balance uint64) {
+	c.operatorBalance.Set(float64(balance))
 }

 func (c *DefaultCollector) MeasureRequestDuration(start time.Time, method string) {
4 changes: 1 addition & 3 deletions metrics/nop.go
@@ -2,8 +2,6 @@ package metrics

 import (
 	"time"
-
-	"github.com/onflow/flow-go-sdk"
 )

 type nopCollector struct{}
@@ -19,7 +17,7 @@ func (c *nopCollector) EVMHeightIndexed(uint64) {}
 func (c *nopCollector) EVMTransactionIndexed(int) {}
 func (c *nopCollector) EVMAccountInteraction(string) {}
 func (c *nopCollector) MeasureRequestDuration(time.Time, string) {}
-func (c *nopCollector) OperatorBalance(*flow.Account) {}
+func (c *nopCollector) OperatorBalance(balance uint64) {}
 func (c *nopCollector) AvailableSigningKeys(count int) {}
 func (c *nopCollector) GasEstimationIterations(count int) {}
 func (c *nopCollector) BlockIngestionTime(blockCreation time.Time) {}
12 changes: 5 additions & 7 deletions services/requester/batch_tx_pool.go
@@ -161,10 +161,10 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
+			latestBlock, err := t.client.GetLatestBlock(ctx, true)
 			if err != nil {
 				t.logger.Error().Err(err).Msg(
-					"failed to get COA / latest Flow block on batch tx submission",
+					"failed to get latest Flow block on batch tx submission",
 				)
 				continue
 			}
@@ -181,7 +181,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
 				err := t.batchSubmitTransactionsForSameAddress(
 					ctx,
 					latestBlock,
-					account,
 					pooledTxs,
 				)
 				if err != nil {
@@ -199,7 +198,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
 func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(
 	ctx context.Context,
 	latestBlock *flow.Block,
-	account *flow.Account,
 	pooledTxs []pooledEvmTx,
 ) error {
 	// Sort the transactions based on their nonce, to make sure
@@ -220,8 +218,8 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(

 	script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
 	flowTx, err := t.buildTransaction(
+		ctx,
 		latestBlock,
-		account,
 		script,
 		cadence.NewArray(hexEncodedTxs),
 		coinbaseAddress,
@@ -244,7 +242,7 @@ func (t *BatchTxPool) submitSingleTransaction(
 	ctx context.Context,
 	hexEncodedTx cadence.String,
 ) error {
-	latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
+	latestBlock, err := t.client.GetLatestBlock(ctx, true)
Member

Maybe we could throttle the calls to GetLatestBlock. The latest block only changes about once per second on mainnet, and we only use the block ID as the reference block ID, so a small delay is acceptable. This could further reduce the load on the AN.

Contributor

Agreed, there should be a component that is responsible for querying (or being subscribed to) the last block, which we then use here.

I think that might be best to put in a separate PR though.

Collaborator Author

I have already opened a separate PR for that: #896

 	if err != nil {
 		return err
 	}
@@ -256,8 +254,8 @@ func (t *BatchTxPool) submitSingleTransaction(

 	script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
 	flowTx, err := t.buildTransaction(
+		ctx,
 		latestBlock,
-		account,
 		script,
 		cadence.NewArray([]cadence.Value{hexEncodedTx}),
 		coinbaseAddress,
23 changes: 14 additions & 9 deletions services/requester/keystore/account_key.go
@@ -54,26 +54,31 @@ func (k *AccountKey) SetLockMetadata(txID flowsdk.Identifier, referenceBlockHeig
 // SetProposerPayerAndSign sets the proposer, payer, and signs the transaction with the key.
 func (k *AccountKey) SetProposerPayerAndSign(
 	tx *flowsdk.Transaction,
-	account *flowsdk.Account,
+	address flowsdk.Address,
+	acckey *flowsdk.AccountKey,
 ) error {
-	if k.Address != account.Address {
+	if acckey == nil {
+		return fmt.Errorf("nil account key provided for address %s (index %d)", address, k.Index)
+	}
+
+	if k.Address != address {
 		return fmt.Errorf(
-			"expected address: %v, got address: %v",
+			"expected address: %s, got address: %s",
 			k.Address,
-			account.Address,
+			address,
 		)
 	}
-	if k.Index >= uint32(len(account.Keys)) {
+
+	if k.Index != acckey.Index {
 		return fmt.Errorf(
-			"key index: %d exceeds keys length: %d",
+			"expected account key with index: %d, got key with index: %d",
 			k.Index,
-			len(account.Keys),
+			acckey.Index,
 		)
 	}
-	seqNumber := account.Keys[k.Index].SequenceNumber

 	return tx.
-		SetProposalKey(k.Address, k.Index, seqNumber).
+		SetProposalKey(k.Address, k.Index, acckey.SequenceNumber).
 		SetPayer(k.Address).
 		SignEnvelope(k.Address, k.Index, k.Signer)
 }
2 changes: 1 addition & 1 deletion services/requester/keystore/key_store_test.go
@@ -200,7 +200,7 @@ func TestKeySigning(t *testing.T) {
 	}
 	tx := sdk.NewTransaction()

-	err = key.SetProposerPayerAndSign(tx, account)
+	err = key.SetProposerPayerAndSign(tx, address, accountKey)
 	require.NoError(t, err)

 	assert.Equal(t, account.Address, tx.ProposalKey.Address)
8 changes: 4 additions & 4 deletions services/requester/requester.go
@@ -119,7 +119,7 @@ func NewEVM(

 	if !config.IndexOnly {
 		address := config.COAAddress
-		acc, err := client.GetAccount(context.Background(), address)
+		accBalance, err := client.GetAccountBalanceAtLatestBlock(context.Background(), address)
 		if err != nil {
 			return nil, fmt.Errorf(
 				"could not fetch the configured COA account: %s make sure it exists: %w",
@@ -128,13 +128,13 @@ func NewEVM(
 			)
 		}
 		// initialize the operator balance metric since it is only updated when sending a tx
-		collector.OperatorBalance(acc)
+		collector.OperatorBalance(accBalance)

-		if acc.Balance < minFlowBalance {
+		if accBalance < minFlowBalance {
 			return nil, fmt.Errorf(
 				"COA account must be funded with at least %d Flow, but has balance of: %d",
 				minFlowBalance,
-				acc.Balance,
+				accBalance,
 			)
 		}
 	}
53 changes: 16 additions & 37 deletions services/requester/single_tx_pool.go
@@ -13,7 +13,6 @@ import (
 	flowGo "github.com/onflow/flow-go/model/flow"
 	"github.com/rs/zerolog"
 	"github.com/sethvargo/go-retry"
-	"golang.org/x/sync/errgroup"

 	"github.com/onflow/flow-evm-gateway/config"
 	"github.com/onflow/flow-evm-gateway/metrics"
@@ -93,15 +92,15 @@ func (t *SingleTxPool) Add(
 		return err
 	}

-	latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
+	latestBlock, err := t.client.GetLatestBlock(ctx, true)
 	if err != nil {
 		return err
 	}

 	script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
 	flowTx, err := t.buildTransaction(
+		ctx,
 		latestBlock,
-		account,
 		script,
 		cadence.NewArray([]cadence.Value{hexEncodedTx}),
 		coinbaseAddress,
@@ -157,8 +156,8 @@ func (t *SingleTxPool) Add(
 // buildTransaction creates a Cadence transaction from the provided script,
 // with the given arguments and signs it with the configured COA account.
 func (t *SingleTxPool) buildTransaction(
+	ctx context.Context,
 	latestBlock *flow.Block,
-	account *flow.Account,
 	script []byte,
 	args ...cadence.Value,
 ) (*flow.Transaction, error) {
@@ -177,53 +176,33 @@ func (t *SingleTxPool) buildTransaction(
 		}
 	}

-	// building and signing transactions should be blocking,
-	// so we don't have keys conflict
-	t.mux.Lock()
-	defer t.mux.Unlock()
+	accKey, err := t.fetchSigningAccountKey()
+	if err != nil {
+		return nil, err
+	}

-	accKey, err := t.keystore.Take()
+	coaAddress := t.config.COAAddress
+	accountKey, err := t.client.GetAccountKeyAtLatestBlock(ctx, coaAddress, accKey.Index)
 	if err != nil {
 		return nil, err
 	}

-	if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil {
+	if err := accKey.SetProposerPayerAndSign(flowTx, coaAddress, accountKey); err != nil {
 		accKey.Done()
 		return nil, err
 	}

 	// now that the transaction is prepared, store the transaction's metadata
 	accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height)

-	t.collector.OperatorBalance(account)
Collaborator Author

I have removed the OperatorBalance metric from here: now that we use GetAccountKeyAtLatestBlock, we no longer have access to the COA balance at this point.
I am planning to add it back in #896. Generally, we don't have to emit this metric on every tx submission; a less frequent rate is enough.


-
 	return flowTx, nil
Comment on lines +179 to 198
Contributor

⚠️ Potential issue | 🔴 Critical

Critical: Resource leak when GetAccountKeyAtLatestBlock fails.

The signing key obtained from fetchSigningAccountKey() at line 179 is not released if the GetAccountKeyAtLatestBlock call at line 185 fails. This will cause the key to remain locked indefinitely.

Apply this diff to fix the resource leak:

 	accKey, err := t.fetchSigningAccountKey()
 	if err != nil {
 		return nil, err
 	}
 
 	coaAddress := t.config.COAAddress
 	accountKey, err := t.client.GetAccountKeyAtLatestBlock(ctx, coaAddress, accKey.Index)
 	if err != nil {
+		accKey.Done()
 		return nil, err
 	}

 }

-func (t *SingleTxPool) fetchFlowLatestBlockAndCOA(ctx context.Context) (
-	*flow.Block,
-	*flow.Account,
-	error,
-) {
-	var (
-		g = errgroup.Group{}
-		err1, err2 error
-		latestBlock *flow.Block
-		account *flow.Account
-	)
-
-	// execute concurrently so we can speed up all the information we need for tx
-	g.Go(func() error {
-		latestBlock, err1 = t.client.GetLatestBlock(ctx, true)
-		return err1
-	})
-	g.Go(func() error {
-		account, err2 = t.client.GetAccount(ctx, t.config.COAAddress)
-		return err2
-	})
-	if err := g.Wait(); err != nil {
-		return nil, nil, err
-	}
+func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) {
+	// building and signing transactions should be
+	// blocking, so we don't have conflicts with keys.
Comment on lines +202 to +203
Contributor

This doesn't seem accurate anymore, since the lock only protects getting the key. Should the building/signing also be atomic?

Collaborator Author

Hey @peterargue , good flag 👍 . I'll try to reason through this change, as I don't really remember why this comment was there in the first place 😅

The code previously looked like this:

// building and signing transactions should be blocking,
// so we don't have keys conflict
t.mux.Lock()
defer t.mux.Unlock()

accKey, err := t.keystore.Take()
if err != nil {
	return nil, err
}

if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil {
	accKey.Done()
	return nil, err
}

// now that the transaction is prepared, store the transaction's metadata
accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height)

t.collector.OperatorBalance(account)

return flowTx, nil

Building the transaction is quite straightforward, and it doesn't need to be protected by the lock.

flowTx := flow.NewTransaction().
	SetScript(script).
	SetReferenceBlockID(latestBlock.ID).
	SetComputeLimit(flowGo.DefaultMaxTransactionGasLimit)

for _, arg := range args {
	if err := flowTx.AddArgument(arg); err != nil {
		return nil, fmt.Errorf("failed to add argument: %s, with %w", arg, err)
	}
}

Signing the transaction required taking an account key from the keystore and then signing the transaction with it:

t.mux.Lock()
defer t.mux.Unlock()

accKey, err := t.keystore.Take()
if err != nil {
	return nil, err
}

if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil {
	accKey.Done()
	return nil, err
}

So the lock was there only to ensure that no two goroutines end up taking the same key and operating on it. As soon as an account key is taken, the signing is done with accKey.SetProposerPayerAndSign(...), which doesn't seem to need any lock protection.

What's more, I am not even sure if taking an account key from the keystore does in fact need any lock protection.

select {
case key := <-k.availableKeys:
	if !key.lock() {
		// this should never happen and means there's a bug
		panic(fmt.Sprintf("key %d available, but locked", key.Index))
	}
	return key, nil
default:
	return nil, ErrNoKeysAvailable
}

Given that the keystore keeps the available account keys in a channel, is it even possible for two go-routines to end up taking the same key? 🤔

To conclude, I created the fetchSigningAccountKey(), because I didn't want the added AN call:

coaAddress := t.config.COAAddress
accountKey, err := t.client.GetAccountKeyAtLatestBlock(ctx, coaAddress, accKey.Index)
if err != nil {
	return nil, err
}

to be performed with the lock acquired. This would cause other requests to block.

+	t.mux.Lock()
+	defer t.mux.Unlock()

-	return latestBlock, account, nil
+	return t.keystore.Take()
 }