diff --git a/api/stream.go b/api/stream.go index 37807305..7213e1d8 100644 --- a/api/stream.go +++ b/api/stream.go @@ -3,7 +3,9 @@ package api import ( "context" "fmt" + "slices" + gethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" @@ -19,6 +21,9 @@ import ( "github.com/onflow/flow-evm-gateway/storage" ) +// The maximum number of transaction hash criteria allowed in a single subscription +const maxTxHashes = 200 + type StreamAPI struct { logger zerolog.Logger config config.Config @@ -27,6 +32,7 @@ type StreamAPI struct { receipts storage.ReceiptIndexer blocksPublisher *models.Publisher[*models.Block] transactionsPublisher *models.Publisher[*gethTypes.Transaction] + receiptsPublisher *models.Publisher[[]*models.Receipt] logsPublisher *models.Publisher[[]*gethTypes.Log] } @@ -38,6 +44,7 @@ func NewStreamAPI( receipts storage.ReceiptIndexer, blocksPublisher *models.Publisher[*models.Block], transactionsPublisher *models.Publisher[*gethTypes.Transaction], + receiptsPublisher *models.Publisher[[]*models.Receipt], logsPublisher *models.Publisher[[]*gethTypes.Log], ) *StreamAPI { return &StreamAPI{ @@ -48,6 +55,7 @@ func NewStreamAPI( receipts: receipts, blocksPublisher: blocksPublisher, transactionsPublisher: transactionsPublisher, + receiptsPublisher: receiptsPublisher, logsPublisher: logsPublisher, } } @@ -121,6 +129,59 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( ) } +// TransactionReceipts creates a subscription that fires transaction +// receipts when transactions are included in blocks. +func (s *StreamAPI) TransactionReceipts( + ctx context.Context, + filter *filters.TransactionReceiptsQuery, +) (*rpc.Subscription, error) { + // Validate transaction hashes limit + if filter != nil && len(filter.TransactionHashes) > maxTxHashes { + return nil, errs.ErrExceedMaxTxHashes + } + + var txHashes []gethCommon.Hash + + if filter != nil { + txHashes = filter.TransactionHashes + } + + return newSubscription( + ctx, + s.logger, + s.receiptsPublisher, + func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*models.Receipt) error { + return func(receipts []*models.Receipt) error { + // Convert to the same format as `eth_getTransactionReceipt` + marshaledReceipts := make([]map[string]any, 0) + + for _, receipt := range receipts { + // Check if the subscription is only interested for a given + // set of tx receipts. + if len(txHashes) > 0 && !slices.Contains(txHashes, receipt.TxHash) { + continue + } + + tx, err := s.transactions.Get(receipt.TxHash) + if err != nil { + return err + } + + txReceipt, err := ethTypes.MarshalReceipt(receipt, tx) + if err != nil { + return err + } + + marshaledReceipts = append(marshaledReceipts, txReceipt) + } + + // Send a batch of tx receipts in one notification + return notifier.Notify(sub.ID, marshaledReceipts) + } + }, + ) +} + func (s *StreamAPI) prepareBlockHeader( block *models.Block, ) (*ethTypes.BlockHeader, error) { diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index f5748c2e..00cca7a5 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -62,6 +62,7 @@ type Storages struct { type Publishers struct { Block *models.Publisher[*models.Block] Transaction *models.Publisher[*gethTypes.Transaction] + Receipts *models.Publisher[[]*models.Receipt] Logs *models.Publisher[[]*gethTypes.Log] } @@ -105,6 +106,7 @@ func New(config config.Config) (*Bootstrap, error) { publishers: &Publishers{ Block: models.NewPublisher[*models.Block](), Transaction: models.NewPublisher[*gethTypes.Transaction](), + Receipts: models.NewPublisher[[]*models.Receipt](), Logs: models.NewPublisher[[]*gethTypes.Log](), }, db: db, @@ -199,6 +201,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { b.storages.Traces, b.publishers.Block, b.publishers.Logs, + b.publishers.Receipts, b.logger, b.collector, replayerConfig, @@ -338,6 +341,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { b.storages.Receipts, b.publishers.Block, b.publishers.Transaction, + b.publishers.Receipts, b.publishers.Logs, ) diff --git a/models/errors/errors.go b/models/errors/errors.go index 614c7403..f28e8c8d 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -17,6 +17,7 @@ var ( ErrRateLimit = errors.New("limit of requests per second reached") ErrIndexOnlyMode = errors.New("transaction submission not allowed in index-only mode") ErrExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position") + ErrExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription") // General errors diff --git a/models/events.go b/models/events.go index ad6cdb4f..f9ca74d5 100644 --- a/models/events.go +++ b/models/events.go @@ -79,6 +79,7 @@ func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { for _, l := range rcp.Logs { l.BlockNumber = rcp.BlockNumber.Uint64() l.BlockHash = rcp.BlockHash + l.BlockTimestamp = e.block.Timestamp l.TxHash = rcp.TxHash l.TxIndex = rcp.TransactionIndex l.Index = logIndex diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 04d012a0..c292065c 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -40,20 +40,21 @@ var _ models.Engine = &Engine{} type Engine struct { *models.EngineStatus - subscriber EventSubscriber - blocksProvider *replayer.BlocksProvider - store *pebble.Storage - registerStore *pebble.RegisterStorage - blocks storage.BlockIndexer - receipts storage.ReceiptIndexer - transactions storage.TransactionIndexer - traces storage.TraceIndexer - log zerolog.Logger - evmLastHeight *models.SequentialHeight - blocksPublisher *models.Publisher[*models.Block] - logsPublisher *models.Publisher[[]*gethTypes.Log] - collector metrics.Collector - replayerConfig replayer.Config + subscriber EventSubscriber + blocksProvider *replayer.BlocksProvider + store *pebble.Storage + registerStore *pebble.RegisterStorage + blocks storage.BlockIndexer + receipts storage.ReceiptIndexer + transactions storage.TransactionIndexer + traces storage.TraceIndexer + log zerolog.Logger + evmLastHeight *models.SequentialHeight + blocksPublisher *models.Publisher[*models.Block] + logsPublisher *models.Publisher[[]*gethTypes.Log] + receiptsPublisher *models.Publisher[[]*models.Receipt] + collector metrics.Collector + replayerConfig replayer.Config } func NewEventIngestionEngine( @@ -67,6 +68,7 @@ func NewEventIngestionEngine( traces storage.TraceIndexer, blocksPublisher *models.Publisher[*models.Block], logsPublisher *models.Publisher[[]*gethTypes.Log], + receiptsPublisher *models.Publisher[[]*models.Receipt], log zerolog.Logger, collector metrics.Collector, replayerConfig replayer.Config, @@ -76,19 +78,20 @@ func NewEventIngestionEngine( return &Engine{ EngineStatus: models.NewEngineStatus(), - subscriber: subscriber, - blocksProvider: blocksProvider, - store: store, - registerStore: registerStore, - blocks: blocks, - receipts: receipts, - transactions: transactions, - traces: traces, - log: log, - blocksPublisher: blocksPublisher, - logsPublisher: logsPublisher, - collector: collector, - replayerConfig: replayerConfig, + subscriber: subscriber, + blocksProvider: blocksProvider, + store: store, + registerStore: registerStore, + blocks: blocks, + receipts: receipts, + transactions: transactions, + traces: traces, + log: log, + blocksPublisher: blocksPublisher, + logsPublisher: logsPublisher, + receiptsPublisher: receiptsPublisher, + collector: collector, + replayerConfig: replayerConfig, } } @@ -204,6 +207,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { // emit block event and logs, only after we successfully commit the data e.blocksPublisher.Publish(events.Block()) + e.receiptsPublisher.Publish(events.Receipts()) for _, r := range events.Receipts() { if len(r.Logs) > 0 { e.logsPublisher.Publish(r.Logs) diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 1e4e5692..71c14432 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -75,6 +75,7 @@ func TestSerialBlockIngestion(t *testing.T) { traces, models.NewPublisher[*models.Block](), models.NewPublisher[[]*gethTypes.Log](), + models.NewPublisher[[]*models.Receipt](), zerolog.Nop(), metrics.NopCollector, defaultReplayerConfig(), @@ -154,6 +155,7 @@ func TestSerialBlockIngestion(t *testing.T) { traces, models.NewPublisher[*models.Block](), models.NewPublisher[[]*gethTypes.Log](), + models.NewPublisher[[]*models.Receipt](), zerolog.Nop(), metrics.NopCollector, defaultReplayerConfig(), @@ -275,6 +277,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { traces, models.NewPublisher[*models.Block](), models.NewPublisher[[]*gethTypes.Log](), + models.NewPublisher[[]*models.Receipt](), zerolog.Nop(), metrics.NopCollector, defaultReplayerConfig(), @@ -383,6 +386,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { traces, models.NewPublisher[*models.Block](), models.NewPublisher[[]*gethTypes.Log](), + models.NewPublisher[[]*models.Receipt](), zerolog.Nop(), metrics.NopCollector, defaultReplayerConfig(), @@ -477,6 +481,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { traces, models.NewPublisher[*models.Block](), models.NewPublisher[[]*gethTypes.Log](), + models.NewPublisher[[]*models.Receipt](), zerolog.Nop(), metrics.NopCollector, defaultReplayerConfig(), diff --git a/tests/web3js/eth_streaming_test.js b/tests/web3js/eth_streaming_test.js index 2f82a90e..5802cbce 100644 --- a/tests/web3js/eth_streaming_test.js +++ b/tests/web3js/eth_streaming_test.js @@ -1,3 +1,4 @@ +const WebSocket = require('ws') const conf = require('./config') const helpers = require('./helpers') const { assert } = require('chai') @@ -69,6 +70,33 @@ it('streaming of blocks, transactions, logs using filters', async () => { } }) + let socket = new WebSocket('ws://127.0.0.1:8545') + // give some time for the connection to open + await new Promise((res) => setTimeout(() => res(), 1500)) + let payload = ` + { + "jsonrpc": "2.0", + "id": 2, + "method": "eth_subscribe", + "params": [ + "transactionReceipts", + { + "transactionHashes": [] + } + ] + } + ` + socket.send(payload) + + // subscribe to all new receipts being produced by transaction submissions below + let receipts = [] + socket.onmessage = (event) => { + let response = JSON.parse(event.data) + if (response.method == 'eth_subscription') { + receipts = receipts.concat(response.params.result) + } + } + let sentHashes = [] // produce events by submitting transactions for (const { A, B } of testValues) { @@ -120,4 +148,74 @@ it('streaming of blocks, transactions, logs using filters', async () => { assert.lengthOf(matchingLogs, 1) assert.deepEqual(log, matchingLogs[0]) } + + assert.equal(10, receipts.length) + for (let txHash of sentHashes) { + let txReceipt = await helpers.callRPCMethod( + 'eth_getTransactionReceipt', + [txHash] + ) + + for (let rcp of receipts) { + if (rcp.transactionHash == txHash) { + assert.deepEqual(rcp, txReceipt.body['result']) + } + } + } + + let signedTx = await conf.eoa.signTransaction({ + from: conf.eoa.address, + to: contractAddress, + data: deployed.contract.methods.sum(7, 7).encodeABI(), + gas: 1_000_000, + gasPrice: conf.minGasPrice + }) + + receipts = [] + let subID = null + socket.onmessage = (event) => { + let response = JSON.parse(event.data) + if (response.id == 12) { + subID = response.result + } + + if (response.method == 'eth_subscription' && response.params.subscription == subID) { + receipts = receipts.concat(response.params.result) + } + } + // Check that the subscription will notify the caller only for the given + // set of tx hashes. + payload = ` + { + "jsonrpc": "2.0", + "id": 12, + "method": "eth_subscribe", + "params": [ + "transactionReceipts", + { + "transactionHashes": [ + "0x7b45084668258f29cfc525494d00ea5171766d1d43436e41cea930380d96bf67", + "0xed970aa258b677d5e772125dd4342f38e5ccf4dec685d38fc5f04f18eff1a939", + "${signedTx.transactionHash}" + ] + } + ] + } + ` + socket.send(payload) + + // send transaction and make sure interaction was success + let txReceipt = await web3.eth.sendSignedTransaction(signedTx.rawTransaction) + assert.equal(txReceipt.status, conf.successStatus) + assert.equal(txReceipt.transactionHash, signedTx.transactionHash) + + await new Promise((res, rej) => setTimeout(() => res(), 1500)) + socket.close(1000, 'finished testing') + + assert.equal(1, receipts.length) + let expectedReceipt = await helpers.callRPCMethod( + 'eth_getTransactionReceipt', + [signedTx.transactionHash] + ) + assert.deepEqual(receipts[0], expectedReceipt.body['result']) })